This commit is contained in:
2026-05-15 12:23:07 +02:00
parent 361ac73e45
commit ad339dc696
4 changed files with 167 additions and 60 deletions

View File

@@ -3,6 +3,7 @@
#include <arpa/inet.h>
#include <stddef.h>
#include <stdint.h>
#include <constants.h>
#include <tcpd/tcpconnection.h>
@@ -13,6 +14,7 @@ typedef struct {
void (*on_data)(tcp_connection_t* conn);
void (*on_disconnect)(tcp_connection_t* conn);
void* owner;
uint64_t peerBlockHeight;
} tcp_client_t;
int TcpClient_Init(tcp_client_t* client);

View File

@@ -10,6 +10,7 @@
#include <time.h>
#include <signal.h>
#include <balance_sheet.h>
#include <unistd.h>
#include <constants.h>
@@ -695,6 +696,66 @@ int main(int argc, char* argv[]) {
continue;
}
if (strcmp(cmd, "sync") == 0) {
// Pick outbound peer with highest advertised height
if (!node) {
printf("no node available\n");
continue;
}
int bestIdx = -1;
uint64_t bestHeight = 0;
for (size_t i = 0; i < MAX_CONS; ++i) {
if (node->outboundClients[i].connection) {
if (node->outboundClients[i].peerBlockHeight > bestHeight) {
bestHeight = node->outboundClients[i].peerBlockHeight;
bestIdx = (int)i;
}
}
}
if (bestIdx < 0) {
printf("no outbound peers to sync from\n");
continue;
}
uint64_t localHeight = (uint64_t)Chain_Size(chain);
if (bestHeight <= localHeight) {
printf("already synced (local=%" PRIu64 ", peer=%" PRIu64 ")\n", localHeight, bestHeight);
continue;
}
printf("syncing from peer %d: peerHeight=%" PRIu64 " local=%" PRIu64 "\n", bestIdx, bestHeight, localHeight);
tcp_connection_t* peerConn = node->outboundClients[bestIdx].connection;
for (uint64_t h = localHeight; h < bestHeight; ++h) {
uint64_t req = h;
if (Node_SendPacket(node, peerConn, PACKET_TYPE_FETCH_BLOCK, &req, sizeof(req)) != 0) {
printf("failed to send FETCH_BLOCK for %" PRIu64 "\n", req);
break;
}
// Wait up to 5 seconds for the block to be applied (Chain_Size to increase)
const int timeoutMs = 5000;
const int pollIntervalMs = 100;
int waited = 0;
uint64_t startSize = (uint64_t)Chain_Size(chain);
while ((uint64_t)Chain_Size(chain) == startSize && waited < timeoutMs) {
usleep(pollIntervalMs * 1000);
waited += pollIntervalMs;
}
if ((uint64_t)Chain_Size(chain) == startSize) {
printf("timed out waiting for block %" PRIu64 "\n", req);
break;
}
printf("fetched block %" PRIu64 "\n", req);
}
printf("sync complete: localHeight=%zu\n", Chain_Size(chain));
continue;
}
if (strcmp(cmd, "blockdetail") == 0) {
char* blockNumberStr = strtok(NULL, " \t");
char* extra = strtok(NULL, " \t");

View File

@@ -5,6 +5,8 @@
#include <string.h>
#include <runtime_state.h>
#include <balance_sheet.h>
#include <inttypes.h>
static net_node_t* Node_FromConnection(tcp_connection_t* conn) {
if (!conn) {
@@ -38,6 +40,78 @@ static int Node_DecodePacket(const tcp_connection_t* conn, packet_type_t* outTyp
return 0;
}
static bool Node_ParseAndAcceptBlock(const unsigned char* payload, size_t payloadLen, bool persist) {
if (!payload) { return false; }
size_t offset = 0;
if (payloadLen < sizeof(uint64_t) + sizeof(block_header_t) + sizeof(uint64_t)) { return false; }
uint64_t blockHeight = 0;
memcpy(&blockHeight, payload + offset, sizeof(blockHeight));
offset += sizeof(blockHeight);
block_t* blk = (block_t*)calloc(1, sizeof(block_t));
if (!blk) { return false; }
memcpy(&blk->header, payload + offset, sizeof(blk->header));
offset += sizeof(blk->header);
uint64_t txCount = 0;
memcpy(&txCount, payload + offset, sizeof(txCount));
offset += sizeof(txCount);
blk->transactions = DYNARR_CREATE(signed_transaction_t, txCount == 0 ? 1 : (size_t)txCount);
if (!blk->transactions) { free(blk); return false; }
for (uint64_t i = 0; i < txCount; ++i) {
if (offset + sizeof(signed_transaction_t) > payloadLen) {
DynArr_destroy(blk->transactions);
free(blk);
return false;
}
signed_transaction_t tx;
memcpy(&tx, payload + offset, sizeof(tx));
offset += sizeof(tx);
if (!DynArr_push_back(blk->transactions, &tx)) {
DynArr_destroy(blk->transactions);
free(blk);
return false;
}
}
// Validate block
if (!Block_IsFullyValid(blk)) {
DynArr_destroy(blk->transactions);
free(blk);
return false;
}
if (!currentChain) {
DynArr_destroy(blk->transactions);
free(blk);
return false;
}
if (!Chain_AddBlock(currentChain, blk)) {
// Chain_AddBlock failed; cleanup
if (blk->transactions) {
DynArr_destroy(blk->transactions);
}
free(blk);
return false;
}
// Persist on accept if requested
if (persist) {
Chain_SaveToFile(currentChain, chainDataDir, currentSupply, currentReward);
BalanceSheet_SaveToFile(chainDataDir);
}
// Chain_AddBlock copied the block into the chain; free our temporary wrapper but do NOT destroy transactions (they are freed by Chain_SaveToFile when persisted)
free(blk);
return true;
}
static void Node_ForwardConnect(net_node_t* node, tcp_connection_t* conn) {
if (node && node->on_connect) {
node->on_connect(conn, node->callbackUser);
@@ -323,14 +397,13 @@ void Node_Server_OnData(tcp_connection_t* client) {
return;
}
case PACKET_TYPE_BROADCAST_BLOCK: {
// Server cannot receive these either.
printf("Received unexpected BROADCAST_BLOCK packet from node %u\n", client ? client->connectionId : 0U);
// Send the error and kill the connection
const char* msg = "You can't send me BROADCAST_BLOCK! I'm a server!";
Node_SendPacket(Node_FromConnection(client), client, PACKET_TYPE_ERROR, msg, strlen(msg));
TcpConnection_RequestClose(client);
return;
// Accept broadcast blocks from peers and try to append
if (Node_ParseAndAcceptBlock(payload, payloadLen, true)) {
printf("Accepted BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
} else {
printf("Rejected BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
}
break;
}
case PACKET_TYPE_ACK_BLOCK:
case PACKET_TYPE_BROADCAST_TX:
@@ -416,7 +489,18 @@ void Node_Client_OnData(tcp_connection_t* client) {
memcpy(&protoVersion, payload, sizeof(protoVersion));
memcpy(&blockHeight, payload + sizeof(protoVersion), sizeof(blockHeight));
printf("Received ACK_HELLO from node %u with protoVersion %u and blockHeight %lu\n", client ? client->connectionId : 0U, protoVersion, (unsigned long)blockHeight);
printf("Received ACK_HELLO from node %u with protoVersion %u and blockHeight %" PRIu64 "\n", client ? client->connectionId : 0U, protoVersion, blockHeight);
// Store peer-advertised height on matching outbound client
net_node_t* node = Node_FromConnection(client);
if (node) {
for (size_t i = 0; i < MAX_CONS; ++i) {
if (node->outboundClients[i].connection == client) {
node->outboundClients[i].peerBlockHeight = blockHeight;
break;
}
}
}
break;
}
case PACKET_TYPE_FETCH_BLOCK: {
@@ -430,61 +514,20 @@ void Node_Client_OnData(tcp_connection_t* client) {
return;
}
case PACKET_TYPE_BLOCK_DATA: {
// TODO
if (Node_ParseAndAcceptBlock(payload, payloadLen, true)) {
printf("Accepted BLOCK_DATA from node %u\n", client ? client->connectionId : 0U);
} else {
printf("Rejected BLOCK_DATA from node %u\n", client ? client->connectionId : 0U);
}
break;
}
case PACKET_TYPE_BROADCAST_BLOCK: {
// TODO: Handle based on the current block height of the node; if for n + 1, ignore it for now, since we're probably syncing.
// If higher than n + 1, request missing blocks.
// We just assume n + 1 for now.
// Decode - [1 byte packet type][8 byte block height][block header][8 byte transation count][remaining bytes block data]; TODO: This is just for v1 transactions right now.
if (payloadLen < 1 + sizeof(uint64_t) + sizeof(uint64_t)) {
return;
}
char* ptr = (char*)payload;
ptr += 1; // skip packet type, we already know it
uint64_t blockHeight;
memcpy(&blockHeight, ptr, sizeof(blockHeight));
ptr += sizeof(blockHeight);
block_t blockHeader;
memcpy(&blockHeader, ptr, sizeof(blockHeader));
ptr += sizeof(blockHeader);
uint64_t txCount;
memcpy(&txCount, ptr, sizeof(txCount));
ptr += sizeof(txCount);
DynArr* transactions = DYNARR_CREATE(sizeof(signed_transaction_t), 0);
for (uint64_t i = 0; i < txCount; ++i) {
if (ptr + sizeof(signed_transaction_t) > (char*)payload + payloadLen) {
// Malformed packet - TODO: Error the sender
DynArr_destroy(transactions);
return;
}
signed_transaction_t tx;
memcpy(&tx, ptr, sizeof(tx));
ptr += sizeof(tx);
DynArr_push_back(transactions, &tx);
}
blockHeader.transactions = transactions;
// Verify
if (!Block_IsFullyValid(&blockHeader)) {
// Invalid block
DynArr_destroy(transactions);
return;
}
// Push to chain - TODO: Handle orphans, reorgs, etc.
if (currentChain) {
Chain_AddBlock(currentChain, &blockHeader);
Chain_SaveToFile(currentChain, chainDataDir, currentSupply, currentReward); // Note: this destroy the transactions inside the block automatically, so we don't have to worry about that here.
if (Node_ParseAndAcceptBlock(payload, payloadLen, true)) {
printf("Accepted BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
} else {
printf("Rejected BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
}
break;
}
case PACKET_TYPE_ACK_BLOCK:
case PACKET_TYPE_BROADCAST_TX:

View File

@@ -51,6 +51,7 @@ int TcpClient_Init(tcp_client_t* client) {
memset(client, 0, sizeof(*client));
client->connection = NULL;
client->peerBlockHeight = 0;
return 0;
}