diff --git a/CMakeLists.txt b/CMakeLists.txt index 14746b3..d0ed92f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,6 +9,8 @@ set(CMAKE_C_STANDARD 23) set(CMAKE_C_STANDARD_REQUIRED ON) set(CMAKE_C_EXTENSIONS OFF) +find_package(Threads REQUIRED) + # --------------------------------------------------------- # Output directories # --------------------------------------------------------- @@ -26,12 +28,12 @@ endforeach() # Common file(GLOB_RECURSE COMMON_SRC CONFIGURE_DEPENDS src/common/*.c) add_library(common STATIC ${COMMON_SRC}) -target_link_libraries(common PUBLIC) +target_link_libraries(common PUBLIC ${CMAKE_THREAD_LIBS_INIT}) target_compile_options(common PRIVATE -Wall -Wextra -Wpedantic - -lpthread + -g ) target_include_directories(common PUBLIC ${PROJECT_SOURCE_DIR}/include @@ -40,7 +42,7 @@ target_include_directories(common PUBLIC # Client file(GLOB_RECURSE CLIENT_SRC CONFIGURE_DEPENDS src/client/*.c) add_executable(client ${CLIENT_SRC}) -target_link_libraries(client PRIVATE common) +target_link_libraries(client PRIVATE common ${CMAKE_THREAD_LIBS_INIT}) target_include_directories(client PRIVATE ${PROJECT_SOURCE_DIR}/include ) @@ -48,14 +50,14 @@ target_compile_options(client PRIVATE -Wall -Wextra -Wpedantic - -lpthread + -g ) set_target_properties(client PROPERTIES OUTPUT_NAME "miniboinc_client") # Server file(GLOB_RECURSE SERVER_SRC CONFIGURE_DEPENDS src/server/*.c) add_executable(server ${SERVER_SRC}) -target_link_libraries(server PRIVATE common) +target_link_libraries(server PRIVATE common ${CMAKE_THREAD_LIBS_INIT}) target_include_directories(server PRIVATE ${PROJECT_SOURCE_DIR}/include ) @@ -63,7 +65,6 @@ target_compile_options(server PRIVATE -Wall -Wextra -Wpedantic - -lpthread -g ) target_compile_definitions(server PRIVATE) diff --git a/include/common/capability.h b/include/common/capability.h new file mode 100644 index 0000000..82201f4 --- /dev/null +++ b/include/common/capability.h @@ -0,0 +1,12 @@ +#ifndef CAPABILITY_H +#define CAPABILITY_H + +#include +#include + +typedef struct { + char name[32]; + bool supported; +} capability_t; + +#endif diff --git a/include/common/dynarr.h b/include/common/dynarr.h index 098b87d..fa818f1 100644 --- a/include/common/dynarr.h +++ b/include/common/dynarr.h @@ -54,4 +54,4 @@ void DynArr_destroy(DynArr* p); #define DYNARR_CREATE(T, initialCapacity) DynArr_create(sizeof(T), initialCapacity) -#endif \ No newline at end of file +#endif diff --git a/include/common/dynset.h b/include/common/dynset.h new file mode 100644 index 0000000..3efe4a6 --- /dev/null +++ b/include/common/dynset.h @@ -0,0 +1,20 @@ +#ifndef DYNSET_H +#define DYNSET_H + +#include + +// Dynamic Set structure - basically DynArr with uniqueness enforced +typedef struct { + DynArr* arr; +} DynSet; + +// Function prototypes +DynSet* DynSet_Create(size_t elemSize); +void DynSet_Destroy(DynSet* set); +int DynSet_Insert(DynSet* set, const void* element); +int DynSet_Contains(DynSet* set, const void* element); +size_t DynSet_Size(DynSet* set); +void* DynSet_Get(DynSet* set, size_t index); +void DynSet_Remove(DynSet* set, const void* element); + +#endif diff --git a/include/common/numgen.h b/include/common/numgen.h index 3e74955..ad3207e 100644 --- a/include/common/numgen.h +++ b/include/common/numgen.h @@ -7,6 +7,8 @@ #include unsigned char random_byte(void); +uint16_t random_two_byte(void); uint32_t random_four_byte(void); +uint64_t random_eight_byte(void); -#endif \ No newline at end of file +#endif diff --git a/include/common/packettype.h b/include/common/packettype.h new file mode 100644 index 0000000..fa5a467 --- /dev/null +++ b/include/common/packettype.h @@ -0,0 +1,15 @@ +#ifndef PACKETTYPE_H +#define PACKETTYPE_H + +typedef enum { + PACKET_TYPE_NONE = 0, + PACKET_TYPE_HELLO = 1, + PACKET_TYPE_TASK_ASSIGN = 2, + PACKET_TYPE_TASK_RESULT = 3, + PACKET_TYPE_STATUS_UPDATE = 4, + PACKET_TYPE_CLIENT_CAPABILITIES = 5, + PACKET_TYPE_TASK_REQUEST = 6, + PACKET_TYPE_MISSING_INFO = 7 +} PacketType; + +#endif diff --git a/include/common/task/task.h b/include/common/task/task.h index b3e55ff..060455d 100644 --- a/include/common/task/task.h +++ b/include/common/task/task.h @@ -38,9 +38,10 @@ typedef struct { task_state_t state; uint32_t assigned_to; time_t assigned_at; + DynArr* results; } task_t; void Task_Create(task_t* tsk); void Task_DestroyArgs(task_t* task); -#endif \ No newline at end of file +#endif diff --git a/include/common/tcpd/tcpclient.h b/include/common/tcpd/tcpclient.h index 7bf1331..882081c 100644 --- a/include/common/tcpd/tcpclient.h +++ b/include/common/tcpd/tcpclient.h @@ -7,6 +7,7 @@ #include #include #include +#include #define MTU 1500 @@ -15,13 +16,16 @@ struct TcpClient { struct sockaddr_in clientAddr; uint32_t clientId; - unsigned char dataBuf[MTU]; + unsigned char dataBuf[MTU]; + ssize_t dataBufLen; void (*on_data)(struct TcpClient* client); void (*on_disconnect)(struct TcpClient* client); + DynArr* capabilities; + pthread_t clientThread; }; typedef struct TcpClient TcpClient; -#endif \ No newline at end of file +#endif diff --git a/include/common/tcpd/tcpserver.h b/include/common/tcpd/tcpserver.h index a847a17..3216e16 100644 --- a/include/common/tcpd/tcpserver.h +++ b/include/common/tcpd/tcpserver.h @@ -12,6 +12,8 @@ #include #include +#include +#include typedef struct { int sockFd; @@ -51,4 +53,4 @@ void TcpServer_KillClient(TcpServer* ptr, TcpClient* cli); size_t Generic_FindClientInArrayByPtr(TcpClient** arr, TcpClient* ptr, size_t len); -#endif \ No newline at end of file +#endif diff --git a/include/server/info.h b/include/server/info.h new file mode 100644 index 0000000..0ab4170 --- /dev/null +++ b/include/server/info.h @@ -0,0 +1,10 @@ +#ifndef INFO_H +#define INFO_H + +#include + +#define SERVER_HELLO_PACKET_SIZE 17 // 1 byte version + 16 byte identifier +const uint8_t SERVER_INFO_VERSION = 1; +const char SERVER_INFO_IDENTIFIER[16] = "EXAMPLE_SERVER\0"; + +#endif diff --git a/include/server/task/taskqueue.h b/include/server/task/taskqueue.h index 1f80962..f757834 100644 --- a/include/server/task/taskqueue.h +++ b/include/server/task/taskqueue.h @@ -14,4 +14,4 @@ void TaskQueue_AddTask(task_queue_t* queue, task_t* task); void TaskQueue_RemoveTask(task_queue_t* queue, size_t idx); void TaskQueue_RemoveTaskByPtr(task_queue_t* queue, task_t* task); -#endif \ No newline at end of file +#endif diff --git a/src/client/main.c b/src/client/main.c index e1988a3..63f8805 100644 --- a/src/client/main.c +++ b/src/client/main.c @@ -3,4 +3,4 @@ int main(void) { printf("Hello, World!\n"); return 0; -} \ No newline at end of file +} diff --git a/src/common/dynarr.c b/src/common/dynarr.c index 6df3dc2..8aa2378 100644 --- a/src/common/dynarr.c +++ b/src/common/dynarr.c @@ -161,4 +161,4 @@ void DynArr_destroy(DynArr* p) { if (!p) return; free(p->data); free(p); -} \ No newline at end of file +} diff --git a/src/common/dynset.c b/src/common/dynset.c new file mode 100644 index 0000000..07952da --- /dev/null +++ b/src/common/dynset.c @@ -0,0 +1,58 @@ +#include + +DynSet* DynSet_Create(size_t elemSize) { + DynSet* set = (DynSet*)malloc(sizeof(DynSet)); + if (!set) { + return NULL; + } + set->arr = DynArr_create(elemSize, 1); + if (!set->arr) { + free(set); + return NULL; + } + return set; +} + +void DynSet_Destroy(DynSet* set) { + if (set) { + DynArr_destroy(set->arr); + free(set); + } +} + +int DynSet_Insert(DynSet* set, const void* element) { + if (DynSet_Contains(set, element)) { + return 0; // Element already exists + } + return DynArr_push_back(set->arr, element) != NULL; +} + +int DynSet_Contains(DynSet* set, const void* element) { + size_t size = DynArr_size(set->arr); + for (size_t i = 0; i < size; i++) { + void* current = DynArr_at(set->arr, i); + if (memcmp(current, element, set->arr->elemSize) == 0) { + return 1; // Found + } + } + return 0; // Not found +} + +size_t DynSet_Size(DynSet* set) { + return DynArr_size(set->arr); +} + +void* DynSet_Get(DynSet* set, size_t index) { + return DynArr_at(set->arr, index); +} + +void DynSet_Remove(DynSet* set, const void* element) { + size_t size = DynArr_size(set->arr); + for (size_t i = 0; i < size; i++) { + void* current = DynArr_at(set->arr, i); + if (memcmp(current, element, set->arr->elemSize) == 0) { + DynArr_remove(set->arr, i); + return; + } + } +} diff --git a/src/common/numgen.c b/src/common/numgen.c index a39fd2a..5f6a286 100644 --- a/src/common/numgen.c +++ b/src/common/numgen.c @@ -4,14 +4,38 @@ unsigned char random_byte(void) { return (unsigned char)(rand() % 256); } -uint32_t random_four_byte(void) { - uint32_t x; - unsigned char bytes[4]; - for (char i = 0; i < 4; i++) { +uint16_t random_two_byte(void) { + uint16_t x; + unsigned char bytes[2]; + for (unsigned char i = 0; i < 2; i++) { bytes[i] = random_byte(); } memcpy(&x, bytes, sizeof(x)); return x; -} \ No newline at end of file +} + +uint32_t random_four_byte(void) { + uint32_t x; + unsigned char bytes[4]; + for (unsigned char i = 0; i < 4; i++) { + bytes[i] = random_byte(); + } + + memcpy(&x, bytes, sizeof(x)); + + return x; +} + +uint64_t random_eight_byte(void) { + uint64_t x; + unsigned char bytes[8]; + for (unsigned char i = 0; i < 8; i++) { + bytes[i] = random_byte(); + } + + memcpy(&x, bytes, sizeof(x)); + + return x; +} diff --git a/src/common/task/task.c b/src/common/task/task.c index 34d3af2..47a33cb 100644 --- a/src/common/task/task.c +++ b/src/common/task/task.c @@ -16,6 +16,7 @@ void Task_Create(task_t* tsk) { tsk->state = TASK_NONE; memset(tsk->binary, 0, 64); tsk->args = DYNARR_CREATE(task_arg_t, 1); + tsk->results = DYNARR_CREATE(task_arg_t, 1); } void Task_DestroyArgs(task_t* task) { @@ -29,5 +30,11 @@ void Task_DestroyArgs(task_t* task) { return; } + if (!task->results) { + TASK_ERR_CODE = TASK_ERR_INVARGS; + return; + } + DynArr_destroy(task->args); -} \ No newline at end of file + DynArr_destroy(task->results); +} diff --git a/src/common/tcpd/tcpserver.c b/src/common/tcpd/tcpserver.c index 8177275..6e7a16b 100644 --- a/src/common/tcpd/tcpserver.c +++ b/src/common/tcpd/tcpserver.c @@ -25,6 +25,9 @@ void TcpServer_Destroy(TcpServer* ptr) { if (ptr->clientsArrPtr) { for (size_t i = 0; i < ptr->clients; i++) { if (ptr->clientsArrPtr[i]) { + if (ptr->clientsArrPtr[i]->capabilities) { + DynArr_destroy(ptr->clientsArrPtr[i]->capabilities); + } free(ptr->clientsArrPtr[i]); } } @@ -84,6 +87,7 @@ void* TcpServer_clientthreadprocess(void* ptr) { while (1) { memset(cli->dataBuf, 0, MTU); // Reset buffer ssize_t n = recv(cli->clientFd, cli->dataBuf, MTU, 0); + cli->dataBufLen = n; if (n == 0) { break; // Client disconnected @@ -106,6 +110,9 @@ void* TcpServer_clientthreadprocess(void* ptr) { size_t idx = Generic_FindClientInArrayByPtr(arr, cli, svr->clients); if (idx != SIZE_MAX) { if (arr[idx]) { + if (arr[idx]->capabilities) { + DynArr_destroy(arr[idx]->capabilities); + } free(arr[idx]); arr[idx] = NULL; } @@ -147,6 +154,8 @@ void* TcpServer_threadprocess(void* ptr) { heapCli->on_data = tempclient.on_data; heapCli->on_disconnect = tempclient.on_disconnect; heapCli->clientId = random_four_byte(); + heapCli->dataBufLen = 0; + heapCli->capabilities = DYNARR_CREATE(capability_t, 4); size_t i; for (i = 0; i < svr->clients; i++) { @@ -289,4 +298,4 @@ size_t Generic_FindClientInArrayByPtr(TcpClient** arr, TcpClient* ptr, size_t le } return SIZE_MAX; // Returns max unsigned, likely improbable to be correct -} \ No newline at end of file +} diff --git a/src/server/main.c b/src/server/main.c index d8194ff..fd6a189 100644 --- a/src/server/main.c +++ b/src/server/main.c @@ -5,29 +5,134 @@ #include #include #include +#include +#include +#include volatile int running = 1; +task_queue_t taskQueue; void signalHandler(int sig) { + printf("Caught signal %d, shutting down...\n", sig); running = 0; } void on_connect(TcpClient* client) { - return; + char helloPacket[SERVER_HELLO_PACKET_SIZE + 1]; // +1 for packet type + helloPacket[0] = PACKET_TYPE_HELLO; + helloPacket[1] = SERVER_INFO_VERSION; + memcpy(&helloPacket[2], SERVER_INFO_IDENTIFIER, 16); + send(client->clientFd, helloPacket, SERVER_HELLO_PACKET_SIZE + 1, 0); + + printf("Client connected: %u\n", client->clientId); } void on_data(TcpClient* client) { - return; + uint8_t packetType; + ssize_t bytesReceived = client->dataBufLen; + + if (bytesReceived == 0) { + return; + } else if (bytesReceived < 0) { + perror("Error receiving data from client\n"); + return; + } + + packetType = client->dataBuf[0]; + switch (packetType) { + case PACKET_TYPE_HELLO: + // Unexpected, ignore + break; + case PACKET_TYPE_CLIENT_CAPABILITIES: + printf("Received CLIENT_CAPABILITIES packet from client %u\n", client->clientId); + // Decode capabilities here + char* ptr = (char*)&client->dataBuf[1]; // Skip packet type byte + size_t capabilitiesCount = (bytesReceived - 1) / sizeof(capability_t); + for (size_t i = 0; i < capabilitiesCount; i++) { + capability_t cap; + memcpy(&cap, ptr + i * sizeof(capability_t), sizeof(capability_t)); + printf("Capability: %s, Supported: %s\n", cap.name, cap.supported ? "Yes" : "No"); + if (client->capabilities) { + DynArr_push_back(client->capabilities, &cap); + } + } + + break; + case PACKET_TYPE_TASK_REQUEST: + printf("Received TASK_REQUEST packet from client %u\n", client->clientId); + + if (client->capabilities) { + size_t capCount = DynArr_size(client->capabilities); + + if (capCount <= 0) { + // Request capabilities first + char missingInfoPacket[2]; + missingInfoPacket[0] = PACKET_TYPE_MISSING_INFO; + missingInfoPacket[1] = PACKET_TYPE_CLIENT_CAPABILITIES; + send(client->clientFd, missingInfoPacket, 2, 0); + printf("Requested capabilities from client %u\n", client->clientId); + break; + } + + // Find a task that matches capabilities and assign it + for (size_t i = 0; i < DynArr_size(&taskQueue.tasks); i++) { + task_t* task = (task_t*)DynArr_at(&taskQueue.tasks, i); + int assigned = 0; + + if (task->state == TASK_PENDING) { + // For simplicity, assign the first pending task with capabilities + // The capabilites (for now) are the required binary name; we're just gonna strcmp it for now + for (size_t j = 0; j < capCount; j++) { + capability_t* cap = (capability_t*)DynArr_at(client->capabilities, j); + if (strcmp(cap->name, task->binary) == 0 && cap->supported) { + // Assign task to client + assigned = 1; + task->state = TASK_ASSIGNED; + task->assigned_to = client->clientId; + task->assigned_at = time(NULL); + + // Make packet + DynArr* packet = DYNARR_CREATE(uint8_t, 1); + uint8_t packetType = PACKET_TYPE_TASK_ASSIGN; + DynArr_push_back(packet, &packetType); + + // Push back the task struct + DynArr_push_back(packet, task); + + send(client->clientFd, packet->data, DynArr_size(packet) * sizeof(uint8_t), 0); + printf("Assigned task %u to client %u\n", task->taskId, client->clientId); + + break; + } + } + + if (assigned) { + break; + } + } + } + break; + } + + break; + case PACKET_TYPE_TASK_RESULT: + printf("Received TASK_RESULT packet from client %u\n", client->clientId); + break; + default: + printf("Received unknown packet type %u from client %u\n", packetType, client->clientId); + break; + } } void on_disconnect(TcpClient* client) { - return; + printf("Client disconnected: %u\n", client->clientId); + + // TODO; Check if tasks were assigned to this client and handle them } int main(void) { signal(SIGINT, signalHandler); - task_queue_t taskQueue; TaskQueue_Init(&taskQueue); printf("Task Queue Created!\n"); @@ -59,4 +164,4 @@ int main(void) { printf("Destroyed!\n"); return 0; -} \ No newline at end of file +} diff --git a/src/server/task/taskqueue.c b/src/server/task/taskqueue.c index e5ece39..0884591 100644 --- a/src/server/task/taskqueue.c +++ b/src/server/task/taskqueue.c @@ -90,4 +90,4 @@ void TaskQueue_RemoveTaskByPtr(task_queue_t* queue, task_t* task) { } DynArr_remove(queue->tasks, at); -} \ No newline at end of file +}