From 9c99eec3a821b0671da020dee229199082c2b629 Mon Sep 17 00:00:00 2001 From: DcruBro Date: Thu, 23 Apr 2026 16:24:26 +0200 Subject: [PATCH] TCP Node boilerplate; CLI interface --- TODO.txt | 1 + include/block/chain.h | 2 +- include/nets/net_node.h | 31 +- include/packettype.h | 12 +- include/tcpd/tcpclient.h | 33 ++ include/tcpd/tcpconnection.h | 73 +++- include/tcpd/tcpserver.h | 18 +- src/block/chain.c | 55 ++- src/main.c | 778 +++++++++++++++++++++++------------ src/nets/net_node.c | 227 +++++++++- src/tcpd/tcpclient.c | 171 ++++++++ src/tcpd/tcpconnection.c | 264 ++++++++++++ src/tcpd/tcpserver.c | 548 ++++++++++++------------ 13 files changed, 1635 insertions(+), 578 deletions(-) create mode 100644 include/tcpd/tcpclient.h create mode 100644 src/tcpd/tcpclient.c create mode 100644 src/tcpd/tcpconnection.c diff --git a/TODO.txt b/TODO.txt index bcc8449..60d2ed6 100644 --- a/TODO.txt +++ b/TODO.txt @@ -6,6 +6,7 @@ I want to make an "optional privacy" system, where the TX can be public or priva I need to figure out a way to make the privacy work without a UTXO system, and instead, with a "Balance Sheet" approach. Move the Networking Code to support win32 as well, as I'm just doing POSIX right now +Maybe move the node system to an async event loop instead of spawning threads. DONE: I want to move away from the Monero emission. I want to do something a bit radical for cryptocurrency, but I feel like it's necessary to make it more like money: diff --git a/include/block/chain.h b/include/block/chain.h index 145eaaa..8680882 100644 --- a/include/block/chain.h +++ b/include/block/chain.h @@ -26,7 +26,7 @@ void Chain_Wipe(blockchain_t* chain); // I/O bool Chain_SaveToFile(blockchain_t* chain, const char* dirpath, uint256_t currentSupply, uint64_t currentReward); -bool Chain_LoadFromFile(blockchain_t* chain, const char* dirpath, uint256_t* outCurrentSupply, uint32_t* outDifficultyTarget, uint64_t* outCurrentReward, uint8_t* outLastSavedHash); +bool Chain_LoadFromFile(blockchain_t* chain, const char* dirpath, uint256_t* outCurrentSupply, uint32_t* outDifficultyTarget, uint64_t* outCurrentReward, uint8_t* outLastSavedHash, bool loadTransactions); // Difficulty uint32_t Chain_ComputeNextTarget(blockchain_t* chain, uint32_t currentTarget); diff --git a/include/nets/net_node.h b/include/nets/net_node.h index 368934e..ad1baf0 100644 --- a/include/nets/net_node.h +++ b/include/nets/net_node.h @@ -4,26 +4,47 @@ #ifndef _WIN32 // POSIX #include +#include #include #endif -#include -#include -#include -#include #include +#include + +#include typedef struct { tcp_server_t* server; - // TODO: Add the list of clients as well + tcp_client_t outboundClients[MAX_CONS]; + size_t outboundCount; + void (*on_connect)(tcp_connection_t* conn, void* user); + void (*on_data)(tcp_connection_t* conn, const unsigned char* data, size_t len, void* user); + void (*on_disconnect)(tcp_connection_t* conn, void* user); + void* callbackUser; } net_node_t; net_node_t* Node_Create(); void Node_Destroy(net_node_t* node); +void Node_SetCallbacks( + net_node_t* node, + void (*on_connect)(tcp_connection_t* conn, void* user), + void (*on_data)(tcp_connection_t* conn, const unsigned char* data, size_t len, void* user), + void (*on_disconnect)(tcp_connection_t* conn, void* user), + void* user +); + +int Node_ConnectPeer(net_node_t* node, const char* ip, unsigned short port); +int Node_ConnectStartupPeers(net_node_t* node, const char** ips, const unsigned short* ports, size_t peersCount); + +int Node_SendPacket(net_node_t* node, tcp_connection_t* conn, packet_type_t packetType, const void* payload, size_t payloadLen); + // Callback logic void Node_Server_OnConnect(tcp_connection_t* client); void Node_Server_OnData(tcp_connection_t* client); void Node_Server_OnDisconnect(tcp_connection_t* client); +void Node_Client_OnConnect(tcp_connection_t* client); +void Node_Client_OnData(tcp_connection_t* client); +void Node_Client_OnDisconnect(tcp_connection_t* client); #endif diff --git a/include/packettype.h b/include/packettype.h index f968561..95c7b4d 100644 --- a/include/packettype.h +++ b/include/packettype.h @@ -1,9 +1,17 @@ #ifndef PACKETTYPE_H #define PACKETTYPE_H +#include + typedef enum { PACKET_TYPE_NONE = 0, - // Add more here -} PacketType; + PACKET_TYPE_REQUEST = 1, + PACKET_TYPE_RESPONSE = 2, + PACKET_TYPE_MAX = 3 +} packet_type_t; + +static inline int PacketType_IsValid(uint8_t packetType) { + return packetType > PACKET_TYPE_NONE && packetType < PACKET_TYPE_MAX; +} #endif diff --git a/include/tcpd/tcpclient.h b/include/tcpd/tcpclient.h new file mode 100644 index 0000000..b25983c --- /dev/null +++ b/include/tcpd/tcpclient.h @@ -0,0 +1,33 @@ +#ifndef TCPCLIENT_H +#define TCPCLIENT_H + +#include +#include + +#include + +typedef struct { + tcp_connection_t* connection; + void (*on_connect)(tcp_connection_t* conn); + void (*on_data)(tcp_connection_t* conn); + void (*on_disconnect)(tcp_connection_t* conn); + void* owner; +} tcp_client_t; + +int TcpClient_Init(tcp_client_t* client); +void TcpClient_Destroy(tcp_client_t* client); + +int TcpClient_Connect( + tcp_client_t* client, + const char* peerIp, + unsigned short peerPort, + void (*on_connect)(tcp_connection_t* conn), + void (*on_data)(tcp_connection_t* conn), + void (*on_disconnect)(tcp_connection_t* conn), + void* owner +); + +int TcpClient_Send(tcp_client_t* client, const void* data, size_t len); +void TcpClient_Disconnect(tcp_client_t* client); + +#endif diff --git a/include/tcpd/tcpconnection.h b/include/tcpd/tcpconnection.h index 1cb72cf..9d82189 100644 --- a/include/tcpd/tcpconnection.h +++ b/include/tcpd/tcpconnection.h @@ -1,29 +1,64 @@ -#ifndef TCPCLIENT_H -#define TCPCLIENT_H +#ifndef TCPCONNECTION_H +#define TCPCONNECTION_H -#include -#include -#include #include #include +#include +#include #include -#include -#define MTU 1500 +#define TCP_IO_BUFFER_SIZE 1500 +#define TCP_FRAME_HEADER_SIZE 4U +#define TCP_MAX_FRAME_PAYLOAD (1024U * 1024U) -struct tcp_connection_t { - int clientFd; - struct sockaddr_in clientAddr; - uint32_t clientId; - - unsigned char dataBuf[MTU]; - ssize_t dataBufLen; - void (*on_data)(struct tcp_connection_t* client); - void (*on_disconnect)(struct tcp_connection_t* client); - - pthread_t clientThread; -}; +typedef enum { + TCP_CONNECTION_ROLE_INBOUND = 0, + TCP_CONNECTION_ROLE_OUTBOUND = 1 +} tcp_connection_role_t; typedef struct tcp_connection_t tcp_connection_t; +struct tcp_connection_t { + int sockFd; + struct sockaddr_in peerAddr; + uint32_t connectionId; + tcp_connection_role_t role; + + pthread_t ioThread; + pthread_mutex_t sendLock; + pthread_mutex_t stateLock; + + bool closing; + bool disconnectedNotified; + + unsigned char* dataBuf; + size_t dataBufLen; + size_t dataBufCap; + + unsigned char headerBuf[TCP_FRAME_HEADER_SIZE]; + size_t headerBytesRead; + uint32_t expectedPayloadLen; + unsigned char* frameBuf; + size_t frameBytesRead; + + void (*on_data)(tcp_connection_t* conn); + void (*on_disconnect)(tcp_connection_t* conn); + void* owner; +}; + +int TcpConnection_Init(tcp_connection_t* conn, int sockFd, const struct sockaddr_in* peerAddr, tcp_connection_role_t role); +void TcpConnection_Destroy(tcp_connection_t* conn); + +int TcpConnection_SetDataBuffer(tcp_connection_t* conn, const unsigned char* data, size_t len); + +void TcpConnection_ResetFramingState(tcp_connection_t* conn); +int TcpConnection_FeedFramedData(tcp_connection_t* conn, const unsigned char* input, size_t inputLen); + +int TcpConnection_SendRaw(int sockFd, const void* data, size_t len); +int TcpConnection_SendFramed(tcp_connection_t* conn, const void* payload, size_t payloadLen); + +void TcpConnection_RequestClose(tcp_connection_t* conn); +void TcpConnection_MarkDisconnectNotified(tcp_connection_t* conn); +bool TcpConnection_IsDisconnectNotified(tcp_connection_t* conn); + #endif diff --git a/include/tcpd/tcpserver.h b/include/tcpd/tcpserver.h index e8f9716..139e7f9 100644 --- a/include/tcpd/tcpserver.h +++ b/include/tcpd/tcpserver.h @@ -1,23 +1,18 @@ #ifndef TCPSERVER_H #define TCPSERVER_H -#include -#include -#include -#include #include -#include #include -#include +#include #include -#include -#include typedef struct { int sockFd; struct sockaddr_in addr; int opt; + int isRunning; + void* owner; // Called before the client thread runs void (*on_connect)(tcp_connection_t* client); @@ -27,8 +22,9 @@ typedef struct { void (*on_disconnect)(tcp_connection_t* client); // max clients - size_t clients; + size_t maxClients; tcp_connection_t** clientsArrPtr; + pthread_mutex_t clientsMutex; pthread_t svrThread; } tcp_server_t; @@ -46,8 +42,8 @@ void TcpServer_Destroy(tcp_server_t* ptr); void TcpServer_Init(tcp_server_t* ptr, unsigned short port, const char* addr); void TcpServer_Start(tcp_server_t* ptr, int maxcons); void TcpServer_Stop(tcp_server_t* ptr); -void TcpServer_Send(tcp_server_t* ptr, tcp_connection_t* cli, void* data, size_t len); -void Generic_SendSocket(int sock, void* data, size_t len); +int TcpServer_Send(tcp_server_t* ptr, tcp_connection_t* cli, const void* data, size_t len); +void Generic_SendSocket(int sock, const void* data, size_t len); void TcpServer_Disconnect(tcp_server_t* ptr, tcp_connection_t* cli); void TcpServer_KillClient(tcp_server_t* ptr, tcp_connection_t* cli); diff --git a/src/block/chain.c b/src/block/chain.c index d770789..e7aacf6 100644 --- a/src/block/chain.c +++ b/src/block/chain.c @@ -401,14 +401,18 @@ bool Chain_SaveToFile(blockchain_t* chain, const char* dirpath, uint256_t curren size_t newSize = DynArr_size(chain->blocks); fseek(metaFile, 0, SEEK_SET); fwrite(&newSize, sizeof(size_t), 1, metaFile); + uint32_t difficultyTarget = INITIAL_DIFFICULTY; if (newSize > 0) { block_t* lastBlock = (block_t*)DynArr_at(chain->blocks, newSize - 1); uint8_t lastHash[32]; Block_CalculateHash(lastBlock, lastHash); fwrite(lastHash, sizeof(uint8_t), 32, metaFile); + difficultyTarget = lastBlock->header.difficultyTarget; + } else { + uint8_t zeroHash[32] = {0}; + fwrite(zeroHash, sizeof(uint8_t), 32, metaFile); } fwrite(¤tSupply, sizeof(uint256_t), 1, metaFile); - uint32_t difficultyTarget = ((block_t*)DynArr_at(chain->blocks, newSize - 1))->header.difficultyTarget; fwrite(&difficultyTarget, sizeof(uint32_t), 1, metaFile); fwrite(¤tReward, sizeof(uint64_t), 1, metaFile); @@ -425,7 +429,7 @@ bool Chain_SaveToFile(blockchain_t* chain, const char* dirpath, uint256_t curren return true; } -bool Chain_LoadFromFile(blockchain_t* chain, const char* dirpath, uint256_t* outCurrentSupply, uint32_t* outDifficultyTarget, uint64_t* outCurrentReward, uint8_t* outLastSavedHash) { +bool Chain_LoadFromFile(blockchain_t* chain, const char* dirpath, uint256_t* outCurrentSupply, uint32_t* outDifficultyTarget, uint64_t* outCurrentReward, uint8_t* outLastSavedHash, bool loadTransactions) { if (!chain || !chain->blocks || !dirpath || !outCurrentSupply || !outLastSavedHash) { return false; } @@ -521,7 +525,6 @@ bool Chain_LoadFromFile(blockchain_t* chain, const char* dirpath, uint256_t* out return false; } - // Header-only load path: do not allocate per-block transaction arrays. block_t* blk = (block_t*)calloc(1, sizeof(block_t)); if (!blk) { fclose(chainFile); @@ -544,18 +547,42 @@ bool Chain_LoadFromFile(blockchain_t* chain, const char* dirpath, uint256_t* out free(blk); return false; } - (void)txSize; - - /*for (size_t j = 0; j < txSize; j++) { - signed_transaction_t tx; - if (fread(&tx, sizeof(signed_transaction_t), 1, blockFile) != 1) { - fclose(blockFile); - Block_Destroy(blk); + if (loadTransactions) { + blk->transactions = DYNARR_CREATE(signed_transaction_t, txSize == 0 ? 1 : txSize); + if (!blk->transactions) { + fclose(chainFile); + fclose(tableFile); + free(blk); return false; } - Block_AddTransaction(blk, &tx); - }*/ // Transactions are not read, we use the merkle root for validity - blk->transactions = NULL; + + for (size_t j = 0; j < txSize; j++) { + signed_transaction_t tx; + if (fread(&tx, sizeof(signed_transaction_t), 1, chainFile) != 1) { + fclose(chainFile); + fclose(tableFile); + DynArr_destroy(blk->transactions); + free(blk); + return false; + } + + if (!DynArr_push_back(blk->transactions, &tx)) { + fclose(chainFile); + fclose(tableFile); + DynArr_destroy(blk->transactions); + free(blk); + return false; + } + } + } else { + if (txSize > 0 && fseek(chainFile, (long)(txSize * sizeof(signed_transaction_t)), SEEK_CUR) != 0) { + fclose(chainFile); + fclose(tableFile); + free(blk); + return false; + } + blk->transactions = NULL; + } // Loading from disk currently restores headers only. Do not run Chain_AddBlock, // because it enforces transaction presence and mutates balances. @@ -568,7 +595,7 @@ bool Chain_LoadFromFile(blockchain_t* chain, const char* dirpath, uint256_t* out chain->size++; // DynArr_push_back stores blocks by value, so the copied block now owns - // blk->transactions (NULL in header-only load mode). Free wrapper only. + // blk->transactions. Free wrapper only. free(blk); } diff --git a/src/main.c b/src/main.c index 17a238c..8001f42 100644 --- a/src/main.c +++ b/src/main.c @@ -13,9 +13,9 @@ #include #include -#include #include +#include #ifndef CHAIN_DATA_DIR #define CHAIN_DATA_DIR "chain_data" @@ -83,19 +83,6 @@ static bool GenerateTestMinerIdentity(uint8_t privateKey[32], uint8_t compressed return false; } -static int testCounts = 0; -static void MakeTestRecipientAddress(uint8_t outAddress[32]) { - if (!outAddress) { - return; - } - - const char* label = "skalacoin-test-recipient-address-"; - char buffer[256]; - snprintf(buffer, sizeof(buffer), "%s%d", label, testCounts); - SHA256((const unsigned char*)buffer, strlen(buffer), outAddress); - testCounts++; -} - static void Uint256ToDecimal(const uint256_t* value, char* out, size_t outSize) { if (!value || !out || outSize == 0) { return; @@ -160,14 +147,289 @@ static bool MineBlock(block_t* block) { } } +static bool ParseHexAddress32(const char* in, uint8_t outAddress[32]) { + if (!in || !outAddress) { + return false; + } + + const char* p = in; + if (p[0] == '0' && (p[1] == 'x' || p[1] == 'X')) { + p += 2; + } + + if (strlen(p) != 64) { + return false; + } + + for (size_t i = 0; i < 32; ++i) { + char hi = p[i * 2]; + char lo = p[i * 2 + 1]; + int hiVal = (hi >= '0' && hi <= '9') ? (hi - '0') : + (hi >= 'a' && hi <= 'f') ? (10 + hi - 'a') : + (hi >= 'A' && hi <= 'F') ? (10 + hi - 'A') : -1; + int loVal = (lo >= '0' && lo <= '9') ? (lo - '0') : + (lo >= 'a' && lo <= 'f') ? (10 + lo - 'a') : + (lo >= 'A' && lo <= 'F') ? (10 + lo - 'A') : -1; + + if (hiVal < 0 || loVal < 0) { + return false; + } + + outAddress[i] = (uint8_t)((hiVal << 4) | loVal); + } + + return true; +} + +static void AddressToHexString(const uint8_t address[32], char out[65]) { + if (!address || !out) { + return; + } + to_hex(address, out); +} + +static bool FlushChainAndSheet(blockchain_t* chain, + const char* chainDataDir, + uint256_t currentSupply, + uint64_t currentReward) { + bool chainSaved = Chain_SaveToFile(chain, chainDataDir, currentSupply, currentReward); + bool sheetSaved = BalanceSheet_SaveToFile(chainDataDir); + + if (!chainSaved) { + fprintf(stderr, "failed to save chain to %s\n", chainDataDir); + } + if (!sheetSaved) { + fprintf(stderr, "failed to save balance sheet to %s\n", chainDataDir); + } + + return chainSaved && sheetSaved; +} + +static block_t* BuildNextBlock(blockchain_t* chain, uint32_t difficultyTarget) { + block_t* block = Block_Create(); + if (!block) { + return NULL; + } + + block->header.version = 1; + block->header.blockNumber = (uint64_t)Chain_Size(chain); + if (Chain_Size(chain) > 0) { + block_t* lastBlock = Chain_GetBlock(chain, Chain_Size(chain) - 1); + if (lastBlock) { + Block_CalculateHash(lastBlock, block->header.prevHash); + } else { + memset(block->header.prevHash, 0, sizeof(block->header.prevHash)); + } + } else { + memset(block->header.prevHash, 0, sizeof(block->header.prevHash)); + } + block->header.timestamp = (uint64_t)time(NULL); + block->header.difficultyTarget = difficultyTarget; + block->header.nonce = 0; + + return block; +} + +static void AddCoinbaseTransaction(block_t* block, const uint8_t minerAddress[32], uint64_t reward) { + signed_transaction_t coinbaseTx; + Transaction_Init(&coinbaseTx); + coinbaseTx.transaction.version = 1; + coinbaseTx.transaction.amount1 = reward; + coinbaseTx.transaction.fee = 0; + memcpy(coinbaseTx.transaction.recipientAddress1, minerAddress, 32); + memset(coinbaseTx.transaction.recipientAddress2, 0, sizeof(coinbaseTx.transaction.recipientAddress2)); + coinbaseTx.transaction.amount2 = 0; + memset(coinbaseTx.transaction.compressedPublicKey, 0, sizeof(coinbaseTx.transaction.compressedPublicKey)); + memset(coinbaseTx.transaction.senderAddress, 0xFF, sizeof(coinbaseTx.transaction.senderAddress)); + Block_AddTransaction(block, &coinbaseTx); +} + +static bool MineAndAppendBlock(blockchain_t* chain, + block_t* block, + uint256_t* currentSupply, + uint64_t* currentReward, + uint32_t* difficultyTarget) { + if (!chain || !block || !currentSupply || !currentReward || !difficultyTarget) { + return false; + } + + uint8_t merkleRoot[32]; + Block_CalculateMerkleRoot(block, merkleRoot); + memcpy(block->header.merkleRoot, merkleRoot, sizeof(block->header.merkleRoot)); + + if (!MineBlock(block)) { + fprintf(stderr, "failed to mine block within nonce range\n"); + return false; + } + + if (!Chain_AddBlock(chain, block)) { + fprintf(stderr, "failed to append block to chain\n"); + return false; + } + + uint64_t coinbaseAmount = 0; + if (block->transactions && DynArr_size(block->transactions) > 0) { + signed_transaction_t* firstTx = (signed_transaction_t*)DynArr_at(block->transactions, 0); + if (firstTx && Address_IsCoinbase(firstTx->transaction.senderAddress)) { + coinbaseAmount = firstTx->transaction.amount1; + } + } + + (void)uint256_add_u64(currentSupply, coinbaseAmount); + + uint8_t canonicalHash[32]; + uint8_t powHash[32]; + Block_CalculateHash(block, canonicalHash); + Block_CalculateAutolykos2Hash(block, powHash); + + char supplyStr[80]; + Uint256ToDecimal(currentSupply, supplyStr, sizeof(supplyStr)); + printf("Mined block height=%llu nonce=%llu reward=%llu supply=%s diff=%#x pow=%02x%02x%02x%02x... canonical=%02x%02x%02x%02x...\n", + (unsigned long long)block->header.blockNumber, + (unsigned long long)block->header.nonce, + (unsigned long long)coinbaseAmount, + supplyStr, + (unsigned int)block->header.difficultyTarget, + powHash[0], powHash[1], powHash[2], powHash[3], + canonicalHash[0], canonicalHash[1], canonicalHash[2], canonicalHash[3]); + + *currentReward = CalculateBlockReward(*currentSupply, chain); + + if (Chain_Size(chain) % DIFFICULTY_ADJUSTMENT_INTERVAL == 0) { + *difficultyTarget = Chain_ComputeNextTarget(chain, *difficultyTarget); + } + + if (Chain_Size(chain) % EPOCH_LENGTH == 0 && Chain_Size(chain) > 0) { + uint8_t dagSeed[32]; + GetNextDAGSeed(chain, dagSeed); + (void)Block_RebuildAutolykos2Dag(CalculateTargetDAGSize(chain), dagSeed); + } + + return true; +} + +static bool GenerateRandomTestAddress(uint8_t outAddress[32]) { + if (!outAddress) { + return false; + } + + uint8_t privateKey[32]; + uint8_t compressedPubkey[33]; + secp256k1_pubkey pubkey; + + secp256k1_context* ctx = secp256k1_context_create(SECP256K1_CONTEXT_SIGN); + if (!ctx) { + return false; + } + + for (size_t attempt = 0; attempt < 4096; ++attempt) { + for (size_t i = 0; i < sizeof(privateKey); ++i) { + privateKey[i] = (uint8_t)(rand() & 0xFF); + } + + if (!secp256k1_ec_seckey_verify(ctx, privateKey)) { + continue; + } + + if (!secp256k1_ec_pubkey_create(ctx, &pubkey, privateKey)) { + continue; + } + + size_t pubLen = sizeof(compressedPubkey); + if (!secp256k1_ec_pubkey_serialize(ctx, compressedPubkey, &pubLen, &pubkey, SECP256K1_EC_COMPRESSED) || pubLen != 33) { + continue; + } + + AddressFromCompressedPubkey(compressedPubkey, outAddress); + secp256k1_context_destroy(ctx); + return true; + } + + secp256k1_context_destroy(ctx); + return false; +} + +static void WipeChainFiles(const char* chainDataDir) { + if (!chainDataDir) { + return; + } + + char path[512]; + + snprintf(path, sizeof(path), "%s/chain.meta", chainDataDir); + remove(path); + + snprintf(path, sizeof(path), "%s/chain.data", chainDataDir); + remove(path); + + snprintf(path, sizeof(path), "%s/chain.table", chainDataDir); + remove(path); + + snprintf(path, sizeof(path), "%s/balance_sheet.data", chainDataDir); + remove(path); +} + +static bool VerifyChainFully(blockchain_t* chain) { + if (!chain || !chain->blocks) { + return false; + } + + size_t chainSize = Chain_Size(chain); + for (size_t i = 0; i < chainSize; ++i) { + block_t* blk = Chain_GetBlock(chain, i); + if (!blk || !blk->transactions) { + return false; + } + + if (blk->header.blockNumber != (uint64_t)i) { + return false; + } + + if (i == 0) { + uint8_t zeroHash[32] = {0}; + if (memcmp(blk->header.prevHash, zeroHash, sizeof(zeroHash)) != 0) { + return false; + } + } else { + block_t* prevBlk = Chain_GetBlock(chain, i - 1); + if (!prevBlk) { + return false; + } + + uint8_t expectedPrevHash[32]; + Block_CalculateHash(prevBlk, expectedPrevHash); + if (memcmp(blk->header.prevHash, expectedPrevHash, sizeof(expectedPrevHash)) != 0) { + return false; + } + } + + if (!Block_HasValidProofOfWork(blk)) { + return false; + } + + if (!Block_AllTransactionsValid(blk)) { + return false; + } + + uint8_t expectedMerkle[32]; + Block_CalculateMerkleRoot(blk, expectedMerkle); + if (memcmp(blk->header.merkleRoot, expectedMerkle, sizeof(expectedMerkle)) != 0) { + return false; + } + } + + return true; +} + int main(int argc, char* argv[]) { + (void)argc; + (void)argv; + signal(SIGINT, handle_sigint); srand((unsigned int)time(NULL)); BalanceSheet_Init(); const char* chainDataDir = CHAIN_DATA_DIR; - const uint64_t blocksToMine = 1000; - const double targetSeconds = TARGET_BLOCK_TIME; uint256_t currentSupply = uint256_from_u64(0); @@ -180,13 +442,13 @@ int main(int argc, char* argv[]) { blockchain_t* chain = Chain_Create(); if (!chain) { fprintf(stderr, "failed to create chain\n"); + Node_Destroy(node); + BalanceSheet_Destroy(); return 1; } - uint8_t lastSavedHash[32]; - bool isFirstBlockOfLoadedChain = true; - - if (!Chain_LoadFromFile(chain, chainDataDir, ¤tSupply, &difficultyTarget, ¤tReward, lastSavedHash)) { + uint8_t lastSavedHash[32] = {0}; + if (!Chain_LoadFromFile(chain, chainDataDir, ¤tSupply, &difficultyTarget, ¤tReward, lastSavedHash, false)) { printf("No existing chain loaded from %s\n", chainDataDir); } @@ -199,8 +461,6 @@ int main(int argc, char* argv[]) { ? (PHASE1_TARGET_BLOCKS / EMISSION_ACCELERATION_FACTOR) : 1; - // During phase 1, reward is deterministic from (supply,height), so always recompute. - // This avoids using stale on-disk cached rewards (e.g. floor reward after genesis). if ((uint64_t)Chain_Size(chain) < effectivePhase1Blocks || currentReward == 0) { currentReward = CalculateBlockReward(currentSupply, chain); } @@ -209,286 +469,292 @@ int main(int argc, char* argv[]) { uint8_t dagSeed[32]; GetNextDAGSeed(chain, dagSeed); (void)Block_RebuildAutolykos2Dag(CalculateTargetDAGSize(chain), dagSeed); + printf("Built initial DAG with seed %02x%02x%02x%02x... and size %zu bytes\n", + dagSeed[0], dagSeed[1], dagSeed[2], dagSeed[3], + CalculateTargetDAGSize(chain)); } if (Chain_Size(chain) > 0) { if (Chain_IsValid(chain)) { printf("Loaded chain with %zu blocks from disk\n", Chain_Size(chain)); } else { - fprintf(stderr, "loaded chain is invalid, scrapping, resyncing.\n"); // TODO: Actually implement resyncing from peers instead of just scrapping the chain - const size_t badSize = Chain_Size(chain); - - // Delete files (wipe dir) - for (size_t i = 0; i < badSize; i++) { - char filePath[256]; - snprintf(filePath, sizeof(filePath), "%s/block_%zu.dat", chainDataDir, i); - remove(filePath); - } - - char metaPath[256]; - snprintf(metaPath, sizeof(metaPath), "%s/chain.meta", chainDataDir); - remove(metaPath); - + fprintf(stderr, "loaded chain is invalid, wiping persisted state.\n"); + WipeChainFiles(chainDataDir); Chain_Wipe(chain); + BalanceSheet_Destroy(); + BalanceSheet_Init(); + currentSupply = uint256_from_u64(0); + difficultyTarget = INITIAL_DIFFICULTY; + currentReward = CalculateBlockReward(currentSupply, chain); } } - // Get flag from argv "-mine" to mine blocks - if (argc > 1 && strcmp(argv[1], "-mine") == 0) { - printf("Mining %llu blocks with target time %.0fs...\n", (unsigned long long)blocksToMine, targetSeconds); + uint8_t minerAddress[32]; + uint8_t minerPrivateKey[32]; + uint8_t minerCompressedPubkey[33]; + if (!GenerateTestMinerIdentity(minerPrivateKey, minerCompressedPubkey, minerAddress)) { + fprintf(stderr, "failed to generate test miner keypair\n"); + Chain_Destroy(chain); + Node_Destroy(node); + Block_ShutdownPowContext(); + BalanceSheet_Destroy(); + return 1; + } - uint8_t minerAddress[32]; - uint8_t minerPrivateKey[32]; - uint8_t minerCompressedPubkey[33]; - if (!GenerateTestMinerIdentity(minerPrivateKey, minerCompressedPubkey, minerAddress)) { - fprintf(stderr, "failed to generate test miner keypair\n"); - Chain_Destroy(chain); - Block_ShutdownPowContext(); - BalanceSheet_Destroy(); - return 1; + char minerAddressHex[65]; + AddressToHexString(minerAddress, minerAddressHex); + printf("Test miner address: %s\n", minerAddressHex); + + char supplyStr[80]; + Uint256ToDecimal(¤tSupply, supplyStr, sizeof(supplyStr)); + printf("Current chain has %zu blocks, total supply %s\n", Chain_Size(chain), supplyStr); + printf("Commands: mine , send
, balance [address], flushchain, fullverify, wipechain, genaddr, exit\n"); + + char line[1024]; + while (true) { + printf("> "); + fflush(stdout); + + if (!fgets(line, sizeof(line), stdin)) { + break; } - for (uint64_t mined = 0; mined < blocksToMine; ++mined) { - block_t* block = Block_Create(); + line[strcspn(line, "\r\n")] = '\0'; + if (line[0] == '\0') { + continue; + } + + char* cmd = strtok(line, " \t"); + if (!cmd) { + continue; + } + + if (strcmp(cmd, "mine") == 0) { + char* blocksStr = strtok(NULL, " \t"); + if (!blocksStr) { + printf("usage: mine \n"); + continue; + } + + char* endptr = NULL; + unsigned long long requested = strtoull(blocksStr, &endptr, 10); + if (*blocksStr == '\0' || blocksStr[0] == '-' || (endptr && *endptr != '\0')) { + printf("invalid block count\n"); + continue; + } + + printf("Mining %llu block(s)...\n", requested); + bool minedAll = true; + for (unsigned long long i = 0; i < requested; ++i) { + block_t* block = BuildNextBlock(chain, difficultyTarget); + if (!block) { + fprintf(stderr, "failed to create block\n"); + minedAll = false; + break; + } + + AddCoinbaseTransaction(block, minerAddress, currentReward); + + if (!MineAndAppendBlock(chain, block, ¤tSupply, ¤tReward, &difficultyTarget)) { + Block_Destroy(block); + minedAll = false; + break; + } + + free(block); // Chain stores block by value and owns copied transaction array. + } + + if (minedAll) { + (void)FlushChainAndSheet(chain, chainDataDir, currentSupply, currentReward); + printf("mine finished and chain flushed\n"); + } + continue; + } + + if (strcmp(cmd, "send") == 0) { + char* addressStr = strtok(NULL, " \t"); + char* amountStr = strtok(NULL, " \t"); + if (!addressStr || !amountStr) { + printf("usage: send
\n"); + continue; + } + + uint8_t recipientAddress[32]; + if (!ParseHexAddress32(addressStr, recipientAddress)) { + printf("invalid address: expected 64 hex chars (optionally prefixed with 0x)\n"); + continue; + } + + char* endptr = NULL; + unsigned long long amount = strtoull(amountStr, &endptr, 10); + if (*amountStr == '\0' || amountStr[0] == '-' || (endptr && *endptr != '\0') || amount == 0) { + printf("invalid amount\n"); + continue; + } + + balance_sheet_entry_t senderEntry; + if (!BalanceSheet_Lookup(minerAddress, &senderEntry)) { + printf("send failed: miner address has no balance\n"); + continue; + } + + uint256_t spend = uint256_from_u64((uint64_t)amount); + if (uint256_cmp(&senderEntry.balance, &spend) < 0) { + printf("send failed: insufficient balance\n"); + continue; + } + + block_t* block = BuildNextBlock(chain, difficultyTarget); if (!block) { fprintf(stderr, "failed to create block\n"); - Chain_Destroy(chain); - Block_ShutdownPowContext(); - return 1; + continue; } - block->header.version = 1; - block->header.blockNumber = (uint64_t)Chain_Size(chain); - if (Chain_Size(chain) > 0) { - if (!isFirstBlockOfLoadedChain) { - block_t* lastBlock = Chain_GetBlock(chain, Chain_Size(chain) - 1); - if (lastBlock) { - Block_CalculateHash(lastBlock, block->header.prevHash); - } else { - memset(block->header.prevHash, 0, sizeof(block->header.prevHash)); - } - } else { - memcpy(block->header.prevHash, lastSavedHash, sizeof(lastSavedHash)); - } - } else { - memset(block->header.prevHash, 0, sizeof(block->header.prevHash)); - } - block->header.timestamp = (uint64_t)time(NULL); - block->header.difficultyTarget = difficultyTarget; - block->header.nonce = 0; - - signed_transaction_t coinbaseTx; - Transaction_Init(&coinbaseTx); - coinbaseTx.transaction.version = 1; - coinbaseTx.transaction.amount1 = currentReward; - coinbaseTx.transaction.fee = 0; - memcpy(coinbaseTx.transaction.recipientAddress1, minerAddress, sizeof(minerAddress)); - coinbaseTx.transaction.recipientAddress2[0] = 0; // Mark recipient 2 as unused - coinbaseTx.transaction.amount2 = 0; - memset(coinbaseTx.transaction.compressedPublicKey, 0, sizeof(coinbaseTx.transaction.compressedPublicKey)); - memset(coinbaseTx.transaction.senderAddress, 0xFF, sizeof(coinbaseTx.transaction.senderAddress)); - Block_AddTransaction(block, &coinbaseTx); - - uint8_t merkleRoot[32]; - Block_CalculateMerkleRoot(block, merkleRoot); - memcpy(block->header.merkleRoot, merkleRoot, sizeof(block->header.merkleRoot)); - - if (!MineBlock(block)) { - fprintf(stderr, "failed to mine block within nonce range\n"); - Block_Destroy(block); - Chain_Destroy(chain); - Block_ShutdownPowContext(); - return 1; - } - - if (!Chain_AddBlock(chain, block)) { - fprintf(stderr, "failed to append block to chain\n"); - Block_Destroy(block); - Chain_Destroy(chain); - Block_ShutdownPowContext(); - return 1; - } - - (void)uint256_add_u64(¤tSupply, coinbaseTx.transaction.amount1); - char supplyStr[80]; - Uint256ToDecimal(¤tSupply, supplyStr, sizeof(supplyStr)); - - uint8_t canonicalHash[32]; - uint8_t powHash[32]; - Block_CalculateHash(block, canonicalHash); - Block_CalculateAutolykos2Hash(block, powHash); - printf("Mined block %llu/%llu (height=%llu) nonce=%llu reward=%llu supply=%s diff=%#x merkle=%02x%02x%02x%02x... pow=%02x%02x%02x%02x... canonical=%02x%02x%02x%02x...\n", - (unsigned long long)(mined + 1), - (unsigned long long)blocksToMine, - (unsigned long long)block->header.blockNumber, - (unsigned long long)block->header.nonce, - (unsigned long long)coinbaseTx.transaction.amount1, - supplyStr, - (unsigned int)block->header.difficultyTarget, - block->header.merkleRoot[0], block->header.merkleRoot[1], block->header.merkleRoot[2], block->header.merkleRoot[3], - powHash[0], powHash[1], powHash[2], powHash[3], - canonicalHash[0], canonicalHash[1], canonicalHash[2], canonicalHash[3]); - - free(block); // chain stores blocks by value; transactions are owned by chain copy - - currentReward = CalculateBlockReward(currentSupply, chain); // Update the global currentReward for the next block - - // Save chain after each mined block; NOTE: In reality, blocks will appear every ~90s, so this won't be a realistic bottleneck on the mainnet - // Persist the reward for the *next* block so restart behavior is correct. - printf("Saving chain at height %zu...\n", Chain_Size(chain)); - Chain_SaveToFile(chain, chainDataDir, currentSupply, currentReward); - - if (Chain_Size(chain) % DIFFICULTY_ADJUSTMENT_INTERVAL == 0) { - difficultyTarget = Chain_ComputeNextTarget(chain, difficultyTarget); - } - - if (Chain_Size(chain) % EPOCH_LENGTH == 0 && Chain_Size(chain) > 0) { - uint8_t dagSeed[32]; - GetNextDAGSeed(chain, dagSeed); - (void)Block_RebuildAutolykos2Dag(CalculateTargetDAGSize(chain), dagSeed); - } - - isFirstBlockOfLoadedChain = false; - } - - // Post-loop test: spend some coins from the miner address to a different address. - // This validates sender balance checks, transaction signing, merkle root generation, - // and PoW mining for a non-coinbase transaction. - signed_transaction_t spends[100]; - for (int i = 0; i < 100; i++) { - int rng = rand() % 10; // Random amount between 0 and 9 (inclusive) - const uint64_t spendAmount = rng * DECIMALS; - uint8_t recipientAddress[32]; - MakeTestRecipientAddress(recipientAddress); + AddCoinbaseTransaction(block, minerAddress, currentReward); signed_transaction_t spendTx; Transaction_Init(&spendTx); spendTx.transaction.version = 1; spendTx.transaction.fee = 0; - spendTx.transaction.amount1 = spendAmount; + spendTx.transaction.amount1 = (uint64_t)amount; spendTx.transaction.amount2 = 0; memcpy(spendTx.transaction.senderAddress, minerAddress, sizeof(minerAddress)); memcpy(spendTx.transaction.recipientAddress1, recipientAddress, sizeof(recipientAddress)); memset(spendTx.transaction.recipientAddress2, 0, sizeof(spendTx.transaction.recipientAddress2)); memcpy(spendTx.transaction.compressedPublicKey, minerCompressedPubkey, sizeof(minerCompressedPubkey)); - Transaction_Sign(&spendTx, minerPrivateKey); - spends[i] = spendTx; + + Block_AddTransaction(block, &spendTx); + + if (!MineAndAppendBlock(chain, block, ¤tSupply, ¤tReward, &difficultyTarget)) { + Block_Destroy(block); + continue; + } + + free(block); + printf("send committed in mined block\n"); + continue; } - block_t* spendBlock = Block_Create(); - if (!spendBlock) { - fprintf(stderr, "failed to create test spend block\n"); - Chain_Destroy(chain); - Block_ShutdownPowContext(); - BalanceSheet_Destroy(); - return 1; - } + if (strcmp(cmd, "balance") == 0) { + char* addressStr = strtok(NULL, " \t"); + char* extra = strtok(NULL, " \t"); + if (extra) { + printf("usage: balance [address]\n"); + continue; + } - spendBlock->header.version = 1; - spendBlock->header.blockNumber = (uint64_t)Chain_Size(chain); - if (Chain_Size(chain) > 0) { - block_t* lastBlock = Chain_GetBlock(chain, Chain_Size(chain) - 1); - if (lastBlock) { - Block_CalculateHash(lastBlock, spendBlock->header.prevHash); + uint8_t queryAddress[32]; + uint8_t* effectiveAddress = minerAddress; + + if (addressStr) { + if (!ParseHexAddress32(addressStr, queryAddress)) { + printf("invalid address: expected 64 hex chars (optionally prefixed with 0x)\n"); + continue; + } + effectiveAddress = queryAddress; + } + + balance_sheet_entry_t entry; + char balanceStr[80]; + if (!BalanceSheet_Lookup(effectiveAddress, &entry)) { + uint256_t zero = uint256_from_u64(0); + Uint256ToDecimal(&zero, balanceStr, sizeof(balanceStr)); } else { - memset(spendBlock->header.prevHash, 0, sizeof(spendBlock->header.prevHash)); + Uint256ToDecimal(&entry.balance, balanceStr, sizeof(balanceStr)); } - } else { - memset(spendBlock->header.prevHash, 0, sizeof(spendBlock->header.prevHash)); - } - spendBlock->header.timestamp = (uint64_t)time(NULL); - spendBlock->header.difficultyTarget = difficultyTarget; - spendBlock->header.nonce = 0; - signed_transaction_t testCoinbaseTx; - Transaction_Init(&testCoinbaseTx); - memset(&testCoinbaseTx, 0, sizeof(testCoinbaseTx)); - testCoinbaseTx.transaction.version = 1; - testCoinbaseTx.transaction.amount1 = currentReward; - testCoinbaseTx.transaction.fee = 0; - memcpy(testCoinbaseTx.transaction.recipientAddress1, minerAddress, sizeof(minerAddress)); - testCoinbaseTx.transaction.recipientAddress2[0] = 0; - testCoinbaseTx.transaction.amount2 = 0; - memset(testCoinbaseTx.transaction.compressedPublicKey, 0, sizeof(testCoinbaseTx.transaction.compressedPublicKey)); - memset(testCoinbaseTx.transaction.senderAddress, 0xFF, sizeof(testCoinbaseTx.transaction.senderAddress)); - - Block_AddTransaction(spendBlock, &testCoinbaseTx); - for (int i = 0; i < 100; i++) { - Block_AddTransaction(spendBlock, &spends[i]); + char addrHex[65]; + AddressToHexString(effectiveAddress, addrHex); + printf("Balance %s: %s unit(s)\n", addrHex, balanceStr); + continue; } - uint8_t merkleRoot[32]; - Block_CalculateMerkleRoot(spendBlock, merkleRoot); - memcpy(spendBlock->header.merkleRoot, merkleRoot, sizeof(spendBlock->header.merkleRoot)); - - if (!MineBlock(spendBlock)) { - fprintf(stderr, "failed to mine test spend block\n"); - Block_Destroy(spendBlock); - Chain_Destroy(chain); - Block_ShutdownPowContext(); - BalanceSheet_Destroy(); - return 1; - } - - if (!Chain_AddBlock(chain, spendBlock)) { - fprintf(stderr, "failed to append test spend block to chain\n"); - Block_Destroy(spendBlock); - Chain_Destroy(chain); - Block_ShutdownPowContext(); - BalanceSheet_Destroy(); - return 1; - } - - (void)uint256_add_u64(¤tSupply, testCoinbaseTx.transaction.amount1); - currentReward = CalculateBlockReward(currentSupply, chain); - - //printf("Mined test spend block (height=%llu) sending %llu base units to a new address\n", - // (unsigned long long)spendBlock->header.blockNumber, - // (unsigned long long)spendAmount); - - free(spendBlock); - - bool chainSaved = Chain_SaveToFile(chain, chainDataDir, currentSupply, currentReward); - bool sheetSaved = BalanceSheet_SaveToFile(chainDataDir); - if (!chainSaved || !sheetSaved) { - if (!chainSaved) { - fprintf(stderr, "failed to save chain to %s\n", chainDataDir); + if (strcmp(cmd, "flushchain") == 0) { + if (FlushChainAndSheet(chain, chainDataDir, currentSupply, currentReward)) { + printf("chain flushed\n"); } - if (!sheetSaved) { - fprintf(stderr, "failed to save balance sheet to %s\n", chainDataDir); + continue; + } + + if (strcmp(cmd, "fullverify") == 0) { + blockchain_t* verifyChain = Chain_Create(); + if (!verifyChain) { + printf("Chain Not OK\n"); + continue; } - } else { - char supplyStr[80]; - Uint256ToDecimal(¤tSupply, supplyStr, sizeof(supplyStr)); - printf("Saved chain with %zu blocks to %s (supply=%s)\n", - Chain_Size(chain), + + uint256_t verifySupply = uint256_from_u64(0); + uint32_t verifyDifficulty = INITIAL_DIFFICULTY; + uint64_t verifyReward = 0; + uint8_t verifyLastHash[32] = {0}; + + bool loaded = Chain_LoadFromFile( + verifyChain, chainDataDir, - supplyStr); + &verifySupply, + &verifyDifficulty, + &verifyReward, + verifyLastHash, + true + ); + + bool ok = false; + if (loaded) { + uint8_t dagSeed[32]; + GetNextDAGSeed(verifyChain, dagSeed); + (void)Block_RebuildAutolykos2Dag(CalculateTargetDAGSize(verifyChain), dagSeed); + ok = VerifyChainFully(verifyChain); + } + + printf("%s\n", ok ? "Chain OK" : "Chain Not OK"); + Chain_Destroy(verifyChain); + continue; } - } else { - char supplyStr[80]; - Uint256ToDecimal(¤tSupply, supplyStr, sizeof(supplyStr)); - printf("Current chain has %zu blocks, total supply %s\n", Chain_Size(chain), supplyStr); + + if (strcmp(cmd, "wipechain") == 0) { + WipeChainFiles(chainDataDir); + Chain_Wipe(chain); + BalanceSheet_Destroy(); + BalanceSheet_Init(); + currentSupply = uint256_from_u64(0); + difficultyTarget = INITIAL_DIFFICULTY; + currentReward = CalculateBlockReward(currentSupply, chain); + + uint8_t dagSeed[32]; + memset(dagSeed, DAG_GENESIS_SEED, sizeof(dagSeed)); + (void)Block_RebuildAutolykos2Dag(DAG_BASE_SIZE, dagSeed); + + printf("chain data wiped\n"); + continue; + } + + if (strcmp(cmd, "genaddr") == 0) { + uint8_t testAddress[32]; + if (!GenerateRandomTestAddress(testAddress)) { + printf("failed to generate address\n"); + continue; + } + + char addrHex[65]; + AddressToHexString(testAddress, addrHex); + printf("%s\n", addrHex); + continue; + } + + if (strcmp(cmd, "exit") == 0 || strcmp(cmd, "quit") == 0) { + break; + } + + printf("Unknown command. Available: mine, send, balance, flushchain, fullverify, wipechain, genaddr, exit\n"); } - // Print chain - /*for (size_t i = 0; i < Chain_Size(chain); i++) { - block_t* blk = Chain_GetBlock(chain, i); - if (blk) { - Block_Print(blk); - } - }*/ - - BalanceSheet_Print(); - if (!BalanceSheet_SaveToFile(chainDataDir)) { - fprintf(stderr, "failed to save balance sheet to %s\n", chainDataDir); - } + (void)FlushChainAndSheet(chain, chainDataDir, currentSupply, currentReward); Chain_Destroy(chain); Block_ShutdownPowContext(); - Node_Destroy(node); + BalanceSheet_Destroy(); return 0; } diff --git a/src/nets/net_node.c b/src/nets/net_node.c index f282ea4..38b6c9a 100644 --- a/src/nets/net_node.c +++ b/src/nets/net_node.c @@ -1,8 +1,58 @@ #include +#include +#include +#include + +static net_node_t* Node_FromConnection(tcp_connection_t* conn) { + if (!conn) { + return NULL; + } + + return (net_node_t*)conn->owner; +} + +static int Node_DecodePacket(const tcp_connection_t* conn, packet_type_t* outType, const unsigned char** outPayload, size_t* outPayloadLen) { + if (!conn || !outType || !outPayload || !outPayloadLen || conn->dataBufLen < 1 || !conn->dataBuf) { + return -1; + } + + uint8_t packetType = conn->dataBuf[0]; + if (!PacketType_IsValid(packetType)) { + return -1; + } + + *outType = (packet_type_t)packetType; + *outPayload = conn->dataBuf + 1; + *outPayloadLen = conn->dataBufLen - 1; + return 0; +} + +static void Node_ForwardConnect(net_node_t* node, tcp_connection_t* conn) { + if (node && node->on_connect) { + node->on_connect(conn, node->callbackUser); + } +} + +static void Node_ForwardDisconnect(net_node_t* node, tcp_connection_t* conn) { + if (node && node->on_disconnect) { + node->on_disconnect(conn, node->callbackUser); + } +} + +static void Node_ForwardData(net_node_t* node, tcp_connection_t* conn, const unsigned char* payload, size_t payloadLen) { + if (node && node->on_data) { + node->on_data(conn, payload, payloadLen, node->callbackUser); + } +} + net_node_t* Node_Create() { net_node_t* node = (net_node_t*)malloc(sizeof(net_node_t)); - if (!node) { return NULL; } + if (!node) { + return NULL; + } + + memset(node, 0, sizeof(*node)); node->server = TcpServer_Create(); if (!node->server) { @@ -10,34 +60,189 @@ net_node_t* Node_Create() { return NULL; } - TcpServer_Init(node->server, LISTEN_PORT, "0.0.0.0"); // All interfaces - - // Register callbacks before starting the server + for (size_t i = 0; i < MAX_CONS; ++i) { + if (TcpClient_Init(&node->outboundClients[i]) != 0) { + Node_Destroy(node); + return NULL; + } + } + + TcpServer_Init(node->server, LISTEN_PORT, "0.0.0.0"); + + node->server->owner = node; node->server->on_connect = Node_Server_OnConnect; node->server->on_data = Node_Server_OnData; node->server->on_disconnect = Node_Server_OnDisconnect; - + TcpServer_Start(node->server, MAX_CONS); return node; } void Node_Destroy(net_node_t* node) { - if (!node || !node->server) { return; } - TcpServer_Stop(node->server); - TcpServer_Destroy(node->server); + if (!node) { + return; + } + + for (size_t i = 0; i < MAX_CONS; ++i) { + TcpClient_Destroy(&node->outboundClients[i]); + } + node->outboundCount = 0; + + if (node->server) { + TcpServer_Stop(node->server); + TcpServer_Destroy(node->server); + } free(node); } +void Node_SetCallbacks( + net_node_t* node, + void (*on_connect)(tcp_connection_t* conn, void* user), + void (*on_data)(tcp_connection_t* conn, const unsigned char* data, size_t len, void* user), + void (*on_disconnect)(tcp_connection_t* conn, void* user), + void* user +) { + if (!node) { + return; + } + + node->on_connect = on_connect; + node->on_data = on_data; + node->on_disconnect = on_disconnect; + node->callbackUser = user; +} + +int Node_ConnectPeer(net_node_t* node, const char* ip, unsigned short port) { + if (!node || !ip) { + return -1; + } + + for (size_t i = 0; i < MAX_CONS; ++i) { + if (node->outboundClients[i].connection == NULL) { + if (TcpClient_Connect( + &node->outboundClients[i], + ip, + port, + Node_Client_OnConnect, + Node_Client_OnData, + Node_Client_OnDisconnect, + node + ) == 0) { + node->outboundCount++; + return 0; + } + return -1; + } + } + + return -1; +} + +int Node_ConnectStartupPeers(net_node_t* node, const char** ips, const unsigned short* ports, size_t peersCount) { + if (!node || !ips || !ports) { + return -1; + } + + int successes = 0; + for (size_t i = 0; i < peersCount; ++i) { + if (Node_ConnectPeer(node, ips[i], ports[i]) == 0) { + successes++; + } + } + + return successes; +} + +int Node_SendPacket(net_node_t* node, tcp_connection_t* conn, packet_type_t packetType, const void* payload, size_t payloadLen) { + if (!node || !conn || !PacketType_IsValid((uint8_t)packetType) || (!payload && payloadLen > 0)) { + return -1; + } + + if (conn->role == TCP_CONNECTION_ROLE_INBOUND && packetType != PACKET_TYPE_RESPONSE) { + return -1; + } + + if (conn->role == TCP_CONNECTION_ROLE_OUTBOUND && packetType != PACKET_TYPE_REQUEST) { + return -1; + } + + size_t framePayloadLen = payloadLen + 1; + unsigned char* framed = (unsigned char*)malloc(framePayloadLen); + if (!framed) { + return -1; + } + + framed[0] = (unsigned char)packetType; + if (payloadLen > 0) { + memcpy(framed + 1, payload, payloadLen); + } + + int rc = TcpConnection_SendFramed(conn, framed, framePayloadLen); + free(framed); + + return rc; +} + void Node_Server_OnConnect(tcp_connection_t* client) { - printf("A node connected!\n"); + net_node_t* node = Node_FromConnection(client); + Node_ForwardConnect(node, client); + printf("Inbound node connected: %u\n", client ? client->connectionId : 0U); } void Node_Server_OnData(tcp_connection_t* client) { - printf("A node sent data!\n"); + packet_type_t packetType; + const unsigned char* payload = NULL; + size_t payloadLen = 0; + + if (!client || Node_DecodePacket(client, &packetType, &payload, &payloadLen) != 0) { + return; + } + + if (packetType != PACKET_TYPE_REQUEST) { + return; + } + + net_node_t* node = Node_FromConnection(client); + Node_ForwardData(node, client, payload, payloadLen); } void Node_Server_OnDisconnect(tcp_connection_t* client) { - printf("A node disconnected!\n"); + net_node_t* node = Node_FromConnection(client); + Node_ForwardDisconnect(node, client); + printf("Inbound node disconnected: %u\n", client ? client->connectionId : 0U); +} + +void Node_Client_OnConnect(tcp_connection_t* client) { + net_node_t* node = Node_FromConnection(client); + Node_ForwardConnect(node, client); + printf("Outbound node connected: %u\n", client ? client->connectionId : 0U); +} + +void Node_Client_OnData(tcp_connection_t* client) { + packet_type_t packetType; + const unsigned char* payload = NULL; + size_t payloadLen = 0; + + if (!client || Node_DecodePacket(client, &packetType, &payload, &payloadLen) != 0) { + return; + } + + if (packetType != PACKET_TYPE_RESPONSE) { + return; + } + + net_node_t* node = Node_FromConnection(client); + Node_ForwardData(node, client, payload, payloadLen); +} + +void Node_Client_OnDisconnect(tcp_connection_t* client) { + net_node_t* node = Node_FromConnection(client); + if (node && node->outboundCount > 0) { + node->outboundCount--; + } + + Node_ForwardDisconnect(node, client); + printf("Outbound node disconnected: %u\n", client ? client->connectionId : 0U); } diff --git a/src/tcpd/tcpclient.c b/src/tcpd/tcpclient.c new file mode 100644 index 0000000..758b59d --- /dev/null +++ b/src/tcpd/tcpclient.c @@ -0,0 +1,171 @@ +#ifndef _WIN32 + +#include + +#include +#include +#include +#include +#include +#include +#include + +static void* TcpClient_ThreadProc(void* arg) { + tcp_client_t* client = (tcp_client_t*)arg; + if (!client || !client->connection) { + return NULL; + } + + tcp_connection_t* conn = client->connection; + unsigned char ioBuf[TCP_IO_BUFFER_SIZE]; + + while (1) { + ssize_t n = recv(conn->sockFd, ioBuf, sizeof(ioBuf), 0); + if (n == 0) { + break; + } + if (n < 0) { + if (errno == EINTR) { + continue; + } + break; + } + + if (TcpConnection_FeedFramedData(conn, ioBuf, (size_t)n) != 0) { + break; + } + } + + if (!TcpConnection_IsDisconnectNotified(conn) && conn->on_disconnect) { + TcpConnection_MarkDisconnectNotified(conn); + conn->on_disconnect(conn); + } + + return NULL; +} + +int TcpClient_Init(tcp_client_t* client) { + if (!client) { + return -1; + } + + memset(client, 0, sizeof(*client)); + client->connection = NULL; + return 0; +} + +void TcpClient_Destroy(tcp_client_t* client) { + if (!client) { + return; + } + + TcpClient_Disconnect(client); +} + +int TcpClient_Connect( + tcp_client_t* client, + const char* peerIp, + unsigned short peerPort, + void (*on_connect)(tcp_connection_t* conn), + void (*on_data)(tcp_connection_t* conn), + void (*on_disconnect)(tcp_connection_t* conn), + void* owner +) { + if (!client || !peerIp) { + return -1; + } + + if (client->connection) { + return -1; + } + + int sockFd = socket(AF_INET, SOCK_STREAM, 0); + if (sockFd < 0) { + return -1; + } + + struct sockaddr_in peerAddr; + memset(&peerAddr, 0, sizeof(peerAddr)); + peerAddr.sin_family = AF_INET; + peerAddr.sin_port = htons(peerPort); + + if (inet_pton(AF_INET, peerIp, &peerAddr.sin_addr) <= 0) { + close(sockFd); + return -1; + } + + if (connect(sockFd, (struct sockaddr*)&peerAddr, sizeof(peerAddr)) < 0) { + close(sockFd); + return -1; + } + + tcp_connection_t* conn = (tcp_connection_t*)malloc(sizeof(*conn)); + if (!conn) { + close(sockFd); + return -1; + } + + if (TcpConnection_Init(conn, sockFd, &peerAddr, TCP_CONNECTION_ROLE_OUTBOUND) != 0) { + free(conn); + close(sockFd); + return -1; + } + + conn->connectionId = random_four_byte(); + conn->on_data = on_data; + conn->on_disconnect = on_disconnect; + conn->owner = owner; + + client->connection = conn; + client->on_connect = on_connect; + client->on_data = on_data; + client->on_disconnect = on_disconnect; + client->owner = owner; + + if (client->on_connect) { + client->on_connect(conn); + } + + if (pthread_create(&conn->ioThread, NULL, TcpClient_ThreadProc, client) != 0) { + TcpConnection_Destroy(conn); + free(conn); + client->connection = NULL; + return -1; + } + + return 0; +} + +int TcpClient_Send(tcp_client_t* client, const void* data, size_t len) { + if (!client || !client->connection) { + return -1; + } + + return TcpConnection_SendFramed(client->connection, data, len); +} + +void TcpClient_Disconnect(tcp_client_t* client) { + if (!client || !client->connection) { + return; + } + + tcp_connection_t* conn = client->connection; + + TcpConnection_RequestClose(conn); + + if (!pthread_equal(conn->ioThread, pthread_self())) { + pthread_join(conn->ioThread, NULL); + } + + if (!TcpConnection_IsDisconnectNotified(conn) && conn->on_disconnect) { + TcpConnection_MarkDisconnectNotified(conn); + conn->on_disconnect(conn); + } + + TcpConnection_Destroy(conn); + free(conn); + + client->connection = NULL; +} + +#endif diff --git a/src/tcpd/tcpconnection.c b/src/tcpd/tcpconnection.c new file mode 100644 index 0000000..c6cbfca --- /dev/null +++ b/src/tcpd/tcpconnection.c @@ -0,0 +1,264 @@ +#ifndef _WIN32 + +#include + +#include +#include +#include +#include +#include +#include + +int TcpConnection_Init(tcp_connection_t* conn, int sockFd, const struct sockaddr_in* peerAddr, tcp_connection_role_t role) { + if (!conn || sockFd < 0 || !peerAddr) { + return -1; + } + + memset(conn, 0, sizeof(*conn)); + conn->sockFd = sockFd; + conn->peerAddr = *peerAddr; + conn->role = role; + + if (pthread_mutex_init(&conn->sendLock, NULL) != 0) { + return -1; + } + + if (pthread_mutex_init(&conn->stateLock, NULL) != 0) { + pthread_mutex_destroy(&conn->sendLock); + return -1; + } + + conn->closing = false; + conn->disconnectedNotified = false; + conn->dataBuf = NULL; + conn->dataBufLen = 0; + conn->dataBufCap = 0; + + TcpConnection_ResetFramingState(conn); + + return 0; +} + +void TcpConnection_Destroy(tcp_connection_t* conn) { + if (!conn) { + return; + } + + if (conn->sockFd >= 0) { + close(conn->sockFd); + conn->sockFd = -1; + } + + free(conn->dataBuf); + conn->dataBuf = NULL; + conn->dataBufLen = 0; + conn->dataBufCap = 0; + + free(conn->frameBuf); + conn->frameBuf = NULL; + conn->frameBytesRead = 0; + + pthread_mutex_destroy(&conn->stateLock); + pthread_mutex_destroy(&conn->sendLock); +} + +int TcpConnection_SetDataBuffer(tcp_connection_t* conn, const unsigned char* data, size_t len) { + if (!conn || (!data && len > 0)) { + return -1; + } + + if (len > conn->dataBufCap) { + unsigned char* resized = (unsigned char*)realloc(conn->dataBuf, len); + if (!resized) { + return -1; + } + + conn->dataBuf = resized; + conn->dataBufCap = len; + } + + if (len > 0) { + memcpy(conn->dataBuf, data, len); + } + conn->dataBufLen = len; + + return 0; +} + +void TcpConnection_ResetFramingState(tcp_connection_t* conn) { + if (!conn) { + return; + } + + memset(conn->headerBuf, 0, sizeof(conn->headerBuf)); + conn->headerBytesRead = 0; + conn->expectedPayloadLen = 0; + conn->frameBytesRead = 0; + + free(conn->frameBuf); + conn->frameBuf = NULL; +} + +int TcpConnection_FeedFramedData(tcp_connection_t* conn, const unsigned char* input, size_t inputLen) { + if (!conn || (!input && inputLen > 0)) { + return -1; + } + + size_t offset = 0; + + while (offset < inputLen) { + if (conn->headerBytesRead < TCP_FRAME_HEADER_SIZE) { + size_t needed = TCP_FRAME_HEADER_SIZE - conn->headerBytesRead; + size_t take = (inputLen - offset < needed) ? (inputLen - offset) : needed; + + memcpy(conn->headerBuf + conn->headerBytesRead, input + offset, take); + conn->headerBytesRead += take; + offset += take; + + if (conn->headerBytesRead < TCP_FRAME_HEADER_SIZE) { + continue; + } + + uint32_t beLen = 0; + memcpy(&beLen, conn->headerBuf, sizeof(beLen)); + conn->expectedPayloadLen = ntohl(beLen); + + if (conn->expectedPayloadLen > TCP_MAX_FRAME_PAYLOAD) { + TcpConnection_ResetFramingState(conn); + return -1; + } + + if (conn->expectedPayloadLen == 0) { + if (TcpConnection_SetDataBuffer(conn, NULL, 0) != 0) { + TcpConnection_ResetFramingState(conn); + return -1; + } + + if (conn->on_data) { + conn->on_data(conn); + } + + conn->headerBytesRead = 0; + conn->expectedPayloadLen = 0; + continue; + } + + conn->frameBuf = (unsigned char*)malloc(conn->expectedPayloadLen); + if (!conn->frameBuf) { + TcpConnection_ResetFramingState(conn); + return -1; + } + conn->frameBytesRead = 0; + } + + size_t frameRemaining = conn->expectedPayloadLen - conn->frameBytesRead; + size_t take = (inputLen - offset < frameRemaining) ? (inputLen - offset) : frameRemaining; + + memcpy(conn->frameBuf + conn->frameBytesRead, input + offset, take); + conn->frameBytesRead += take; + offset += take; + + if (conn->frameBytesRead == conn->expectedPayloadLen) { + if (TcpConnection_SetDataBuffer(conn, conn->frameBuf, conn->expectedPayloadLen) != 0) { + TcpConnection_ResetFramingState(conn); + return -1; + } + + if (conn->on_data) { + conn->on_data(conn); + } + + conn->headerBytesRead = 0; + conn->expectedPayloadLen = 0; + conn->frameBytesRead = 0; + free(conn->frameBuf); + conn->frameBuf = NULL; + } + } + + return 0; +} + +int TcpConnection_SendRaw(int sockFd, const void* data, size_t len) { + if (sockFd < 0 || (!data && len > 0)) { + return -1; + } + + size_t totalSent = 0; + const unsigned char* ptr = (const unsigned char*)data; + + while (totalSent < len) { + ssize_t sent = send(sockFd, ptr + totalSent, len - totalSent, 0); + if (sent < 0) { + if (errno == EINTR) { + continue; + } + return -1; + } + if (sent == 0) { + return -1; + } + + totalSent += (size_t)sent; + } + + return 0; +} + +int TcpConnection_SendFramed(tcp_connection_t* conn, const void* payload, size_t payloadLen) { + if (!conn || (!payload && payloadLen > 0) || payloadLen > TCP_MAX_FRAME_PAYLOAD) { + return -1; + } + + uint32_t beLen = htonl((uint32_t)payloadLen); + + pthread_mutex_lock(&conn->sendLock); + + int rc = TcpConnection_SendRaw(conn->sockFd, &beLen, sizeof(beLen)); + if (rc == 0 && payloadLen > 0) { + rc = TcpConnection_SendRaw(conn->sockFd, payload, payloadLen); + } + + pthread_mutex_unlock(&conn->sendLock); + + return rc; +} + +void TcpConnection_RequestClose(tcp_connection_t* conn) { + if (!conn) { + return; + } + + pthread_mutex_lock(&conn->stateLock); + if (!conn->closing) { + conn->closing = true; + if (conn->sockFd >= 0) { + shutdown(conn->sockFd, SHUT_RDWR); + } + } + pthread_mutex_unlock(&conn->stateLock); +} + +void TcpConnection_MarkDisconnectNotified(tcp_connection_t* conn) { + if (!conn) { + return; + } + + pthread_mutex_lock(&conn->stateLock); + conn->disconnectedNotified = true; + pthread_mutex_unlock(&conn->stateLock); +} + +bool TcpConnection_IsDisconnectNotified(tcp_connection_t* conn) { + if (!conn) { + return true; + } + + pthread_mutex_lock(&conn->stateLock); + bool notified = conn->disconnectedNotified; + pthread_mutex_unlock(&conn->stateLock); + + return notified; +} + +#endif diff --git a/src/tcpd/tcpserver.c b/src/tcpd/tcpserver.c index e74c6a3..d50e36b 100644 --- a/src/tcpd/tcpserver.c +++ b/src/tcpd/tcpserver.c @@ -2,322 +2,352 @@ #include -tcp_server_t* TcpServer_Create() { - tcp_server_t* svr = (tcp_server_t*)malloc(sizeof(tcp_server_t)); +#include +#include +#include +#include +#include +#include +#include - if (!svr) { - perror("tcpserver - creation failure"); - exit(1); +static void TcpServer_RemoveClientByPtrUnlocked(tcp_server_t* svr, tcp_connection_t* cli) { + if (!svr || !svr->clientsArrPtr || !cli) { + return; } + size_t idx = Generic_FindClientInArrayByPtr(svr->clientsArrPtr, cli, svr->maxClients); + if (idx != SIZE_MAX) { + svr->clientsArrPtr[idx] = NULL; + } +} + +static void* TcpServer_clientthreadprocess(void* ptr) { + tcpclient_thread_args* args = (tcpclient_thread_args*)ptr; + if (!args || !args->clientPtr || !args->serverPtr) { + free(args); + return NULL; + } + + tcp_connection_t* cli = args->clientPtr; + tcp_server_t* svr = args->serverPtr; + free(args); + + unsigned char ioBuf[TCP_IO_BUFFER_SIZE]; + + while (1) { + ssize_t n = recv(cli->sockFd, ioBuf, sizeof(ioBuf), 0); + if (n == 0) { + break; + } + + if (n < 0) { + if (errno == EINTR) { + continue; + } + break; + } + + if (TcpConnection_FeedFramedData(cli, ioBuf, (size_t)n) != 0) { + break; + } + } + + TcpConnection_RequestClose(cli); + + if (!TcpConnection_IsDisconnectNotified(cli) && cli->on_disconnect) { + TcpConnection_MarkDisconnectNotified(cli); + cli->on_disconnect(cli); + } + + pthread_mutex_lock(&svr->clientsMutex); + TcpServer_RemoveClientByPtrUnlocked(svr, cli); + pthread_mutex_unlock(&svr->clientsMutex); + + TcpConnection_Destroy(cli); + free(cli); + + return NULL; +} + +static void* TcpServer_threadprocess(void* ptr) { + tcp_server_t* svr = (tcp_server_t*)ptr; + if (!svr) { + return NULL; + } + + while (svr->isRunning) { + struct sockaddr_in clientAddr; + socklen_t clientSize = sizeof(clientAddr); + int clientFd = accept(svr->sockFd, (struct sockaddr*)&clientAddr, &clientSize); + + if (clientFd < 0) { + if (!svr->isRunning) { + break; + } + if (errno == EINTR) { + continue; + } + continue; + } + + tcp_connection_t* heapCli = (tcp_connection_t*)malloc(sizeof(*heapCli)); + if (!heapCli) { + close(clientFd); + continue; + } + + if (TcpConnection_Init(heapCli, clientFd, &clientAddr, TCP_CONNECTION_ROLE_INBOUND) != 0) { + close(clientFd); + free(heapCli); + continue; + } + + heapCli->connectionId = random_four_byte(); + heapCli->on_data = svr->on_data; + heapCli->on_disconnect = svr->on_disconnect; + heapCli->owner = svr->owner; + + pthread_mutex_lock(&svr->clientsMutex); + + size_t insertIdx = SIZE_MAX; + for (size_t i = 0; i < svr->maxClients; ++i) { + if (svr->clientsArrPtr[i] == NULL) { + insertIdx = i; + break; + } + } + + if (insertIdx == SIZE_MAX) { + pthread_mutex_unlock(&svr->clientsMutex); + struct linger so_linger; + so_linger.l_onoff = 1; + so_linger.l_linger = 0; + setsockopt(heapCli->sockFd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger)); + TcpConnection_Destroy(heapCli); + free(heapCli); + continue; + } + + svr->clientsArrPtr[insertIdx] = heapCli; + pthread_mutex_unlock(&svr->clientsMutex); + + if (svr->on_connect) { + svr->on_connect(heapCli); + } + + tcpclient_thread_args* arg = (tcpclient_thread_args*)malloc(sizeof(*arg)); + if (!arg) { + TcpServer_Disconnect(svr, heapCli); + continue; + } + + arg->clientPtr = heapCli; + arg->serverPtr = svr; + + if (pthread_create(&heapCli->ioThread, NULL, TcpServer_clientthreadprocess, arg) != 0) { + free(arg); + TcpServer_Disconnect(svr, heapCli); + continue; + } + } + + return NULL; +} + +tcp_server_t* TcpServer_Create() { + tcp_server_t* svr = (tcp_server_t*)malloc(sizeof(*svr)); + if (!svr) { + return NULL; + } + + memset(svr, 0, sizeof(*svr)); svr->sockFd = -1; svr->svrThread = 0; - svr->on_connect = NULL; - svr->on_data = NULL; - svr->on_disconnect = NULL; - - svr->clients = 0; + svr->isRunning = 0; + svr->maxClients = 0; svr->clientsArrPtr = NULL; + if (pthread_mutex_init(&svr->clientsMutex, NULL) != 0) { + free(svr); + return NULL; + } + return svr; } void TcpServer_Destroy(tcp_server_t* ptr) { - if (ptr) { - if (ptr->clientsArrPtr) { - for (size_t i = 0; i < ptr->clients; i++) { - if (ptr->clientsArrPtr[i]) { - free(ptr->clientsArrPtr[i]); - } - } - - free(ptr->clientsArrPtr); - } - - close(ptr->sockFd); - free(ptr); + if (!ptr) { + return; } + + TcpServer_Stop(ptr); + + free(ptr->clientsArrPtr); + ptr->clientsArrPtr = NULL; + + pthread_mutex_destroy(&ptr->clientsMutex); + free(ptr); } void TcpServer_Init(tcp_server_t* ptr, unsigned short port, const char* addr) { - if (ptr) { - // Create socket - ptr->sockFd = socket(AF_INET, SOCK_STREAM, 0); - if (ptr->sockFd < 0) { - perror("tcpserver - socket"); - exit(EXIT_FAILURE); - } - - // Allow quick port resue - ptr->opt = 1; - setsockopt(ptr->sockFd, SOL_SOCKET, SO_REUSEADDR, &ptr->opt, sizeof(int)); - - // Fill address structure - memset(&ptr->addr, 0, sizeof(ptr->addr)); - ptr->addr.sin_family = AF_INET; - ptr->addr.sin_port = htons(port); - inet_pton(AF_INET, addr, &ptr->addr.sin_addr); - - // Bind - if (bind(ptr->sockFd, (struct sockaddr*)&ptr->addr, sizeof(ptr->addr)) < 0) { - perror("tcpserver - bind"); - close(ptr->sockFd); - exit(EXIT_FAILURE); - } - } -} - -// Do not call outside of func. -void* TcpServer_clientthreadprocess(void* ptr) { - if (!ptr) { - perror("Client ptr is null!\n"); - return NULL; + if (!ptr || !addr) { + return; } - tcpclient_thread_args* args = (tcpclient_thread_args*)ptr; - - tcp_connection_t* cli = args->clientPtr; - tcp_server_t* svr = args->serverPtr; - - if (args) { - free(args); + ptr->sockFd = socket(AF_INET, SOCK_STREAM, 0); + if (ptr->sockFd < 0) { + return; } - while (1) { - memset(cli->dataBuf, 0, MTU); // Reset buffer - ssize_t n = recv(cli->clientFd, cli->dataBuf, MTU, 0); - cli->dataBufLen = n; + ptr->opt = 1; + setsockopt(ptr->sockFd, SOL_SOCKET, SO_REUSEADDR, &ptr->opt, sizeof(int)); - if (n == 0) { - break; // Client disconnected - } else if (n > 0) { - if (cli->on_data) { - cli->on_data(cli); - } - } + memset(&ptr->addr, 0, sizeof(ptr->addr)); + ptr->addr.sin_family = AF_INET; + ptr->addr.sin_port = htons(port); + inet_pton(AF_INET, addr, &ptr->addr.sin_addr); - pthread_testcancel(); // Check for thread death + if (bind(ptr->sockFd, (struct sockaddr*)&ptr->addr, sizeof(ptr->addr)) < 0) { + close(ptr->sockFd); + ptr->sockFd = -1; } - - if (cli->on_disconnect) { - cli->on_disconnect(cli); - } - - // Close on exit - close(cli->clientFd); - - // Destroy - tcp_connection_t** arr = svr->clientsArrPtr; - size_t idx = Generic_FindClientInArrayByPtr(arr, cli, svr->clients); - if (idx != SIZE_MAX) { - if (arr[idx]) { - free(arr[idx]); - arr[idx] = NULL; - } - } else { - perror("tcpserver (client thread) - something already freed the client!"); - } - - //free(ptr); - - return NULL; -} - -// Do not call outside of func. -void* TcpServer_threadprocess(void* ptr) { - if (!ptr) { - perror("Client ptr is null!\n"); - return NULL; - } - - tcp_server_t* svr = (tcp_server_t*)ptr; - while (1) { - tcp_connection_t tempclient; - socklen_t clientsize = sizeof(tempclient.clientAddr); - int client = accept(svr->sockFd, (struct sockaddr*)&tempclient.clientAddr, &clientsize); - if (client >= 0) { - tempclient.clientFd = client; - tempclient.on_data = svr->on_data; - tempclient.on_disconnect = svr->on_disconnect; - - // I'm lazy, so I'm just copying the data for now (I should probably make this a better way) - tcp_connection_t* heapCli = (tcp_connection_t*)malloc(sizeof(tcp_connection_t)); - if (!heapCli) { - perror("tcpserver - client failed to allocate"); - exit(EXIT_FAILURE); // Wtf just happened??? - } - - heapCli->clientAddr = tempclient.clientAddr; - heapCli->clientFd = tempclient.clientFd; - heapCli->on_data = tempclient.on_data; - heapCli->on_disconnect = tempclient.on_disconnect; - heapCli->clientId = random_four_byte(); - heapCli->dataBufLen = 0; - - size_t i; - for (i = 0; i < svr->clients; i++) { - if (svr->clientsArrPtr[i] == NULL) { - // Make use of that space - svr->clientsArrPtr[i] = heapCli; // We have now transfered the ownership :) - break; - } - } - - if (i == svr->clients) { - // Not found - // RST; Thread doesn't exist yet - struct linger so_linger; - so_linger.l_onoff = 1; - so_linger.l_linger = 0; - setsockopt(heapCli->clientFd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger)); - close(heapCli->clientFd); - - free(heapCli); - heapCli = NULL; - //svr->clientsArrPtr[i] = NULL; - - continue; - } - - tcpclient_thread_args* arg = (tcpclient_thread_args*)malloc(sizeof(tcpclient_thread_args)); - arg->clientPtr = heapCli; - arg->serverPtr = svr; - - if (svr->on_connect) { - svr->on_connect(heapCli); - } - - pthread_create(&heapCli->clientThread, NULL, TcpServer_clientthreadprocess, arg); - pthread_detach(heapCli->clientThread); // May not work :( - } - - pthread_testcancel(); // Check for thread death - } - - return NULL; } void TcpServer_Start(tcp_server_t* ptr, int maxcons) { - if (ptr) { - if (listen(ptr->sockFd, maxcons) < 0) { - perror("tcpserver - listen"); - close(ptr->sockFd); - exit(EXIT_FAILURE); - } - - ptr->clients = maxcons; - ptr->clientsArrPtr = (tcp_connection_t**)malloc(sizeof(tcp_connection_t*) * maxcons); - - if (!ptr->clientsArrPtr) { - perror("tcpserver - allocation of client space fatally errored"); - exit(EXIT_FAILURE); - } - - // Fucking null out everything - for (int i = 0; i < maxcons; i++) { - ptr->clientsArrPtr[i] = NULL; - } + if (!ptr || ptr->sockFd < 0 || maxcons <= 0 || ptr->isRunning) { + return; } - // Spawn server thread - pthread_create(&ptr->svrThread, NULL, TcpServer_threadprocess, ptr); + if (listen(ptr->sockFd, maxcons) < 0) { + return; + } + + pthread_mutex_lock(&ptr->clientsMutex); + + ptr->maxClients = (size_t)maxcons; + ptr->clientsArrPtr = (tcp_connection_t**)malloc(sizeof(tcp_connection_t*) * ptr->maxClients); + if (!ptr->clientsArrPtr) { + ptr->maxClients = 0; + pthread_mutex_unlock(&ptr->clientsMutex); + return; + } + + for (size_t i = 0; i < ptr->maxClients; ++i) { + ptr->clientsArrPtr[i] = NULL; + } + + ptr->isRunning = 1; + pthread_mutex_unlock(&ptr->clientsMutex); + + if (pthread_create(&ptr->svrThread, NULL, TcpServer_threadprocess, ptr) != 0) { + pthread_mutex_lock(&ptr->clientsMutex); + ptr->isRunning = 0; + free(ptr->clientsArrPtr); + ptr->clientsArrPtr = NULL; + ptr->maxClients = 0; + pthread_mutex_unlock(&ptr->clientsMutex); + } } void TcpServer_Stop(tcp_server_t* ptr) { - if (ptr && ptr->svrThread != 0) { - // Stop server - pthread_cancel(ptr->svrThread); + if (!ptr || !ptr->isRunning) { + return; + } + + ptr->isRunning = 0; + + if (ptr->sockFd >= 0) { + shutdown(ptr->sockFd, SHUT_RDWR); + close(ptr->sockFd); + ptr->sockFd = -1; + } + + if (ptr->svrThread != 0 && !pthread_equal(ptr->svrThread, pthread_self())) { pthread_join(ptr->svrThread, NULL); + } + ptr->svrThread = 0; - // Disconnect clients - for (size_t i = 0; i < ptr->clients; i++) { - tcp_connection_t* cliPtr = ptr->clientsArrPtr[i]; - if (cliPtr) { - close(cliPtr->clientFd); - pthread_cancel(cliPtr->clientThread); - } + pthread_mutex_lock(&ptr->clientsMutex); + size_t maxClients = ptr->maxClients; + tcp_connection_t** local = ptr->clientsArrPtr; + pthread_mutex_unlock(&ptr->clientsMutex); + + for (size_t i = 0; i < maxClients; ++i) { + tcp_connection_t* cli = local[i]; + if (!cli) { + continue; } - ptr->svrThread = 0; + TcpConnection_RequestClose(cli); } + + for (size_t i = 0; i < maxClients; ++i) { + tcp_connection_t* cli = local[i]; + if (!cli) { + continue; + } + + if (!pthread_equal(cli->ioThread, pthread_self())) { + pthread_join(cli->ioThread, NULL); + } + } + + pthread_mutex_lock(&ptr->clientsMutex); + free(ptr->clientsArrPtr); + ptr->clientsArrPtr = NULL; + ptr->maxClients = 0; + pthread_mutex_unlock(&ptr->clientsMutex); } -void TcpServer_Send(tcp_server_t* ptr, tcp_connection_t* cli, void* data, size_t len) { - if (ptr && cli && data && len > 0) { - size_t sent = 0; - while (sent < len) { - // Ensure that all data is sent. TCP can split sends. - ssize_t n = send(cli->clientFd, (unsigned char*)data + sent, len - sent, 0); - if (n < 0) { - perror("tcpserver - send error"); - break; - } - sent += n; - } +int TcpServer_Send(tcp_server_t* ptr, tcp_connection_t* cli, const void* data, size_t len) { + if (!ptr || !cli || !data || len == 0) { + return -1; } + + return TcpConnection_SendFramed(cli, data, len); } -void Generic_SendSocket(int sock, void* data, size_t len) { - if (sock > 0 && data && len > 0) { - size_t sent = 0; - while (sent < len) { - ssize_t n = send(sock, (unsigned char*)data + sent, len - sent, 0); - if (n < 0) { - perror("generic - send socket error"); - break; - } - sent += n; - } - } +void Generic_SendSocket(int sock, const void* data, size_t len) { + (void)TcpConnection_SendRaw(sock, data, len); } void TcpServer_Disconnect(tcp_server_t* ptr, tcp_connection_t* cli) { - if (ptr && cli) { - close(cli->clientFd); - pthread_cancel(cli->clientThread); + if (!ptr || !cli) { + return; + } - size_t idx = Generic_FindClientInArrayByPtr(ptr->clientsArrPtr, cli, ptr->clients); - if (idx != SIZE_MAX) { - if (ptr->clientsArrPtr[idx]) { - free(ptr->clientsArrPtr[idx]); - } - ptr->clientsArrPtr[idx] = NULL; - } else { - perror("tcpserver - didn't find client to disconnect in array!"); - } + TcpConnection_RequestClose(cli); + + if (!pthread_equal(cli->ioThread, pthread_self())) { + pthread_join(cli->ioThread, NULL); } } void TcpServer_KillClient(tcp_server_t* ptr, tcp_connection_t* cli) { - if (ptr && cli) { - // RST the connection - struct linger so_linger; - so_linger.l_onoff = 1; - so_linger.l_linger = 0; - setsockopt(cli->clientFd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger)); - close(cli->clientFd); - pthread_cancel(cli->clientThread); - - size_t idx = Generic_FindClientInArrayByPtr(ptr->clientsArrPtr, cli, ptr->clients); - if (idx != SIZE_MAX) { - if (ptr->clientsArrPtr[idx]) { - free(ptr->clientsArrPtr[idx]); - } - ptr->clientsArrPtr[idx] = NULL; - } else { - perror("tcpserver - didn't find client to kill in array!"); - } + if (!ptr || !cli) { + return; } + + struct linger so_linger; + so_linger.l_onoff = 1; + so_linger.l_linger = 0; + setsockopt(cli->sockFd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger)); + + TcpServer_Disconnect(ptr, cli); } size_t Generic_FindClientInArrayByPtr(tcp_connection_t** arr, tcp_connection_t* ptr, size_t len) { - for (size_t i = 0; i < len; i++) { + if (!arr || !ptr) { + return SIZE_MAX; + } + + for (size_t i = 0; i < len; ++i) { if (arr[i] == ptr) { return i; } } - return SIZE_MAX; // Returns max unsigned, likely improbable to be correct + return SIZE_MAX; } #endif