reorgs, fetch batching (parallel fetch), orphans

This commit is contained in:
2026-05-15 13:01:27 +02:00
parent ad339dc696
commit 3337ac85ab
14 changed files with 827 additions and 133 deletions

View File

@@ -7,6 +7,10 @@
#include <runtime_state.h>
#include <balance_sheet.h>
#include <inttypes.h>
#include <nets/orphan_pool.h>
#include <inttypes.h>
#include <pthread.h>
#include <unistd.h>
static net_node_t* Node_FromConnection(tcp_connection_t* conn) {
if (!conn) {
@@ -24,6 +28,23 @@ static uint64_t Node_GetCurrentBlockHeight(void) {
return currentBlockHeight;
}
static void* Node_MaintenanceThread(void* arg) {
net_node_t* n = (net_node_t*)arg;
if (!n) return NULL;
while (n->maintenanceRunning) {
if (currentChain) {
size_t attached = OrphanPool_AttemptAttach(currentChain);
if (attached > 0) {
printf("Maintenance: attached %zu orphan(s)\n", attached);
Chain_SaveToFile(currentChain, chainDataDir, currentSupply, currentReward);
BalanceSheet_SaveToFile(chainDataDir);
}
}
usleep((useconds_t)(n->maintenanceIntervalMs * 1000));
}
return NULL;
}
static int Node_DecodePacket(const tcp_connection_t* conn, packet_type_t* outType, const unsigned char** outPayload, size_t* outPayloadLen) {
if (!conn || !outType || !outPayload || !outPayloadLen || conn->dataBufLen < 1 || !conn->dataBuf) {
return -1;
@@ -92,6 +113,19 @@ static bool Node_ParseAndAcceptBlock(const unsigned char* payload, size_t payloa
return false;
}
// If parent is missing, insert into orphan pool instead of rejecting immediately.
if (blk->header.blockNumber > 0) {
uint64_t parentIndex = blk->header.blockNumber - 1;
block_t* parentCopy = NULL;
if (parentIndex >= Chain_Size(currentChain) || !Chain_GetBlockCopy(currentChain, (size_t)parentIndex, &parentCopy) || !parentCopy) {
// Insert into orphan pool and take ownership of blk
OrphanPool_Insert(blk, blockHeight);
if (parentCopy) Block_Destroy(parentCopy);
return false;
}
Block_Destroy(parentCopy);
}
if (!Chain_AddBlock(currentChain, blk)) {
// Chain_AddBlock failed; cleanup
if (blk->transactions) {
@@ -109,6 +143,14 @@ static bool Node_ParseAndAcceptBlock(const unsigned char* payload, size_t payloa
// Chain_AddBlock copied the block into the chain; free our temporary wrapper but do NOT destroy transactions (they are freed by Chain_SaveToFile when persisted)
free(blk);
// Attempt to attach any orphans that may now have their parents present.
size_t attached = OrphanPool_AttemptAttach(currentChain);
if (attached > 0) {
printf("Attached %zu orphan(s) after accepting block\n", attached);
// Persist after attaching orphans
Chain_SaveToFile(currentChain, chainDataDir, currentSupply, currentReward);
BalanceSheet_SaveToFile(chainDataDir);
}
return true;
}
@@ -160,6 +202,16 @@ net_node_t* Node_Create() {
TcpServer_Start(node->server, MAX_CONS);
OrphanPool_Init();
// Start maintenance thread
node->maintenanceRunning = 1;
node->maintenanceIntervalMs = 1000; // 1s
if (pthread_create(&node->maintenanceThread, NULL, Node_MaintenanceThread, node) != 0) {
// Failed to start maintenance thread; continue without it
node->maintenanceRunning = 0;
}
return node;
}
@@ -178,6 +230,14 @@ void Node_Destroy(net_node_t* node) {
TcpServer_Destroy(node->server);
}
// Stop maintenance thread
if (node->maintenanceRunning) {
node->maintenanceRunning = 0;
pthread_join(node->maintenanceThread, NULL);
}
OrphanPool_Destroy();
free(node);
}
@@ -346,45 +406,53 @@ void Node_Server_OnData(tcp_connection_t* client) {
return;
}
// Find the block
block_t* block = Chain_GetBlock(currentChain, (size_t)requestedHeight);
if (!block) {
printf("Requested block height %" PRIu64 " not found, ignoring\n", requestedHeight);
const char* msg = "Requested block not found!";
Node_SendPacket(Node_FromConnection(client), client, PACKET_TYPE_ERROR, msg, strlen(msg));
return;
}
if (block->transactions == NULL) {
// Just reload from disk pure
Chain_LoadBlockFromFile(chainDataDir, requestedHeight - 1, true, &block, NULL); // blockNumber = height - 1 because of 0-indexing
// Find the block (deep-copy it for safe access)
block_t* block = NULL;
if (!Chain_GetBlockCopy(currentChain, (size_t)requestedHeight, &block) || !block) {
// Try loading from disk
if (!Chain_LoadBlockFromFile(chainDataDir, requestedHeight, true, &block, NULL) || !block) {
printf("Requested block height %" PRIu64 " not found, ignoring\n", requestedHeight);
const char* msg = "Requested block not found!";
Node_SendPacket(Node_FromConnection(client), client, PACKET_TYPE_ERROR, msg, strlen(msg));
return;
}
}
// Serialize into a BLOCK_DATA packet [block header][tx count - 8 bytes][transactions...]
size_t blockDataSize = sizeof(block_header_t) + sizeof(uint64_t) + (block->transactions ? block->transactions->size * sizeof(signed_transaction_t) : 0);
size_t txCount = block->transactions ? DynArr_size(block->transactions) : 0;
size_t blockDataSize = sizeof(uint64_t) + sizeof(block_header_t) + sizeof(uint64_t) + (txCount * sizeof(signed_transaction_t));
unsigned char* blockData = (unsigned char*)malloc(blockDataSize);
if (!blockData) {
// Generic error response
printf("Failed to allocate memory for block data response to node %u\n", client ? client->connectionId : 0U);
const char* msg = "Generic error for block data!";
Node_SendPacket(Node_FromConnection(client), client, PACKET_TYPE_ERROR, msg, strlen(msg));
Block_Destroy(block);
return;
}
size_t offset = 0;
// Write height first
uint64_t heightLE = requestedHeight;
memcpy(blockData + offset, &heightLE, sizeof(heightLE));
offset += sizeof(heightLE);
memcpy(blockData + offset, &block->header, sizeof(block_header_t));
offset += sizeof(block_header_t);
uint64_t txCount = block->transactions ? (uint64_t)block->transactions->size : 0;
memcpy(blockData + offset, &txCount, sizeof(txCount));
offset += sizeof(txCount);
if (block->transactions && block->transactions->size > 0) {
memcpy(blockData + offset, block->transactions->data, block->transactions->size * sizeof(signed_transaction_t));
offset += block->transactions->size * sizeof(signed_transaction_t);
uint64_t txCount64 = (uint64_t)txCount;
memcpy(blockData + offset, &txCount64, sizeof(txCount64));
offset += sizeof(txCount64);
if (block->transactions && txCount > 0) {
for (size_t ti = 0; ti < txCount; ++ti) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(block->transactions, ti);
memcpy(blockData + offset, tx, sizeof(signed_transaction_t));
offset += sizeof(signed_transaction_t);
}
}
// Send the block data
Node_SendPacket(Node_FromConnection(client), client, PACKET_TYPE_BLOCK_DATA, blockData, offset);
free(blockData);
Block_Destroy(block);
}
case PACKET_TYPE_BLOCK_DATA: {
// Server can't receive these!