From 8ed280126c1647a3353d1b58f5b8a6b50dd672e7 Mon Sep 17 00:00:00 2001 From: DcruBro Date: Thu, 15 Jan 2026 19:56:18 +0100 Subject: [PATCH] test data sending --- include/common/packettype.h | 4 +- src/client/main.c | 289 +++++++++++++++++++++++++++++++++++- src/common/dynarr.c | 18 ++- src/common/task/task.c | 9 +- src/server/main.c | 64 +++++++- src/server/task/taskqueue.c | 2 +- test_computation | Bin 0 -> 33648 bytes 7 files changed, 371 insertions(+), 15 deletions(-) create mode 100755 test_computation diff --git a/include/common/packettype.h b/include/common/packettype.h index fa5a467..54d3d76 100644 --- a/include/common/packettype.h +++ b/include/common/packettype.h @@ -9,7 +9,9 @@ typedef enum { PACKET_TYPE_STATUS_UPDATE = 4, PACKET_TYPE_CLIENT_CAPABILITIES = 5, PACKET_TYPE_TASK_REQUEST = 6, - PACKET_TYPE_MISSING_INFO = 7 + PACKET_TYPE_MISSING_INFO = 7, + PACKET_TYPE_ACKNOWLEDGE = 8, + PACKET_TYPE_TASK_REJECT = 9 } PacketType; #endif diff --git a/src/client/main.c b/src/client/main.c index 63f8805..a771d13 100644 --- a/src/client/main.c +++ b/src/client/main.c @@ -1,6 +1,293 @@ #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +char dataBuf[1500]; +DynArr* capabilities; +int sock; + +char command[256]; + +char LAST_PACKET_SENT = PACKET_TYPE_NONE; + +void on_data(ssize_t dataBufLen) { + char type = dataBuf[0]; + switch (type) { + case PACKET_TYPE_HELLO: + printf("Received HELLO packet from server\n"); + + // Respond with capabilities + if (capabilities) { + DynArr* packet = DYNARR_CREATE(uint8_t, 1); + uint8_t packetType = PACKET_TYPE_CLIENT_CAPABILITIES; + DynArr_push_back(packet, &packetType); + + for (size_t i = 0; i < DynArr_size(capabilities); i++) { + capability_t* cap = (capability_t*)DynArr_at(capabilities, i); + char* capPtr = (char*)cap; + for (size_t j = 0; j < sizeof(capability_t); j++) { + DynArr_push_back(packet, &capPtr[j]); + } + } + + // Send + send(sock, packet->data, DynArr_size(packet) * sizeof(uint8_t), 0); + + DynArr_destroy(packet); + printf("Sent capabilities to server\n"); + + LAST_PACKET_SENT = PACKET_TYPE_CLIENT_CAPABILITIES; + } + + break; + case PACKET_TYPE_ACKNOWLEDGE: { + switch (LAST_PACKET_SENT) { + case PACKET_TYPE_CLIENT_CAPABILITIES: + printf("Server acknowledged CLIENT_CAPABILITIES packet\n"); + // Get task + + // For simplicity, just request a task immediately + char taskRequestPacket[1]; + taskRequestPacket[0] = PACKET_TYPE_TASK_REQUEST; + send(sock, taskRequestPacket, 1, 0); + printf("Requested task from server\n"); + LAST_PACKET_SENT = PACKET_TYPE_TASK_REQUEST; + + break; + default: + printf("Server acknowledged unknown packet type %u\n", LAST_PACKET_SENT); + break; + } + break; + } + case PACKET_TYPE_TASK_ASSIGN: + task_t task; + memcpy(&task, &dataBuf[1], sizeof(task_t)); + printf("Received TASK_ASSIGN packet from server for task %u\n", task.taskId); + + DynArr* execArgs = DYNARR_CREATE(char*, 4); + // RESULT report is ACKNOWLEDGE for now + // Fork with binary name + pid_t pid = fork(); + if (pid < 0) { + perror("Fork failed"); + // TODO; Send TASK_REJECT packet to server to inform that task could not be processed + return; + } + + printf("Data Size: %zu\n", dataBufLen); + + char** argv = NULL; + + if (pid == 0) { + // Child with args + // Jump to special sequence to find args "ARGSTART" + size_t offset = 1 + sizeof(task_t); + while (offset < dataBufLen - 8) { + if (memcmp(&dataBuf[offset], "ARGSTART", 8) == 0) { + offset += 8; + break; + } + offset++; + } + + printf("Args Offset: %zu\n", offset); + printf("Args length: %zu\n", dataBufLen - offset); + + // Parse args + DynArr_push_back(execArgs, task.binary); // First arg is binary name + while (offset < dataBufLen) { + //if (memcmp(&dataBuf[offset], "ARGNEXT", 7) == 0) { + // offset += 7; + // continue; + //} + + // Read raw arg as 32 bytes (name) + //char argName[32]; + //memcpy(argName, &dataBuf[offset], 32); + //offset += 32; + //printf("Arg Name: %s\n", argName); + + // Ignore name for now; just pass raw data + offset += 32; + + // Find position of next ARGNEXT or end + size_t argDataStart = offset; + size_t argDataEnd = dataBufLen;; + for (size_t i = offset; i < dataBufLen - 7; i++) { + if (memcmp(&dataBuf[i], "ARGNEXT", 7) == 0) { + argDataEnd = i; + break; + } + } + + if (argDataEnd <= argDataStart) { + printf("No more arg data found\n"); + break; + } + + printf("Arg Data Start: %zu\n", argDataStart); + printf("Arg Data End: %zu\n", argDataEnd); + + size_t argDataLen = argDataEnd - argDataStart; + printf("Arg Data Length: %zu\n", argDataLen); + + char* argData = malloc(argDataLen); + memcpy(argData, &dataBuf[argDataStart], argDataLen); // No null-termination; this is raw data + + offset += argDataLen; + } + DynArr_push_back(execArgs, NULL); // Null-terminate args array + + // Convert into char** + argv = malloc(DynArr_size(execArgs) * sizeof(char*) + 1); + for (size_t i = 0; i < DynArr_size(execArgs); i++) { + char** argPtr = (char**)DynArr_at(execArgs, i); + argv[i] = *argPtr; + } + argv[DynArr_size(execArgs)] = NULL; // Null-terminate + + printf("Executing task %u: %s\n", task.taskId, task.binary); + char execName[66]; + snprintf(execName, sizeof(execName), "./%s", task.binary); // TODO; Improve + + // Exec + execvp(execName, (char* const*)argv); + // If execvp returns, there was an error + perror("execvp failed"); + exit(1); // Crash child + + // SEND result packet later + } else { + int status; + waitpid(pid, &status, 0); + + if (WIFEXITED(status)) { + int exitStatus = WEXITSTATUS(status); + printf("Task %u completed with exit status %d\n", task.taskId, exitStatus); + + if (exitStatus == 0) { + // Send TASK_RESULT packet + DynArr* packet = DYNARR_CREATE(uint8_t, 1); + uint8_t packetType = PACKET_TYPE_TASK_RESULT; + DynArr_push_back(packet, &packetType); + + // Read from ./output.bin + FILE* outFile = fopen("output.bin", "rb"); + if (!outFile) { + perror("Failed to open output.bin"); + } + int byte; + while ((byte = fgetc(outFile)) != EOF) { + uint8_t b = (uint8_t)byte; + DynArr_push_back(packet, &b); + } + fclose(outFile); + + // Send + send(sock, packet->data, DynArr_size(packet) * sizeof(uint8_t), 0); + printf("Sent TASK_RESULT packet for task %u to server\n", task.taskId); + + DynArr_destroy(packet); + } else { + // TODO; Send TASK_REJECT packet to server to inform that task failed + break; + } + + } else { + printf("Task %u did not complete successfully\n", task.taskId); + } + } + + for (size_t i = 0; i < DynArr_size(execArgs); i++) { + char** argPtr = (char**)DynArr_at(execArgs, i); + if (argPtr && *argPtr) { + free(*argPtr); + } + } + + DynArr_destroy(execArgs); + free(argv); + + break; + default: + printf("Received unknown packet type %u from server\n", type); + break; + } +} + +void* TCPRoutine(void* arg) { + while (1) { + // Check for incoming data on recv + memset(dataBuf, 0, sizeof(dataBuf)); + ssize_t n = recv(sock, dataBuf, sizeof(dataBuf), 0); + if (n == 0) { + printf("Server disconnected\n"); + break; + } else if (n > 0) { + on_data(n); + } + + // Sleep 10 millis + usleep(10000); + } + + return NULL; +} int main(void) { - printf("Hello, World!\n"); + srand(time(NULL)); + // Temporary; move to TcpClient (non-server) impl. later + struct sockaddr_in serverAddr; + memset(&serverAddr, 0, sizeof(serverAddr)); + serverAddr.sin_family = AF_INET; + serverAddr.sin_port = htons(6175); + serverAddr.sin_addr.s_addr = inet_addr("127.0.0.1"); + sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) { + perror("Socket creation failed"); + return 1; + } + + if (connect(sock, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) < 0) { + perror("Connection to server failed"); + close(sock); + return 1; + } + + capabilities = DYNARR_CREATE(capability_t, 4); + // Example capabilitys + capability_t cap1 = { "test_computation", 1 }; + DynArr_push_back(capabilities, &cap1); + + // Start thread + pthread_t tcpThread; + pthread_create(&tcpThread, NULL, TCPRoutine, NULL); + + while (1) { + memset(command, 0, sizeof(command)); + printf("Enter command (type 'exit' to quit): "); + if (fgets(command, sizeof(command), stdin) == NULL) { + printf("Error reading command\n"); + continue; + } + } + + pthread_cancel(tcpThread); + pthread_join(tcpThread, NULL); + DynArr_destroy(capabilities); + + close(sock); return 0; } diff --git a/src/common/dynarr.c b/src/common/dynarr.c index 8aa2378..5bfb84e 100644 --- a/src/common/dynarr.c +++ b/src/common/dynarr.c @@ -37,12 +37,12 @@ void DynArr_reserve(DynArr* p, size_t n) { p->capacity = n; } -// Push data into a new block at the end of the array +// Push data into a new block at the end of the array; If value is NULL, the new block will be zeroed. void* DynArr_push_back(DynArr* p, void* value) { - if (value == NULL) { - printf("push_back ignored; value is null"); - return NULL; - } + //if (value == NULL) { + // printf("push_back ignored; value is null"); + // return NULL; + //} if (p->size >= p->capacity) { size_t new_cap = (p->capacity == 0) ? 1 : p->capacity * 2; @@ -62,7 +62,13 @@ void* DynArr_push_back(DynArr* p, void* value) { } void* dst = (void*)((char*)p->data + (p->size * p->elemSize)); - memcpy((char*)dst, value, p->elemSize); + + if (value == NULL) { + memset(dst, 0, p->elemSize); // Handle NULL value. + } else { + memcpy((char*)dst, value, p->elemSize); + } + p->size++; return dst; diff --git a/src/common/task/task.c b/src/common/task/task.c index 47a33cb..2db1d7d 100644 --- a/src/common/task/task.c +++ b/src/common/task/task.c @@ -16,7 +16,7 @@ void Task_Create(task_t* tsk) { tsk->state = TASK_NONE; memset(tsk->binary, 0, 64); tsk->args = DYNARR_CREATE(task_arg_t, 1); - tsk->results = DYNARR_CREATE(task_arg_t, 1); + tsk->results = DYNARR_CREATE(uint8_t, 1); } void Task_DestroyArgs(task_t* task) { @@ -35,6 +35,13 @@ void Task_DestroyArgs(task_t* task) { return; } + for (size_t i = 0; i < DynArr_size(task->args); i++) { + task_arg_t* arg = (task_arg_t*)DynArr_at(task->args, i); + if (arg && arg->data) { + DynArr_destroy(arg->data); + } + } + DynArr_destroy(task->args); DynArr_destroy(task->results); } diff --git a/src/server/main.c b/src/server/main.c index fd6a189..e08c944 100644 --- a/src/server/main.c +++ b/src/server/main.c @@ -57,6 +57,15 @@ void on_data(TcpClient* client) { } } + // TODO; Move inside check, this is redundant + if (capabilitiesCount > 0) { + // Send ACKNOWLEDGE; Might embed which packet is being acknowledged later + char ackPacket[1]; + ackPacket[0] = PACKET_TYPE_ACKNOWLEDGE; + send(client->clientFd, ackPacket, 1, 0); + printf("Sent ACKNOWLEDGE to client %u\n", client->clientId); + } + break; case PACKET_TYPE_TASK_REQUEST: printf("Received TASK_REQUEST packet from client %u\n", client->clientId); @@ -74,10 +83,18 @@ void on_data(TcpClient* client) { break; } + int assigned = 0; + + printf("Task count: %d", (int)DynArr_size(taskQueue.tasks)); + // Find a task that matches capabilities and assign it - for (size_t i = 0; i < DynArr_size(&taskQueue.tasks); i++) { - task_t* task = (task_t*)DynArr_at(&taskQueue.tasks, i); - int assigned = 0; + for (size_t i = 0; i < DynArr_size(taskQueue.tasks); i++) { + //printf("Iterated!\n"); + task_t* task = (task_t*)DynArr_at(taskQueue.tasks, i); + + if (!task) { + continue; + } if (task->state == TASK_PENDING) { // For simplicity, assign the first pending task with capabilities @@ -97,7 +114,36 @@ void on_data(TcpClient* client) { DynArr_push_back(packet, &packetType); // Push back the task struct - DynArr_push_back(packet, task); + for (size_t k = 0; k < sizeof(task_t); k++) { + DynArr_push_back(packet, ((char*)task) + k); + } + + // Special sequence to indicate start of args + char sequence[] = "ARGSTART"; + for (size_t k = 0; k < sizeof(sequence); k++) { + DynArr_push_back(packet, &sequence[k]); + } + + // Push back args + for (size_t k = 0; k < DynArr_size(task->args); k++) { + task_arg_t* arg = (task_arg_t*)DynArr_at(task->args, k); + for (size_t m = 0; m < sizeof(task_arg_t); m++) { + DynArr_push_back(packet, ((char*)arg) + m); + } + // Push back arg data + for (size_t m = 0; m < DynArr_size(arg->data); m++) { + char* dataPtr = (char*)DynArr_at(arg->data, m); + for (size_t n = 0; n < sizeof(char); n++) { + DynArr_push_back(packet, dataPtr + n); + } + } + + // Push back next arg indicator + char nextArgIndicator[] = "ARGNEXT"; + for (size_t n = 0; n < sizeof(nextArgIndicator); n++) { + DynArr_push_back(packet, &nextArgIndicator[n]); + } + } send(client->clientFd, packet->data, DynArr_size(packet) * sizeof(uint8_t), 0); printf("Assigned task %u to client %u\n", task->taskId, client->clientId); @@ -131,6 +177,7 @@ void on_disconnect(TcpClient* client) { } int main(void) { + srand(time(NULL)); signal(SIGINT, signalHandler); TaskQueue_Init(&taskQueue); @@ -139,8 +186,15 @@ int main(void) { task_t task1; Task_Create(&task1); task1.state = TASK_PENDING; - + task1.args = DYNARR_CREATE(task_arg_t, 1); + strcpy(task1.binary, "test_computation"); + task_arg_t* blk = DynArr_push_back(task1.args, &(task_arg_t){ .name = "input_data", .data = DYNARR_CREATE(char, 1) }); + DynArr_push_back(blk->data, "12"); // Compute the square of 12 + task1.taskId = 1; + TaskQueue_AddTask(&taskQueue, &task1); + //Task_DestroyArgs(&task1); + printf("Task added!\n"); TcpServer* svr = TcpServer_Create(); diff --git a/src/server/task/taskqueue.c b/src/server/task/taskqueue.c index 0884591..d2107e1 100644 --- a/src/server/task/taskqueue.c +++ b/src/server/task/taskqueue.c @@ -63,7 +63,7 @@ void TaskQueue_RemoveTask(task_queue_t* queue, size_t idx) { task_t* ptr = DynArr_at(queue->tasks, idx); Task_DestroyArgs(ptr); - DynArr_remove(queue->tasks, n); + DynArr_remove(queue->tasks, idx); } void TaskQueue_RemoveTaskByPtr(task_queue_t* queue, task_t* task) { diff --git a/test_computation b/test_computation new file mode 100755 index 0000000000000000000000000000000000000000..2b10b15a1bd19594eb4455b5790a42256bbe9730 GIT binary patch literal 33648 zcmeI5ZHQE56vv-Cv)ZnjJL$4kYwjpitBKkRSYa`n4l0?hy5-V`9{1($dTDoNow=i# zmRH2cO4^Zyj1X4BS|RC$qU}RuELlRJI1(?LCc{zX{N~QqqRe zDJm=?P^~+XfyZ#&c=IsVm6*5Q_taPzqItQx&z3D+{$za7()=EAO7em)(k;nABp=bd zJfxH(`N>n3Iv@1?J@6$LVRJil2!1(QXPZ(DXvagP4T{Va4dxjf7rO!L^UppwHW5w~ zDr0jq>D1g#t7UH6rp`9_A~jISWB<6%r9qYbYOr>oZ+q>)zHsem|8_l4 zUZK`IhH6BZs{XJZEjMDhGhvmjGgJ-%dz4e1#qXrIPC2XkvG!hno*EO&q3+(k!D$2i zgU<~Vtr@D$&cm4WAApv>R8cchqrPzC{aKyGy?Ulkosyk~Im6>sZe)BcS8k}!BnNj+ z-IOe9EZlP(`cGlbaI4C0Q_k@j&`IvvSPo;oQj@Ih8`J}rHZMGYdb5YZ`X}Vj6A8s~ zDpc#)V*~cMfITr_PYT%8p8a&Xxx;)kdUrayAd%^6F;kC^Q;U-}dfHSnV|QolIW1N~ zZER8TR4SQ@CcDi9{(YkDR;MWs1-YrIVnBke>D!Q6+`t_~KosgeiX+*?y>!?OsiGt*;>-<^RXbUCI@a7eKIbS_E zet(JoNQqxx;>-6F