test data sending

This commit is contained in:
2026-01-15 19:56:18 +01:00
parent e68ae73a87
commit 8ed280126c
7 changed files with 371 additions and 15 deletions

View File

@@ -9,7 +9,9 @@ typedef enum {
PACKET_TYPE_STATUS_UPDATE = 4, PACKET_TYPE_STATUS_UPDATE = 4,
PACKET_TYPE_CLIENT_CAPABILITIES = 5, PACKET_TYPE_CLIENT_CAPABILITIES = 5,
PACKET_TYPE_TASK_REQUEST = 6, PACKET_TYPE_TASK_REQUEST = 6,
PACKET_TYPE_MISSING_INFO = 7 PACKET_TYPE_MISSING_INFO = 7,
PACKET_TYPE_ACKNOWLEDGE = 8,
PACKET_TYPE_TASK_REJECT = 9
} PacketType; } PacketType;
#endif #endif

View File

@@ -1,6 +1,293 @@
#include <stdio.h> #include <stdio.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <unistd.h>
#include <common/packettype.h>
#include <stdlib.h>
#include <time.h>
#include <common/capability.h>
#include <common/task/task.h>
#include <common/dynarr.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/wait.h>
char dataBuf[1500];
DynArr* capabilities;
int sock;
char command[256];
char LAST_PACKET_SENT = PACKET_TYPE_NONE;
void on_data(ssize_t dataBufLen) {
char type = dataBuf[0];
switch (type) {
case PACKET_TYPE_HELLO:
printf("Received HELLO packet from server\n");
// Respond with capabilities
if (capabilities) {
DynArr* packet = DYNARR_CREATE(uint8_t, 1);
uint8_t packetType = PACKET_TYPE_CLIENT_CAPABILITIES;
DynArr_push_back(packet, &packetType);
for (size_t i = 0; i < DynArr_size(capabilities); i++) {
capability_t* cap = (capability_t*)DynArr_at(capabilities, i);
char* capPtr = (char*)cap;
for (size_t j = 0; j < sizeof(capability_t); j++) {
DynArr_push_back(packet, &capPtr[j]);
}
}
// Send
send(sock, packet->data, DynArr_size(packet) * sizeof(uint8_t), 0);
DynArr_destroy(packet);
printf("Sent capabilities to server\n");
LAST_PACKET_SENT = PACKET_TYPE_CLIENT_CAPABILITIES;
}
break;
case PACKET_TYPE_ACKNOWLEDGE: {
switch (LAST_PACKET_SENT) {
case PACKET_TYPE_CLIENT_CAPABILITIES:
printf("Server acknowledged CLIENT_CAPABILITIES packet\n");
// Get task
// For simplicity, just request a task immediately
char taskRequestPacket[1];
taskRequestPacket[0] = PACKET_TYPE_TASK_REQUEST;
send(sock, taskRequestPacket, 1, 0);
printf("Requested task from server\n");
LAST_PACKET_SENT = PACKET_TYPE_TASK_REQUEST;
break;
default:
printf("Server acknowledged unknown packet type %u\n", LAST_PACKET_SENT);
break;
}
break;
}
case PACKET_TYPE_TASK_ASSIGN:
task_t task;
memcpy(&task, &dataBuf[1], sizeof(task_t));
printf("Received TASK_ASSIGN packet from server for task %u\n", task.taskId);
DynArr* execArgs = DYNARR_CREATE(char*, 4);
// RESULT report is ACKNOWLEDGE for now
// Fork with binary name
pid_t pid = fork();
if (pid < 0) {
perror("Fork failed");
// TODO; Send TASK_REJECT packet to server to inform that task could not be processed
return;
}
printf("Data Size: %zu\n", dataBufLen);
char** argv = NULL;
if (pid == 0) {
// Child with args
// Jump to special sequence to find args "ARGSTART"
size_t offset = 1 + sizeof(task_t);
while (offset < dataBufLen - 8) {
if (memcmp(&dataBuf[offset], "ARGSTART", 8) == 0) {
offset += 8;
break;
}
offset++;
}
printf("Args Offset: %zu\n", offset);
printf("Args length: %zu\n", dataBufLen - offset);
// Parse args
DynArr_push_back(execArgs, task.binary); // First arg is binary name
while (offset < dataBufLen) {
//if (memcmp(&dataBuf[offset], "ARGNEXT", 7) == 0) {
// offset += 7;
// continue;
//}
// Read raw arg as 32 bytes (name)
//char argName[32];
//memcpy(argName, &dataBuf[offset], 32);
//offset += 32;
//printf("Arg Name: %s\n", argName);
// Ignore name for now; just pass raw data
offset += 32;
// Find position of next ARGNEXT or end
size_t argDataStart = offset;
size_t argDataEnd = dataBufLen;;
for (size_t i = offset; i < dataBufLen - 7; i++) {
if (memcmp(&dataBuf[i], "ARGNEXT", 7) == 0) {
argDataEnd = i;
break;
}
}
if (argDataEnd <= argDataStart) {
printf("No more arg data found\n");
break;
}
printf("Arg Data Start: %zu\n", argDataStart);
printf("Arg Data End: %zu\n", argDataEnd);
size_t argDataLen = argDataEnd - argDataStart;
printf("Arg Data Length: %zu\n", argDataLen);
char* argData = malloc(argDataLen);
memcpy(argData, &dataBuf[argDataStart], argDataLen); // No null-termination; this is raw data
offset += argDataLen;
}
DynArr_push_back(execArgs, NULL); // Null-terminate args array
// Convert into char**
argv = malloc(DynArr_size(execArgs) * sizeof(char*) + 1);
for (size_t i = 0; i < DynArr_size(execArgs); i++) {
char** argPtr = (char**)DynArr_at(execArgs, i);
argv[i] = *argPtr;
}
argv[DynArr_size(execArgs)] = NULL; // Null-terminate
printf("Executing task %u: %s\n", task.taskId, task.binary);
char execName[66];
snprintf(execName, sizeof(execName), "./%s", task.binary); // TODO; Improve
// Exec
execvp(execName, (char* const*)argv);
// If execvp returns, there was an error
perror("execvp failed");
exit(1); // Crash child
// SEND result packet later
} else {
int status;
waitpid(pid, &status, 0);
if (WIFEXITED(status)) {
int exitStatus = WEXITSTATUS(status);
printf("Task %u completed with exit status %d\n", task.taskId, exitStatus);
if (exitStatus == 0) {
// Send TASK_RESULT packet
DynArr* packet = DYNARR_CREATE(uint8_t, 1);
uint8_t packetType = PACKET_TYPE_TASK_RESULT;
DynArr_push_back(packet, &packetType);
// Read from ./output.bin
FILE* outFile = fopen("output.bin", "rb");
if (!outFile) {
perror("Failed to open output.bin");
}
int byte;
while ((byte = fgetc(outFile)) != EOF) {
uint8_t b = (uint8_t)byte;
DynArr_push_back(packet, &b);
}
fclose(outFile);
// Send
send(sock, packet->data, DynArr_size(packet) * sizeof(uint8_t), 0);
printf("Sent TASK_RESULT packet for task %u to server\n", task.taskId);
DynArr_destroy(packet);
} else {
// TODO; Send TASK_REJECT packet to server to inform that task failed
break;
}
} else {
printf("Task %u did not complete successfully\n", task.taskId);
}
}
for (size_t i = 0; i < DynArr_size(execArgs); i++) {
char** argPtr = (char**)DynArr_at(execArgs, i);
if (argPtr && *argPtr) {
free(*argPtr);
}
}
DynArr_destroy(execArgs);
free(argv);
break;
default:
printf("Received unknown packet type %u from server\n", type);
break;
}
}
void* TCPRoutine(void* arg) {
while (1) {
// Check for incoming data on recv
memset(dataBuf, 0, sizeof(dataBuf));
ssize_t n = recv(sock, dataBuf, sizeof(dataBuf), 0);
if (n == 0) {
printf("Server disconnected\n");
break;
} else if (n > 0) {
on_data(n);
}
// Sleep 10 millis
usleep(10000);
}
return NULL;
}
int main(void) { int main(void) {
printf("Hello, World!\n"); srand(time(NULL));
// Temporary; move to TcpClient (non-server) impl. later
struct sockaddr_in serverAddr;
memset(&serverAddr, 0, sizeof(serverAddr));
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(6175);
serverAddr.sin_addr.s_addr = inet_addr("127.0.0.1");
sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0) {
perror("Socket creation failed");
return 1;
}
if (connect(sock, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) < 0) {
perror("Connection to server failed");
close(sock);
return 1;
}
capabilities = DYNARR_CREATE(capability_t, 4);
// Example capabilitys
capability_t cap1 = { "test_computation", 1 };
DynArr_push_back(capabilities, &cap1);
// Start thread
pthread_t tcpThread;
pthread_create(&tcpThread, NULL, TCPRoutine, NULL);
while (1) {
memset(command, 0, sizeof(command));
printf("Enter command (type 'exit' to quit): ");
if (fgets(command, sizeof(command), stdin) == NULL) {
printf("Error reading command\n");
continue;
}
}
pthread_cancel(tcpThread);
pthread_join(tcpThread, NULL);
DynArr_destroy(capabilities);
close(sock);
return 0; return 0;
} }

View File

@@ -37,12 +37,12 @@ void DynArr_reserve(DynArr* p, size_t n) {
p->capacity = n; p->capacity = n;
} }
// Push data into a new block at the end of the array // Push data into a new block at the end of the array; If value is NULL, the new block will be zeroed.
void* DynArr_push_back(DynArr* p, void* value) { void* DynArr_push_back(DynArr* p, void* value) {
if (value == NULL) { //if (value == NULL) {
printf("push_back ignored; value is null"); // printf("push_back ignored; value is null");
return NULL; // return NULL;
} //}
if (p->size >= p->capacity) { if (p->size >= p->capacity) {
size_t new_cap = (p->capacity == 0) ? 1 : p->capacity * 2; size_t new_cap = (p->capacity == 0) ? 1 : p->capacity * 2;
@@ -62,7 +62,13 @@ void* DynArr_push_back(DynArr* p, void* value) {
} }
void* dst = (void*)((char*)p->data + (p->size * p->elemSize)); void* dst = (void*)((char*)p->data + (p->size * p->elemSize));
if (value == NULL) {
memset(dst, 0, p->elemSize); // Handle NULL value.
} else {
memcpy((char*)dst, value, p->elemSize); memcpy((char*)dst, value, p->elemSize);
}
p->size++; p->size++;
return dst; return dst;

View File

@@ -16,7 +16,7 @@ void Task_Create(task_t* tsk) {
tsk->state = TASK_NONE; tsk->state = TASK_NONE;
memset(tsk->binary, 0, 64); memset(tsk->binary, 0, 64);
tsk->args = DYNARR_CREATE(task_arg_t, 1); tsk->args = DYNARR_CREATE(task_arg_t, 1);
tsk->results = DYNARR_CREATE(task_arg_t, 1); tsk->results = DYNARR_CREATE(uint8_t, 1);
} }
void Task_DestroyArgs(task_t* task) { void Task_DestroyArgs(task_t* task) {
@@ -35,6 +35,13 @@ void Task_DestroyArgs(task_t* task) {
return; return;
} }
for (size_t i = 0; i < DynArr_size(task->args); i++) {
task_arg_t* arg = (task_arg_t*)DynArr_at(task->args, i);
if (arg && arg->data) {
DynArr_destroy(arg->data);
}
}
DynArr_destroy(task->args); DynArr_destroy(task->args);
DynArr_destroy(task->results); DynArr_destroy(task->results);
} }

View File

@@ -57,6 +57,15 @@ void on_data(TcpClient* client) {
} }
} }
// TODO; Move inside check, this is redundant
if (capabilitiesCount > 0) {
// Send ACKNOWLEDGE; Might embed which packet is being acknowledged later
char ackPacket[1];
ackPacket[0] = PACKET_TYPE_ACKNOWLEDGE;
send(client->clientFd, ackPacket, 1, 0);
printf("Sent ACKNOWLEDGE to client %u\n", client->clientId);
}
break; break;
case PACKET_TYPE_TASK_REQUEST: case PACKET_TYPE_TASK_REQUEST:
printf("Received TASK_REQUEST packet from client %u\n", client->clientId); printf("Received TASK_REQUEST packet from client %u\n", client->clientId);
@@ -74,11 +83,19 @@ void on_data(TcpClient* client) {
break; break;
} }
// Find a task that matches capabilities and assign it
for (size_t i = 0; i < DynArr_size(&taskQueue.tasks); i++) {
task_t* task = (task_t*)DynArr_at(&taskQueue.tasks, i);
int assigned = 0; int assigned = 0;
printf("Task count: %d", (int)DynArr_size(taskQueue.tasks));
// Find a task that matches capabilities and assign it
for (size_t i = 0; i < DynArr_size(taskQueue.tasks); i++) {
//printf("Iterated!\n");
task_t* task = (task_t*)DynArr_at(taskQueue.tasks, i);
if (!task) {
continue;
}
if (task->state == TASK_PENDING) { if (task->state == TASK_PENDING) {
// For simplicity, assign the first pending task with capabilities // For simplicity, assign the first pending task with capabilities
// The capabilites (for now) are the required binary name; we're just gonna strcmp it for now // The capabilites (for now) are the required binary name; we're just gonna strcmp it for now
@@ -97,7 +114,36 @@ void on_data(TcpClient* client) {
DynArr_push_back(packet, &packetType); DynArr_push_back(packet, &packetType);
// Push back the task struct // Push back the task struct
DynArr_push_back(packet, task); for (size_t k = 0; k < sizeof(task_t); k++) {
DynArr_push_back(packet, ((char*)task) + k);
}
// Special sequence to indicate start of args
char sequence[] = "ARGSTART";
for (size_t k = 0; k < sizeof(sequence); k++) {
DynArr_push_back(packet, &sequence[k]);
}
// Push back args
for (size_t k = 0; k < DynArr_size(task->args); k++) {
task_arg_t* arg = (task_arg_t*)DynArr_at(task->args, k);
for (size_t m = 0; m < sizeof(task_arg_t); m++) {
DynArr_push_back(packet, ((char*)arg) + m);
}
// Push back arg data
for (size_t m = 0; m < DynArr_size(arg->data); m++) {
char* dataPtr = (char*)DynArr_at(arg->data, m);
for (size_t n = 0; n < sizeof(char); n++) {
DynArr_push_back(packet, dataPtr + n);
}
}
// Push back next arg indicator
char nextArgIndicator[] = "ARGNEXT";
for (size_t n = 0; n < sizeof(nextArgIndicator); n++) {
DynArr_push_back(packet, &nextArgIndicator[n]);
}
}
send(client->clientFd, packet->data, DynArr_size(packet) * sizeof(uint8_t), 0); send(client->clientFd, packet->data, DynArr_size(packet) * sizeof(uint8_t), 0);
printf("Assigned task %u to client %u\n", task->taskId, client->clientId); printf("Assigned task %u to client %u\n", task->taskId, client->clientId);
@@ -131,6 +177,7 @@ void on_disconnect(TcpClient* client) {
} }
int main(void) { int main(void) {
srand(time(NULL));
signal(SIGINT, signalHandler); signal(SIGINT, signalHandler);
TaskQueue_Init(&taskQueue); TaskQueue_Init(&taskQueue);
@@ -139,8 +186,15 @@ int main(void) {
task_t task1; task_t task1;
Task_Create(&task1); Task_Create(&task1);
task1.state = TASK_PENDING; task1.state = TASK_PENDING;
task1.args = DYNARR_CREATE(task_arg_t, 1);
strcpy(task1.binary, "test_computation");
task_arg_t* blk = DynArr_push_back(task1.args, &(task_arg_t){ .name = "input_data", .data = DYNARR_CREATE(char, 1) });
DynArr_push_back(blk->data, "12"); // Compute the square of 12
task1.taskId = 1;
TaskQueue_AddTask(&taskQueue, &task1); TaskQueue_AddTask(&taskQueue, &task1);
//Task_DestroyArgs(&task1);
printf("Task added!\n"); printf("Task added!\n");
TcpServer* svr = TcpServer_Create(); TcpServer* svr = TcpServer_Create();

View File

@@ -63,7 +63,7 @@ void TaskQueue_RemoveTask(task_queue_t* queue, size_t idx) {
task_t* ptr = DynArr_at(queue->tasks, idx); task_t* ptr = DynArr_at(queue->tasks, idx);
Task_DestroyArgs(ptr); Task_DestroyArgs(ptr);
DynArr_remove(queue->tasks, n); DynArr_remove(queue->tasks, idx);
} }
void TaskQueue_RemoveTaskByPtr(task_queue_t* queue, task_t* task) { void TaskQueue_RemoveTaskByPtr(task_queue_t* queue, task_t* task) {

BIN
test_computation Executable file

Binary file not shown.