#include #include #include #include #include #include #include #include #include #include volatile int running = 1; task_queue_t taskQueue; TcpServer* svr; void signalHandler(int sig) { printf("Caught signal %d, shutting down...\n", sig); running = 0; } void on_connect(TcpClient* client) { char helloPacket[SERVER_HELLO_PACKET_SIZE + 1]; // +1 for packet type helloPacket[0] = PACKET_TYPE_HELLO; helloPacket[1] = SERVER_INFO_VERSION; memcpy(&helloPacket[2], SERVER_INFO_IDENTIFIER, 16); TcpServer_Send(svr, client, helloPacket, SERVER_HELLO_PACKET_SIZE + 1); //send(client->clientFd, helloPacket, SERVER_HELLO_PACKET_SIZE + 1, 0); printf("Client connected: %u\n", client->clientId); } void on_data(TcpClient* client) { uint8_t packetType; ssize_t bytesReceived = client->dataBufLen; if (bytesReceived == 0) { return; } else if (bytesReceived < 0) { perror("Error receiving data from client\n"); return; } packetType = client->dataBuf[0]; switch (packetType) { case PACKET_TYPE_HELLO: // Unexpected, ignore break; case PACKET_TYPE_CLIENT_CAPABILITIES: printf("Received CLIENT_CAPABILITIES packet from client %u\n", client->clientId); // Decode capabilities here char* ptr = (char*)&client->dataBuf[1]; // Skip packet type byte size_t capabilitiesCount = (bytesReceived - 1) / sizeof(capability_t); for (size_t i = 0; i < capabilitiesCount; i++) { capability_t cap; memcpy(&cap, ptr + i * sizeof(capability_t), sizeof(capability_t)); printf("Capability: %s, Supported: %s\n", cap.name, cap.supported ? "Yes" : "No"); if (client->capabilities) { DynArr_push_back(client->capabilities, &cap); } } // TODO; Move inside check, this is redundant if (capabilitiesCount > 0) { // Send ACKNOWLEDGE; Might embed which packet is being acknowledged later char ackPacket[1]; ackPacket[0] = PACKET_TYPE_ACKNOWLEDGE; TcpServer_Send(svr, client, ackPacket, 1); //send(client->clientFd, ackPacket, 1, 0); printf("Sent ACKNOWLEDGE to client %u\n", client->clientId); } break; case PACKET_TYPE_TASK_REQUEST: printf("Received TASK_REQUEST packet from client %u\n", client->clientId); if (client->assignedTaskNumber != -1) { // Client already has a task assigned printf("Client %u already has a task assigned (%ld), ignoring request\n", client->clientId, client->assignedTaskNumber); break; } if (client->capabilities) { size_t capCount = DynArr_size(client->capabilities); if (capCount <= 0) { // Request capabilities first char missingInfoPacket[2]; missingInfoPacket[0] = PACKET_TYPE_MISSING_INFO; missingInfoPacket[1] = PACKET_TYPE_CLIENT_CAPABILITIES; TcpServer_Send(svr, client, missingInfoPacket, 2); printf("Requested capabilities from client %u\n", client->clientId); break; } int assigned = 0; printf("Task count: %d", (int)DynArr_size(taskQueue.tasks)); // Find a task that matches capabilities and assign it for (size_t i = 0; i < DynArr_size(taskQueue.tasks); i++) { //printf("Iterated!\n"); task_t* task = (task_t*)DynArr_at(taskQueue.tasks, i); if (!task) { continue; } if (task->state == TASK_PENDING) { // For simplicity, assign the first pending task with capabilities // The capabilites (for now) are the required binary name; we're just gonna strcmp it for now for (size_t j = 0; j < capCount; j++) { capability_t* cap = (capability_t*)DynArr_at(client->capabilities, j); if (strcmp(cap->name, task->binary) == 0 && cap->supported) { // Assign task to client assigned = 1; task->state = TASK_ASSIGNED; task->assigned_to = client->clientId; task->assigned_at = time(NULL); // Make packet DynArr* packet = DYNARR_CREATE(uint8_t, 1); uint8_t packetType = PACKET_TYPE_TASK_ASSIGN; DynArr_push_back(packet, &packetType); uint32_t dataSize = sizeof(uint32_t) + 64 + DynArr_size(task->args); // Push data size for (size_t k = 0; k < sizeof(uint32_t); k++) { DynArr_push_back(packet, &((uint8_t*)&dataSize)[k]); } // Task ID for (size_t k = 0; k < sizeof(uint32_t); k++) { DynArr_push_back(packet, &((uint8_t*)&task->taskId)[k]); } // Binary name for (size_t k = 0; k < 64; k++) { DynArr_push_back(packet, &task->binary[k]); // char is uint8_t compatible } // Args uint32_t argsSize = (uint32_t)DynArr_size(task->args); for (size_t k = 0; k < argsSize; k++) { uint8_t* argByte = (uint8_t*)DynArr_at(task->args, k); DynArr_push_back(packet, argByte); } TcpServer_Send(svr, client, packet->data, DynArr_size(packet) * sizeof(uint8_t)); client->assignedTaskNumber = task->taskId; DynArr_destroy(packet); printf("Assigned task %u to client %u\n", task->taskId, client->clientId); break; } } if (assigned) { break; } } } if (!assigned) { // No task available char noneAvailablePacket[1]; noneAvailablePacket[0] = PACKET_TYPE_TASK_NONE_AVAILABLE; TcpServer_Send(svr, client, noneAvailablePacket, 1); printf("No available tasks for client %u\n", client->clientId); } break; } break; case PACKET_TYPE_TASK_REJECT: printf("Received TASK_REJECT packet from client %u\n", client->clientId); if (client->assignedTaskNumber == -1) { // No task assigned printf("Client %u has no task assigned, ignoring reject\n", client->clientId); break; } else { // Mark task as pending again for (size_t i = 0; i < DynArr_size(taskQueue.tasks); i++) { task_t* task = (task_t*)DynArr_at(taskQueue.tasks, i); if (task && task->taskId == client->assignedTaskNumber) { task->state = TASK_PENDING; task->assigned_to = 0; task->assigned_at = 0; printf("Marked task %u as pending again\n", task->taskId); break; } } client->assignedTaskNumber = -1; // Clear assigned task } break; case PACKET_TYPE_TASK_RESULT: printf("Received TASK_RESULT packet from client %u\n", client->clientId); if (client->assignedTaskNumber == -1) { // No task assigned printf("Client %u has no task assigned, ignoring result\n", client->clientId); break; } else { // Mark task as done; TODO: Store results for (size_t i = 0; i < DynArr_size(taskQueue.tasks); i++) { task_t* task = (task_t*)DynArr_at(taskQueue.tasks, i); if (task && task->taskId == client->assignedTaskNumber) { task->state = TASK_DONE; printf("Marked task %u as done\n", task->taskId); // Temporary: log as int //if (bytesReceived - 1 >= sizeof(int)) { // int result; // memcpy(&result, &client->dataBuf[1 + sizeof(uint32_t)], sizeof(int)); // printf("Task %u result: %d\n", task->taskId, result); //} uint32_t resultDataSize; memcpy(&resultDataSize, &client->dataBuf[1], sizeof(uint32_t)); printf("Task %u result data size: %u bytes\n", task->taskId, resultDataSize); // Store results task->assigned_to = 0; task->assigned_at = 0; task->results = DYNARR_CREATE(uint8_t, 1); // Store result data for (size_t j = 1; j < resultDataSize; j++) { uint8_t byte = client->dataBuf[j + 1 + sizeof(uint32_t)]; // +1 for packet type, +4 for size DynArr_push_back(task->results, &byte); } // TODO: Optionally dump results to file or process them here // Note: This assumes that the results are trustworthy, which might need to be verified via multiple clients break; } } client->assignedTaskNumber = -1; // Clear assigned task } break; default: printf("Received unknown packet type %u from client %u\n", packetType, client->clientId); break; } } void on_disconnect(TcpClient* client) { printf("Client disconnected: %u\n", client->clientId); // TODO; Check if tasks were assigned to this client and handle them } int main(void) { srand(time(NULL)); signal(SIGINT, signalHandler); TaskQueue_Init(&taskQueue); printf("Task Queue Created!\n"); //task_t task1; //Task_Create(&task1); //task1.state = TASK_PENDING; //task1.args = DYNARR_CREATE(uint8_t, 1); //int argVal = 10000019; // This should stay in the stack due to the loop below (I guess we'll see soon enough) //strcpy(task1.binary, "test_computation"); //for (size_t i = 0; i < sizeof(argVal); i++) { // uint8_t b = ((uint8_t*)&argVal)[i]; // Byte hacking to get each byte of the int // DynArr_push_back(task1.args, &b); //} //task1.taskId = 1; // //TaskQueue_AddTask(&taskQueue, &task1); // Create some dummy tasks for (int i = 0; i < 5; i++) { task_t task; Task_Create(&task); task.state = TASK_PENDING; task.args = DYNARR_CREATE(uint8_t, 1); int argVal = rand(); // Different arg for each task snprintf(task.binary, sizeof(task.binary), "test_computation"); for (size_t j = 0; j < sizeof(argVal); j++) { uint8_t b = ((uint8_t*)&argVal)[j]; // Byte hacking to get each byte of the int DynArr_push_back(task.args, &b); } TaskQueue_AddTask(&taskQueue, &task); // AddTask copies to the dynamic array, so we can free the stack task safely } //Task_DestroyArgs(&task1); printf("Task added!\n"); svr = TcpServer_Create(); TcpServer_Init(svr, 6175, "0.0.0.0"); svr->on_connect = on_connect; svr->on_data = on_data; svr->on_disconnect = on_disconnect; TcpServer_Start(svr, 10); while (running) { // Main Loop sleep(1); } TcpServer_Stop(svr); TcpServer_Destroy(svr); TaskQueue_RemoveTask(&taskQueue, 0); TaskQueue_DestroyQueue(&taskQueue); printf("Destroyed!\n"); return 0; }