diff --git a/include/tcpd/tcpclient.h b/include/tcpd/tcpclient.h index b7c2138..7ceac82 100644 --- a/include/tcpd/tcpclient.h +++ b/include/tcpd/tcpclient.h @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -13,6 +14,7 @@ typedef struct { void (*on_data)(tcp_connection_t* conn); void (*on_disconnect)(tcp_connection_t* conn); void* owner; + uint64_t peerBlockHeight; } tcp_client_t; int TcpClient_Init(tcp_client_t* client); diff --git a/src/main.c b/src/main.c index 040c090..e977c3b 100644 --- a/src/main.c +++ b/src/main.c @@ -10,6 +10,7 @@ #include #include #include +#include #include @@ -695,6 +696,66 @@ int main(int argc, char* argv[]) { continue; } + if (strcmp(cmd, "sync") == 0) { + // Pick outbound peer with highest advertised height + if (!node) { + printf("no node available\n"); + continue; + } + + int bestIdx = -1; + uint64_t bestHeight = 0; + for (size_t i = 0; i < MAX_CONS; ++i) { + if (node->outboundClients[i].connection) { + if (node->outboundClients[i].peerBlockHeight > bestHeight) { + bestHeight = node->outboundClients[i].peerBlockHeight; + bestIdx = (int)i; + } + } + } + + if (bestIdx < 0) { + printf("no outbound peers to sync from\n"); + continue; + } + + uint64_t localHeight = (uint64_t)Chain_Size(chain); + if (bestHeight <= localHeight) { + printf("already synced (local=%" PRIu64 ", peer=%" PRIu64 ")\n", localHeight, bestHeight); + continue; + } + + printf("syncing from peer %d: peerHeight=%" PRIu64 " local=%" PRIu64 "\n", bestIdx, bestHeight, localHeight); + + tcp_connection_t* peerConn = node->outboundClients[bestIdx].connection; + for (uint64_t h = localHeight; h < bestHeight; ++h) { + uint64_t req = h; + if (Node_SendPacket(node, peerConn, PACKET_TYPE_FETCH_BLOCK, &req, sizeof(req)) != 0) { + printf("failed to send FETCH_BLOCK for %" PRIu64 "\n", req); + break; + } + + // Wait up to 5 seconds for the block to be applied (Chain_Size to increase) + const int timeoutMs = 5000; + const int pollIntervalMs = 100; + int waited = 0; + uint64_t startSize = (uint64_t)Chain_Size(chain); + while ((uint64_t)Chain_Size(chain) == startSize && waited < timeoutMs) { + usleep(pollIntervalMs * 1000); + waited += pollIntervalMs; + } + + if ((uint64_t)Chain_Size(chain) == startSize) { + printf("timed out waiting for block %" PRIu64 "\n", req); + break; + } + printf("fetched block %" PRIu64 "\n", req); + } + + printf("sync complete: localHeight=%zu\n", Chain_Size(chain)); + continue; + } + if (strcmp(cmd, "blockdetail") == 0) { char* blockNumberStr = strtok(NULL, " \t"); char* extra = strtok(NULL, " \t"); diff --git a/src/nets/net_node.c b/src/nets/net_node.c index 8cd1fc6..fbae2db 100644 --- a/src/nets/net_node.c +++ b/src/nets/net_node.c @@ -5,6 +5,8 @@ #include #include +#include +#include static net_node_t* Node_FromConnection(tcp_connection_t* conn) { if (!conn) { @@ -38,6 +40,78 @@ static int Node_DecodePacket(const tcp_connection_t* conn, packet_type_t* outTyp return 0; } +static bool Node_ParseAndAcceptBlock(const unsigned char* payload, size_t payloadLen, bool persist) { + if (!payload) { return false; } + + size_t offset = 0; + if (payloadLen < sizeof(uint64_t) + sizeof(block_header_t) + sizeof(uint64_t)) { return false; } + + uint64_t blockHeight = 0; + memcpy(&blockHeight, payload + offset, sizeof(blockHeight)); + offset += sizeof(blockHeight); + + block_t* blk = (block_t*)calloc(1, sizeof(block_t)); + if (!blk) { return false; } + + memcpy(&blk->header, payload + offset, sizeof(blk->header)); + offset += sizeof(blk->header); + + uint64_t txCount = 0; + memcpy(&txCount, payload + offset, sizeof(txCount)); + offset += sizeof(txCount); + + blk->transactions = DYNARR_CREATE(signed_transaction_t, txCount == 0 ? 1 : (size_t)txCount); + if (!blk->transactions) { free(blk); return false; } + + for (uint64_t i = 0; i < txCount; ++i) { + if (offset + sizeof(signed_transaction_t) > payloadLen) { + DynArr_destroy(blk->transactions); + free(blk); + return false; + } + signed_transaction_t tx; + memcpy(&tx, payload + offset, sizeof(tx)); + offset += sizeof(tx); + if (!DynArr_push_back(blk->transactions, &tx)) { + DynArr_destroy(blk->transactions); + free(blk); + return false; + } + } + + // Validate block + if (!Block_IsFullyValid(blk)) { + DynArr_destroy(blk->transactions); + free(blk); + return false; + } + + if (!currentChain) { + DynArr_destroy(blk->transactions); + free(blk); + return false; + } + + if (!Chain_AddBlock(currentChain, blk)) { + // Chain_AddBlock failed; cleanup + if (blk->transactions) { + DynArr_destroy(blk->transactions); + } + free(blk); + return false; + } + + // Persist on accept if requested + if (persist) { + Chain_SaveToFile(currentChain, chainDataDir, currentSupply, currentReward); + BalanceSheet_SaveToFile(chainDataDir); + } + + // Chain_AddBlock copied the block into the chain; free our temporary wrapper but do NOT destroy transactions (they are freed by Chain_SaveToFile when persisted) + free(blk); + return true; +} + static void Node_ForwardConnect(net_node_t* node, tcp_connection_t* conn) { if (node && node->on_connect) { node->on_connect(conn, node->callbackUser); @@ -323,14 +397,13 @@ void Node_Server_OnData(tcp_connection_t* client) { return; } case PACKET_TYPE_BROADCAST_BLOCK: { - // Server cannot receive these either. - printf("Received unexpected BROADCAST_BLOCK packet from node %u\n", client ? client->connectionId : 0U); - - // Send the error and kill the connection - const char* msg = "You can't send me BROADCAST_BLOCK! I'm a server!"; - Node_SendPacket(Node_FromConnection(client), client, PACKET_TYPE_ERROR, msg, strlen(msg)); - TcpConnection_RequestClose(client); - return; + // Accept broadcast blocks from peers and try to append + if (Node_ParseAndAcceptBlock(payload, payloadLen, true)) { + printf("Accepted BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U); + } else { + printf("Rejected BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U); + } + break; } case PACKET_TYPE_ACK_BLOCK: case PACKET_TYPE_BROADCAST_TX: @@ -416,7 +489,18 @@ void Node_Client_OnData(tcp_connection_t* client) { memcpy(&protoVersion, payload, sizeof(protoVersion)); memcpy(&blockHeight, payload + sizeof(protoVersion), sizeof(blockHeight)); - printf("Received ACK_HELLO from node %u with protoVersion %u and blockHeight %lu\n", client ? client->connectionId : 0U, protoVersion, (unsigned long)blockHeight); + printf("Received ACK_HELLO from node %u with protoVersion %u and blockHeight %" PRIu64 "\n", client ? client->connectionId : 0U, protoVersion, blockHeight); + + // Store peer-advertised height on matching outbound client + net_node_t* node = Node_FromConnection(client); + if (node) { + for (size_t i = 0; i < MAX_CONS; ++i) { + if (node->outboundClients[i].connection == client) { + node->outboundClients[i].peerBlockHeight = blockHeight; + break; + } + } + } break; } case PACKET_TYPE_FETCH_BLOCK: { @@ -430,61 +514,20 @@ void Node_Client_OnData(tcp_connection_t* client) { return; } case PACKET_TYPE_BLOCK_DATA: { - // TODO + if (Node_ParseAndAcceptBlock(payload, payloadLen, true)) { + printf("Accepted BLOCK_DATA from node %u\n", client ? client->connectionId : 0U); + } else { + printf("Rejected BLOCK_DATA from node %u\n", client ? client->connectionId : 0U); + } break; } case PACKET_TYPE_BROADCAST_BLOCK: { - // TODO: Handle based on the current block height of the node; if for n + 1, ignore it for now, since we're probably syncing. - // If higher than n + 1, request missing blocks. - // We just assume n + 1 for now. - - // Decode - [1 byte packet type][8 byte block height][block header][8 byte transation count][remaining bytes block data]; TODO: This is just for v1 transactions right now. - if (payloadLen < 1 + sizeof(uint64_t) + sizeof(uint64_t)) { - return; - } - - char* ptr = (char*)payload; - ptr += 1; // skip packet type, we already know it - - uint64_t blockHeight; - memcpy(&blockHeight, ptr, sizeof(blockHeight)); - ptr += sizeof(blockHeight); - - block_t blockHeader; - memcpy(&blockHeader, ptr, sizeof(blockHeader)); - ptr += sizeof(blockHeader); - - uint64_t txCount; - memcpy(&txCount, ptr, sizeof(txCount)); - ptr += sizeof(txCount); - - DynArr* transactions = DYNARR_CREATE(sizeof(signed_transaction_t), 0); - for (uint64_t i = 0; i < txCount; ++i) { - if (ptr + sizeof(signed_transaction_t) > (char*)payload + payloadLen) { - // Malformed packet - TODO: Error the sender - DynArr_destroy(transactions); - return; - } - signed_transaction_t tx; - memcpy(&tx, ptr, sizeof(tx)); - ptr += sizeof(tx); - DynArr_push_back(transactions, &tx); - } - - blockHeader.transactions = transactions; - - // Verify - if (!Block_IsFullyValid(&blockHeader)) { - // Invalid block - DynArr_destroy(transactions); - return; - } - - // Push to chain - TODO: Handle orphans, reorgs, etc. - if (currentChain) { - Chain_AddBlock(currentChain, &blockHeader); - Chain_SaveToFile(currentChain, chainDataDir, currentSupply, currentReward); // Note: this destroy the transactions inside the block automatically, so we don't have to worry about that here. + if (Node_ParseAndAcceptBlock(payload, payloadLen, true)) { + printf("Accepted BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U); + } else { + printf("Rejected BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U); } + break; } case PACKET_TYPE_ACK_BLOCK: case PACKET_TYPE_BROADCAST_TX: diff --git a/src/tcpd/tcpclient.c b/src/tcpd/tcpclient.c index 6841e92..fdaa59f 100644 --- a/src/tcpd/tcpclient.c +++ b/src/tcpd/tcpclient.c @@ -51,6 +51,7 @@ int TcpClient_Init(tcp_client_t* client) { memset(client, 0, sizeof(*client)); client->connection = NULL; + client->peerBlockHeight = 0; return 0; }