diff --git a/include/block/block.h b/include/block/block.h index 0764016..6978a66 100644 --- a/include/block/block.h +++ b/include/block/block.h @@ -40,5 +40,7 @@ bool Block_IsFullyValid(const block_t* block); void Block_ShutdownPowContext(void); void Block_Destroy(block_t* block); void Block_Print(const block_t* block); +// Deep-copy a block (allocates a new `block_t*`). Caller must call `Block_Destroy`. +block_t* Block_Copy(const block_t* src); #endif diff --git a/include/block/chain.h b/include/block/chain.h index a066a4f..caf9b89 100644 --- a/include/block/chain.h +++ b/include/block/chain.h @@ -24,6 +24,13 @@ size_t Chain_Size(blockchain_t* chain); bool Chain_IsValid(blockchain_t* chain); void Chain_Wipe(blockchain_t* chain); +// Roll back the chain to `height` (exclusive): after this call, Chain_Size(chain) == height +// Returns true on success. +bool Chain_RollbackToHeight(blockchain_t* chain, size_t height); + +// Retrieve a deep copy of the block at `index`. Caller must free with `Block_Destroy`. +bool Chain_GetBlockCopy(blockchain_t* chain, size_t index, block_t** outCopy); + // I/O bool Chain_SaveToFile(blockchain_t* chain, const char* dirpath, uint256_t currentSupply, uint64_t currentReward); bool Chain_LoadFromFile(blockchain_t* chain, const char* dirpath, uint256_t* outCurrentSupply, uint32_t* outDifficultyTarget, uint64_t* outCurrentReward, uint8_t* outLastSavedHash, bool loadTransactions); diff --git a/include/constants.h b/include/constants.h index 84cce4c..dccd9e8 100644 --- a/include/constants.h +++ b/include/constants.h @@ -23,6 +23,22 @@ //#define INITIAL_DIFFICULTY 0x1f0c1422 // Default compact target used by Autolykos2 PoW (This is ridiculously low) #define INITIAL_DIFFICULTY 0x1f1b7c51 // This takes 90s on my machine with a single thread, good for testing +// Sync / Reorg tuning constants +// Timeouts and retry/backoff behavior for block fetches during sync (milliseconds) +static const uint64_t SYNC_REQUEST_TIMEOUT_MS = 5000ULL; // 5s +static const int MAX_SYNC_RETRIES = 4; // retry attempts per block fetch +static const uint64_t SYNC_BACKOFF_BASE_MS = 200ULL; // base backoff in ms (exponential) +// Parallelism +static const int MAX_PARALLEL_FETCHES = 8; // concurrent block fetches during windowed sync +// Heuristic: if peer is this many blocks ahead, treat as initial sync +static const uint64_t INITIAL_SYNC_HEIGHT_DIFF = 50ULL; + +// Reorg penalty configuration (used to penalize peers reporting higher heights but with delayed work) +static const uint64_t REORG_PENALTY_GRACE_BLOCKS = 3ULL; // allow small reorgs without penalty +static const double REORG_PENALTY_FACTOR = 1.0; // base scaling factor (theta) +static const double REORG_PENALTY_EXPONENT = 2.0; // exponent p in penalty ~ B^p +static const double REORG_PENALTY_REF_BLOCK_TIME = 150.0; // reference block time in seconds used by original scheme + // Reward schedule acceleration: 1 means normal-speed progression. #define EMISSION_ACCELERATION_FACTOR 1ULL @@ -166,10 +182,16 @@ static inline size_t CalculateTargetDAGSize(blockchain_t* chain) { } // Get the height - EPOCH_LENGTH block and the last block; - block_t* lastBlock = Chain_GetBlock(chain, Chain_Size(chain) - 1); - block_t* epochStartBlock = Chain_GetBlock(chain, (size_t)(Chain_Size(chain) - 1 - EPOCH_LENGTH)); - if (!lastBlock || !epochStartBlock) { - return 0; // Invalid + block_t* lastBlock = NULL; + block_t* epochStartBlock = NULL; + if (!Chain_GetBlockCopy(chain, Chain_Size(chain) - 1, &lastBlock) || !lastBlock) { + if (lastBlock) Block_Destroy(lastBlock); + return 0; + } + if (!Chain_GetBlockCopy(chain, (size_t)(Chain_Size(chain) - 1 - EPOCH_LENGTH), &epochStartBlock) || !epochStartBlock) { + Block_Destroy(lastBlock); + if (epochStartBlock) Block_Destroy(epochStartBlock); + return 0; } int64_t difficultyDelta = (int64_t)epochStartBlock->header.difficultyTarget - (int64_t)lastBlock->header.difficultyTarget; @@ -190,10 +212,15 @@ static inline size_t CalculateTargetDAGSize(blockchain_t* chain) { int64_t targetSize = (int64_t)DAG_BASE_SIZE + growth; if (targetSize <= 0) { + Block_Destroy(lastBlock); + Block_Destroy(epochStartBlock); return 0; } - return (size_t)targetSize; + size_t out = (size_t)targetSize; + Block_Destroy(lastBlock); + Block_Destroy(epochStartBlock); + return out; } static inline void GetNextDAGSeed(blockchain_t* chain, uint8_t outSeed[32]) { @@ -205,13 +232,15 @@ static inline void GetNextDAGSeed(blockchain_t* chain, uint8_t outSeed[32]) { return; } - block_t* prevBlock = Chain_GetBlock(chain, Chain_Size(chain) - 1); - if (!prevBlock) { + block_t* prevBlock = NULL; + if (!Chain_GetBlockCopy(chain, Chain_Size(chain) - 1, &prevBlock) || !prevBlock) { memset(outSeed, 0x00, 32); // Fallback to zeroes if we can't get the previous block for some reason; The caller should treat this as an error if height >= EPOCH_LENGTH + if (prevBlock) Block_Destroy(prevBlock); return; } Block_CalculateHash(prevBlock, outSeed); + Block_Destroy(prevBlock); } #endif diff --git a/include/nets/fetch_scheduler.h b/include/nets/fetch_scheduler.h new file mode 100644 index 0000000..13bd215 --- /dev/null +++ b/include/nets/fetch_scheduler.h @@ -0,0 +1,10 @@ +#ifndef FETCH_SCHEDULER_H +#define FETCH_SCHEDULER_H + +#include + +// Compute penalty in blocks for a delayed/heavy reorg reported by a peer. +// Returns the number of penalty blocks to subtract from the peer's advertised work. +uint64_t FetchScheduler_ComputeReorgPenaltyBlocks(uint64_t delayBlocks); + +#endif diff --git a/include/nets/net_node.h b/include/nets/net_node.h index 8e13706..d98309c 100644 --- a/include/nets/net_node.h +++ b/include/nets/net_node.h @@ -15,6 +15,8 @@ #include +#include + #include #include #include @@ -27,6 +29,10 @@ typedef struct { void (*on_data)(tcp_connection_t* conn, const unsigned char* data, size_t len, void* user); void (*on_disconnect)(tcp_connection_t* conn, void* user); void* callbackUser; + // Maintenance thread for periodic tasks (orphan attach, pruning, metrics) + pthread_t maintenanceThread; + volatile int maintenanceRunning; + int maintenanceIntervalMs; } net_node_t; net_node_t* Node_Create(); diff --git a/include/nets/orphan_pool.h b/include/nets/orphan_pool.h new file mode 100644 index 0000000..d609e0f --- /dev/null +++ b/include/nets/orphan_pool.h @@ -0,0 +1,20 @@ +#ifndef ORPHAN_POOL_H +#define ORPHAN_POOL_H + +#include +#include +#include + +// Initialize/destroy the global orphan pool +void OrphanPool_Init(void); +void OrphanPool_Destroy(void); + +// Insert an orphan block into the pool. Ownership of `block` is transferred to the pool. +// `height` is the block number from the header. +void OrphanPool_Insert(block_t* block, uint64_t height); + +// Attempt to attach any orphans whose parents now exist in `chain`. +// Returns the number of blocks successfully attached. +size_t OrphanPool_AttemptAttach(blockchain_t* chain); + +#endif diff --git a/include/runtime_state.h b/include/runtime_state.h index db9decc..198c8ff 100644 --- a/include/runtime_state.h +++ b/include/runtime_state.h @@ -6,6 +6,7 @@ #include #include +#include extern uint64_t currentBlockHeight; extern blockchain_t* currentChain; @@ -14,4 +15,8 @@ extern uint64_t currentReward; extern uint32_t difficultyTarget; extern const char* chainDataDir; +// Global synchronization primitives for runtime state +extern pthread_rwlock_t chainLock; // protects chain structure and related mutations +extern pthread_mutex_t balanceSheetLock; // protects balance sheet map + #endif diff --git a/src/balance_sheet.c b/src/balance_sheet.c index d5b9532..6eb724d 100644 --- a/src/balance_sheet.c +++ b/src/balance_sheet.c @@ -1,14 +1,19 @@ #include +#include khash_t(balance_sheet_map_m)* sheetMap = NULL; +static pthread_mutex_t g_sheetLock; void BalanceSheet_Init() { sheetMap = kh_init(balance_sheet_map_m); + pthread_mutex_init(&g_sheetLock, NULL); } int BalanceSheet_Insert(balance_sheet_entry_t entry) { if (!sheetMap) { return -1; } + pthread_mutex_lock(&g_sheetLock); + // Encapsulate key key32_t key; memcpy(key.bytes, entry.address, 32); @@ -21,12 +26,14 @@ int BalanceSheet_Insert(balance_sheet_entry_t entry) { kh_value(sheetMap, k) = entry; + pthread_mutex_unlock(&g_sheetLock); return ret; } bool BalanceSheet_Lookup(uint8_t* address, balance_sheet_entry_t* out) { if (!address || !out) { return false; } - + + pthread_mutex_lock(&g_sheetLock); key32_t key; memcpy(key.bytes, address, 32); @@ -34,20 +41,26 @@ bool BalanceSheet_Lookup(uint8_t* address, balance_sheet_entry_t* out) { if (k != kh_end(sheetMap)) { balance_sheet_entry_t entry = kh_value(sheetMap, k); memcpy(out, &entry, sizeof(balance_sheet_entry_t)); + pthread_mutex_unlock(&g_sheetLock); return true; } + pthread_mutex_unlock(&g_sheetLock); return false; } bool BalanceSheet_SaveToFile(const char* outPath) { if (!sheetMap) { return false; } + pthread_mutex_lock(&g_sheetLock); char outFile[512]; strcpy(outFile, outPath); strcat(outFile, "/balance_sheet.data"); FILE* file = fopen(outFile, "wb"); - if (!file) { return false; } + if (!file) { + pthread_mutex_unlock(&g_sheetLock); + return false; + } khiter_t k; for (k = kh_begin(sheetMap); k != kh_end(sheetMap); ++k) { @@ -55,39 +68,48 @@ bool BalanceSheet_SaveToFile(const char* outPath) { balance_sheet_entry_t entry = kh_val(sheetMap, k); if (fwrite(&entry, sizeof(balance_sheet_entry_t), 1, file) != 1) { fclose(file); + pthread_mutex_unlock(&g_sheetLock); return false; } } } fclose(file); + pthread_mutex_unlock(&g_sheetLock); return true; } bool BalanceSheet_LoadFromFile(const char* inPath) { if (!sheetMap) { return false; } + pthread_mutex_lock(&g_sheetLock); char inFile[512]; strcpy(inFile, inPath); strcat(inFile, "/balance_sheet.data"); FILE* file = fopen(inFile, "rb"); - if (!file) { return false; } + if (!file) { + pthread_mutex_unlock(&g_sheetLock); + return false; + } balance_sheet_entry_t entry; while (fread(&entry, sizeof(balance_sheet_entry_t), 1, file) == 1) { if (BalanceSheet_Insert(entry) < 0) { fclose(file); + pthread_mutex_unlock(&g_sheetLock); return false; } } fclose(file); + pthread_mutex_unlock(&g_sheetLock); return true; } void BalanceSheet_Print() { if (!sheetMap) { return; } + pthread_mutex_lock(&g_sheetLock); // Iterate through every entry khiter_t k; for (k = kh_begin(sheetMap); k != kh_end(sheetMap); ++k) { @@ -107,9 +129,11 @@ void BalanceSheet_Print() { balanceStr); } } + pthread_mutex_unlock(&g_sheetLock); } void BalanceSheet_Destroy() { kh_destroy(balance_sheet_map_m, sheetMap); sheetMap = NULL; + pthread_mutex_destroy(&g_sheetLock); } diff --git a/src/block/block.c b/src/block/block.c index 6bd817e..b45bfb8 100644 --- a/src/block/block.c +++ b/src/block/block.c @@ -285,3 +285,34 @@ void Block_Print(const block_t* block) { printf("No transactions (or none loaded)\n"); } } + +block_t* Block_Copy(const block_t* src) { + if (!src) return NULL; + block_t* dst = (block_t*)malloc(sizeof(block_t)); + if (!dst) return NULL; + dst->header = src->header; + if (src->transactions) { + size_t txCount = DynArr_size(src->transactions); + dst->transactions = DYNARR_CREATE(signed_transaction_t, txCount == 0 ? 1 : txCount); + if (!dst->transactions) { + free(dst); + return NULL; + } + for (size_t i = 0; i < txCount; ++i) { + signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(src->transactions, i); + if (!tx) { + DynArr_destroy(dst->transactions); + free(dst); + return NULL; + } + if (!DynArr_push_back(dst->transactions, tx)) { + DynArr_destroy(dst->transactions); + free(dst); + return NULL; + } + } + } else { + dst->transactions = NULL; + } + return dst; +} diff --git a/src/block/chain.c b/src/block/chain.c index 009190b..c83e035 100644 --- a/src/block/chain.c +++ b/src/block/chain.c @@ -4,6 +4,7 @@ #include #include #include +#include uint64_t currentBlockHeight = 0; @@ -136,6 +137,8 @@ void Chain_Destroy(blockchain_t* chain) { } bool Chain_AddBlock(blockchain_t* chain, block_t* block) { + bool ok = true; + if (!chain || !block || !chain->blocks) { return false; } @@ -144,96 +147,124 @@ bool Chain_AddBlock(blockchain_t* chain, block_t* block) { return false; } - // First pass: ensure all non-coinbase senders can cover the full spend - // (amount1 + amount2 + fee) before mutating the chain or balance sheet. - size_t txCount = DynArr_size(block->transactions); - for (size_t i = 0; i < txCount; ++i) { - signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(block->transactions, i); - if (!tx) { - return false; - } + // Acquire global write locks to protect chain and balance sheet mutations. + pthread_rwlock_wrlock(&chainLock); + pthread_mutex_lock(&balanceSheetLock); - if (Address_IsCoinbase(tx->transaction.senderAddress)) { - continue; - } - - uint256_t spend; - if (!BuildSpendAmount(tx, &spend)) { - return false; - } - - balance_sheet_entry_t senderEntry; - if (!BalanceSheet_Lookup(tx->transaction.senderAddress, &senderEntry)) { - fprintf(stderr, "Error: Sender address not found in balance sheet during block addition. Bailing!\n"); - return false; - } - - if (uint256_cmp(&senderEntry.balance, &spend) < 0) { - fprintf(stderr, "Error: Sender balance insufficient for block transaction. Bailing!\n"); - return false; - } - } - - // Push the block only after validation succeeds. - block_t* blk = (block_t*)DynArr_push_back(chain->blocks, block); - if (!blk) { - return false; - } - chain->size++; - currentBlockHeight = (uint64_t)(chain->size - 1); - - // Second pass: apply the ledger changes. - if (blk->transactions) { - txCount = DynArr_size(blk->transactions); + do { + // First pass: ensure all non-coinbase senders can cover the full spend + // (amount1 + amount2 + fee) before mutating the chain or balance sheet. + size_t txCount = DynArr_size(block->transactions); for (size_t i = 0; i < txCount; ++i) { - signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(blk->transactions, i); + signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(block->transactions, i); if (!tx) { + ok = false; break; + } + + if (Address_IsCoinbase(tx->transaction.senderAddress)) { continue; } - if (!Address_IsCoinbase(tx->transaction.senderAddress)) { - uint256_t spend; - if (!BuildSpendAmount(tx, &spend) || !DebitAddress(tx->transaction.senderAddress, &spend)) { - fprintf(stderr, "Error: Failed to debit sender balance during block addition. Bailing!\n"); - return false; - } + uint256_t spend; + if (!BuildSpendAmount(tx, &spend)) { ok = false; break; } + + balance_sheet_entry_t senderEntry; + if (!BalanceSheet_Lookup(tx->transaction.senderAddress, &senderEntry)) { + fprintf(stderr, "Error: Sender address not found in balance sheet during block addition. Bailing!\n"); + ok = false; break; } - if (!CreditAddress(tx->transaction.recipientAddress1, tx->transaction.amount1)) { - fprintf(stderr, "Error: Failed to credit recipient1 balance during block addition. Bailing!\n"); - return false; + if (uint256_cmp(&senderEntry.balance, &spend) < 0) { + fprintf(stderr, "Error: Sender balance insufficient for block transaction. Bailing!\n"); + ok = false; break; } + } + if (!ok) break; - if (tx->transaction.amount2 > 0) { - uint8_t zeroAddress[32] = {0}; - if (memcmp(tx->transaction.recipientAddress2, zeroAddress, 32) == 0) { - fprintf(stderr, "Error: amount2 is non-zero but recipient2 is empty during block addition. Bailing!\n"); - return false; + // Push the block only after validation succeeds. + block_t* blk = (block_t*)DynArr_push_back(chain->blocks, block); + if (!blk) { ok = false; break; } + chain->size++; + currentBlockHeight = (uint64_t)(chain->size - 1); + + // Second pass: apply the ledger changes. + if (blk->transactions) { + txCount = DynArr_size(blk->transactions); + for (size_t i = 0; i < txCount; ++i) { + signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(blk->transactions, i); + if (!tx) { + continue; } - if (!CreditAddress(tx->transaction.recipientAddress2, tx->transaction.amount2)) { - fprintf(stderr, "Error: Failed to credit recipient2 balance during block addition. Bailing!\n"); - return false; + if (!Address_IsCoinbase(tx->transaction.senderAddress)) { + uint256_t spend; + if (!BuildSpendAmount(tx, &spend) || !DebitAddress(tx->transaction.senderAddress, &spend)) { + fprintf(stderr, "Error: Failed to debit sender balance during block addition. Bailing!\n"); + ok = false; break; + } + } + + if (!CreditAddress(tx->transaction.recipientAddress1, tx->transaction.amount1)) { + fprintf(stderr, "Error: Failed to credit recipient1 balance during block addition. Bailing!\n"); + ok = false; break; + } + + if (tx->transaction.amount2 > 0) { + uint8_t zeroAddress[32] = {0}; + if (memcmp(tx->transaction.recipientAddress2, zeroAddress, 32) == 0) { + fprintf(stderr, "Error: amount2 is non-zero but recipient2 is empty during block addition. Bailing!\n"); + ok = false; break; + } + + if (!CreditAddress(tx->transaction.recipientAddress2, tx->transaction.amount2)) { + fprintf(stderr, "Error: Failed to credit recipient2 balance during block addition. Bailing!\n"); + ok = false; break; + } } } } - } + // ok remains true if no failures + } while (0); - return true; + // Release locks + pthread_mutex_unlock(&balanceSheetLock); + pthread_rwlock_unlock(&chainLock); + + return ok; } block_t* Chain_GetBlock(blockchain_t* chain, size_t index) { - if (chain) { - return DynArr_at(chain->blocks, index); + if (!chain) return NULL; + block_t* blk = NULL; + pthread_rwlock_rdlock(&chainLock); + blk = (block_t*)DynArr_at(chain->blocks, index); + pthread_rwlock_unlock(&chainLock); + return blk; +} + +bool Chain_GetBlockCopy(blockchain_t* chain, size_t index, block_t** outCopy) { + if (!chain || !outCopy) return false; + *outCopy = NULL; + pthread_rwlock_rdlock(&chainLock); + block_t* src = (block_t*)DynArr_at(chain->blocks, index); + if (!src) { + pthread_rwlock_unlock(&chainLock); + return false; } - return NULL; + block_t* copy = Block_Copy(src); + pthread_rwlock_unlock(&chainLock); + if (!copy) return false; + *outCopy = copy; + return true; } size_t Chain_Size(blockchain_t* chain) { - if (chain) { - return DynArr_size(chain->blocks); - } - return 0; + if (!chain) return 0; + size_t sz = 0; + pthread_rwlock_rdlock(&chainLock); + sz = DynArr_size(chain->blocks); + pthread_rwlock_unlock(&chainLock); + return sz; } bool Chain_IsValid(blockchain_t* chain) { @@ -272,6 +303,131 @@ bool Chain_IsValid(blockchain_t* chain) { return true; } +bool Chain_RollbackToHeight(blockchain_t* chain, size_t height) { + if (!chain || !chain->blocks) return false; + + pthread_rwlock_wrlock(&chainLock); + pthread_mutex_lock(&balanceSheetLock); + + size_t cur = DynArr_size(chain->blocks); + if (height >= cur) { + pthread_mutex_unlock(&balanceSheetLock); + pthread_rwlock_unlock(&chainLock); + return true; // nothing to do + } + + // Remove blocks above height + for (size_t i = cur; i > height; --i) { + size_t idx = i - 1; + block_t* blk = (block_t*)DynArr_at(chain->blocks, idx); + if (blk && blk->transactions) { + DynArr_destroy(blk->transactions); + blk->transactions = NULL; + } + DynArr_remove(chain->blocks, idx); + } + + chain->size = DynArr_size(chain->blocks); + currentBlockHeight = chain->size ? (uint64_t)(chain->size - 1) : 0ULL; + + // Rebuild balance sheet from scratch up to current chain size + BalanceSheet_Destroy(); + BalanceSheet_Init(); + + for (size_t i = 0; i < chain->size; ++i) { + block_t* blk = (block_t*)DynArr_at(chain->blocks, i); + block_t* toProcess = blk; + bool loaded = false; + + if (!blk || !blk->transactions) { + // Try to load from disk + block_t* loadedBlk = NULL; + size_t txCount = 0; + if (!Chain_LoadBlockFromFile(chainDataDir, (uint64_t)i, true, &loadedBlk, &txCount)) { + // Can't rebuild without transactions + pthread_mutex_unlock(&balanceSheetLock); + pthread_rwlock_unlock(&chainLock); + return false; + } + toProcess = loadedBlk; + loaded = true; + } + + // Apply transactions + if (toProcess && toProcess->transactions) { + size_t txCount = DynArr_size(toProcess->transactions); + for (size_t ti = 0; ti < txCount; ++ti) { + signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(toProcess->transactions, ti); + if (!tx) continue; + + // Coinbase credit + if (Address_IsCoinbase(tx->transaction.senderAddress)) { + balance_sheet_entry_t entry; + if (!BalanceSheet_Lookup(tx->transaction.recipientAddress1, &entry)) { + memset(&entry, 0, sizeof(entry)); + memcpy(entry.address, tx->transaction.recipientAddress1, 32); + entry.balance = uint256_from_u64(tx->transaction.amount1); + } else { + (void)uint256_add_u64(&entry.balance, tx->transaction.amount1); + } + (void)BalanceSheet_Insert(entry); + continue; + } + + // Non-coinbase: debit sender + uint256_t spend = uint256_from_u64(0); + (void)uint256_add_u64(&spend, tx->transaction.amount1); + if (tx->transaction.amount2 > 0) (void)uint256_add_u64(&spend, tx->transaction.amount2); + if (tx->transaction.fee > 0) (void)uint256_add_u64(&spend, tx->transaction.fee); + + balance_sheet_entry_t senderEntry; + if (!BalanceSheet_Lookup(tx->transaction.senderAddress, &senderEntry)) { + // Missing sender; create zero and then subtract (will underflow if invalid) + memset(&senderEntry, 0, sizeof(senderEntry)); + memcpy(senderEntry.address, tx->transaction.senderAddress, 32); + senderEntry.balance = uint256_from_u64(0); + } + (void)uint256_subtract(&senderEntry.balance, &spend); + (void)BalanceSheet_Insert(senderEntry); + + // Credit recipient1 + balance_sheet_entry_t rec1; + if (!BalanceSheet_Lookup(tx->transaction.recipientAddress1, &rec1)) { + memset(&rec1, 0, sizeof(rec1)); + memcpy(rec1.address, tx->transaction.recipientAddress1, 32); + rec1.balance = uint256_from_u64(tx->transaction.amount1); + } else { + (void)uint256_add_u64(&rec1.balance, tx->transaction.amount1); + } + (void)BalanceSheet_Insert(rec1); + + // Credit recipient2 if any + if (tx->transaction.amount2 > 0) { + balance_sheet_entry_t rec2; + if (!BalanceSheet_Lookup(tx->transaction.recipientAddress2, &rec2)) { + memset(&rec2, 0, sizeof(rec2)); + memcpy(rec2.address, tx->transaction.recipientAddress2, 32); + rec2.balance = uint256_from_u64(tx->transaction.amount2); + } else { + (void)uint256_add_u64(&rec2.balance, tx->transaction.amount2); + } + (void)BalanceSheet_Insert(rec2); + } + } + } + + if (loaded && toProcess) { + if (toProcess->transactions) DynArr_destroy(toProcess->transactions); + free(toProcess); + } + } + + pthread_mutex_unlock(&balanceSheetLock); + pthread_rwlock_unlock(&chainLock); + + return true; +} + void Chain_Wipe(blockchain_t* chain) { Chain_ClearBlocks(chain); currentBlockHeight = 0; diff --git a/src/main.c b/src/main.c index e977c3b..0a097a4 100644 --- a/src/main.c +++ b/src/main.c @@ -18,6 +18,8 @@ #include #include +#include +#include #ifndef CHAIN_DATA_DIR #define CHAIN_DATA_DIR "chain_data" @@ -28,6 +30,10 @@ const char* chainDataDir = CHAIN_DATA_DIR; uint256_t currentSupply = {{0, 0, 0, 0}}; uint64_t currentReward = 750000000000ULL; +// Define the synchronization primitives declared in runtime_state.h +pthread_rwlock_t chainLock; +pthread_mutex_t balanceSheetLock; + void handle_sigint(int sig) { printf("Caught signal %d, exiting...\n", sig); Block_ShutdownPowContext(); @@ -80,9 +86,10 @@ static block_t* BuildNextBlock(blockchain_t* chain, uint32_t difficultyTarget) { block->header.version = 1; block->header.blockNumber = (uint64_t)Chain_Size(chain); if (Chain_Size(chain) > 0) { - block_t* lastBlock = Chain_GetBlock(chain, Chain_Size(chain) - 1); - if (lastBlock) { + block_t* lastBlock = NULL; + if (Chain_GetBlockCopy(chain, Chain_Size(chain) - 1, &lastBlock)) { Block_CalculateHash(lastBlock, block->header.prevHash); + Block_Destroy(lastBlock); } else { memset(block->header.prevHash, 0, sizeof(block->header.prevHash)); } @@ -155,12 +162,13 @@ static bool ComputeEpochSeedForHeightFromChain(const blockchain_t* chain, uint64 return false; } - block_t* seedBlock = Chain_GetBlock((blockchain_t*)chain, (size_t)seedBlockNumber); - if (!seedBlock) { + block_t* seedBlock = NULL; + if (!Chain_GetBlockCopy((blockchain_t*)chain, (size_t)seedBlockNumber, &seedBlock)) { return false; } Block_CalculateHash(seedBlock, outSeed); + Block_Destroy(seedBlock); return true; } @@ -180,11 +188,10 @@ static bool ComputeEpochDagBytesForHeightFromChain(const blockchain_t* chain, ui return false; } - block_t* lastBlock = Chain_GetBlock((blockchain_t*)chain, (size_t)lastBlockNumber); - block_t* epochStartBlock = Chain_GetBlock((blockchain_t*)chain, (size_t)epochStartBlockNumber); - if (!lastBlock || !epochStartBlock) { - return false; - } + block_t* lastBlock = NULL; + block_t* epochStartBlock = NULL; + if (!Chain_GetBlockCopy((blockchain_t*)chain, (size_t)lastBlockNumber, &lastBlock)) { return false; } + if (!Chain_GetBlockCopy((blockchain_t*)chain, (size_t)epochStartBlockNumber, &epochStartBlock)) { Block_Destroy(lastBlock); return false; } int64_t difficultyDelta = (int64_t)epochStartBlock->header.difficultyTarget - (int64_t)lastBlock->header.difficultyTarget; int64_t growth = (int64_t)((int64_t)DAG_BASE_GROWTH * difficultyDelta); @@ -213,6 +220,8 @@ static bool ComputeEpochDagBytesForHeightFromChain(const blockchain_t* chain, ui } *outDagBytes = (size_t)targetSize; + Block_Destroy(lastBlock); + Block_Destroy(epochStartBlock); return true; } @@ -285,6 +294,15 @@ static bool MineAndAppendBlock(blockchain_t* chain, return false; } + // After successfully appending a block, attempt to attach any orphans. + size_t attached = OrphanPool_AttemptAttach(chain); + if (attached > 0) { + printf("Attached %zu orphan(s) after mining/appending block\n", attached); + // Persist chain/sheet after attaching orphans + Chain_SaveToFile(chain, chainDataDir, *currentSupply, *currentReward); + BalanceSheet_SaveToFile(chainDataDir); + } + uint64_t coinbaseAmount = 0; if (block->transactions && DynArr_size(block->transactions) > 0) { signed_transaction_t* firstTx = (signed_transaction_t*)DynArr_at(block->transactions, 0); @@ -358,13 +376,15 @@ static bool VerifyChainFully(blockchain_t* chain) { uint32_t expectedDifficulty = INITIAL_DIFFICULTY; for (size_t i = 0; i < chainSize; ++i) { - block_t* blk = Chain_GetBlock(chain, i); - if (!blk || !blk->transactions) { + block_t* blk = NULL; + if (!Chain_GetBlockCopy(chain, i, &blk) || !blk || !blk->transactions) { + if (blk) Block_Destroy(blk); Chain_Destroy(prevChain); return false; } if (blk->header.blockNumber != (uint64_t)i) { + Block_Destroy(blk); Chain_Destroy(prevChain); return false; } @@ -372,12 +392,15 @@ static bool VerifyChainFully(blockchain_t* chain) { if (i == 0) { uint8_t zeroHash[32] = {0}; if (memcmp(blk->header.prevHash, zeroHash, sizeof(zeroHash)) != 0) { + Block_Destroy(blk); Chain_Destroy(prevChain); return false; } } else { - block_t* prevBlk = Chain_GetBlock(chain, i - 1); - if (!prevBlk) { + block_t* prevBlk = NULL; + if (!Chain_GetBlockCopy(chain, i - 1, &prevBlk) || !prevBlk) { + if (prevBlk) Block_Destroy(prevBlk); + Block_Destroy(blk); Chain_Destroy(prevChain); return false; } @@ -385,9 +408,12 @@ static bool VerifyChainFully(blockchain_t* chain) { uint8_t expectedPrevHash[32]; Block_CalculateHash(prevBlk, expectedPrevHash); if (memcmp(blk->header.prevHash, expectedPrevHash, sizeof(expectedPrevHash)) != 0) { + Block_Destroy(prevBlk); + Block_Destroy(blk); Chain_Destroy(prevChain); return false; } + Block_Destroy(prevBlk); } // Determine expected difficulty for this block. TODO: Optimize to recompute at adjustment intervals only instead of every block. @@ -400,25 +426,32 @@ static bool VerifyChainFully(blockchain_t* chain) { // Ensure the block's header difficulty matches the expected difficulty (can't cheat easier) if (blk->header.difficultyTarget != expectedDifficulty) { + Block_Destroy(blk); Chain_Destroy(prevChain); return false; } uint8_t powHash[32]; if (!ComputeHistoricalAutolykosHashFromChain(chain, blk, (uint64_t)i, powHash)) { + Block_Destroy(blk); Chain_Destroy(prevChain); return false; } uint8_t target[32]; if (!DecodeCompactTarget(blk->header.difficultyTarget, target)) { + Block_Destroy(blk); + Chain_Destroy(prevChain); return false; } if (CompareHashToTarget(powHash, target) > 0) { + Block_Destroy(blk); + Chain_Destroy(prevChain); return false; } if (!Block_AllTransactionsValid(blk)) { + Block_Destroy(blk); Chain_Destroy(prevChain); return false; } @@ -426,14 +459,17 @@ static bool VerifyChainFully(blockchain_t* chain) { uint8_t expectedMerkle[32]; Block_CalculateMerkleRoot(blk, expectedMerkle); if (memcmp(blk->header.merkleRoot, expectedMerkle, sizeof(expectedMerkle)) != 0) { + Block_Destroy(blk); Chain_Destroy(prevChain); return false; } // Transactions are persisted on disk. Once this block is fully verified, // release its in-memory transaction list to reduce peak memory usage. - DynArr_destroy(blk->transactions); - blk->transactions = NULL; + if (blk->transactions) { + DynArr_destroy(blk->transactions); + blk->transactions = NULL; + } // Push a header-only copy of this block into prevChain for future difficulty calculations. block_t headerOnly; @@ -441,6 +477,8 @@ static bool VerifyChainFully(blockchain_t* chain) { headerOnly.header = blk->header; headerOnly.transactions = NULL; (void)DynArr_push_back(prevChain->blocks, &headerOnly); + + Block_Destroy(blk); } Chain_Destroy(prevChain); @@ -475,7 +513,6 @@ int main(int argc, char* argv[]) { srand((unsigned int)time(NULL)); BalanceSheet_Init(); - blockchain_t* chain = Chain_Create(); if (!chain) { fprintf(stderr, "failed to create chain\n"); @@ -494,6 +531,9 @@ int main(int argc, char* argv[]) { } uint8_t lastSavedHash[32] = {0}; + // Initialize runtime locks before starting modules + pthread_rwlock_init(&chainLock, NULL); + pthread_mutex_init(&balanceSheetLock, NULL); if (!Chain_LoadFromFile(chain, chainDataDir, ¤tSupply, &difficultyTarget, ¤tReward, lastSavedHash, false)) { printf("No existing chain loaded from %s\n", chainDataDir); } @@ -697,12 +737,12 @@ int main(int argc, char* argv[]) { } if (strcmp(cmd, "sync") == 0) { - // Pick outbound peer with highest advertised height if (!node) { printf("no node available\n"); continue; } + // Choose the best outbound peer by advertised height int bestIdx = -1; uint64_t bestHeight = 0; for (size_t i = 0; i < MAX_CONS; ++i) { @@ -720,36 +760,211 @@ int main(int argc, char* argv[]) { } uint64_t localHeight = (uint64_t)Chain_Size(chain); - if (bestHeight <= localHeight) { - printf("already synced (local=%" PRIu64 ", peer=%" PRIu64 ")\n", localHeight, bestHeight); + uint64_t peerHeight = node->outboundClients[bestIdx].peerBlockHeight; + + // Determine if this is an initial sync. If so, do not apply penalty. + bool isInitialSync = (localHeight == 0); + + // Compute penalty and adjusted peer height (skip penalty for initial sync) + uint64_t delay = (peerHeight > localHeight) ? (peerHeight - localHeight) : 0ULL; + uint64_t penalty = isInitialSync ? 0ULL : FetchScheduler_ComputeReorgPenaltyBlocks(delay); + uint64_t adjustedPeerHeight = (peerHeight > penalty) ? (peerHeight - penalty) : 0ULL; + + if (adjustedPeerHeight <= localHeight) { + printf("already synced (local=%" PRIu64 ", peer=%" PRIu64 ", penalty=%" PRIu64 ")\n", localHeight, peerHeight, penalty); continue; } - printf("syncing from peer %d: peerHeight=%" PRIu64 " local=%" PRIu64 "\n", bestIdx, bestHeight, localHeight); + printf("syncing from peer %d: peerHeight=%" PRIu64 " adjusted=%" PRIu64 " local=%" PRIu64 " penalty=%" PRIu64 "\n", + bestIdx, peerHeight, adjustedPeerHeight, localHeight, penalty); tcp_connection_t* peerConn = node->outboundClients[bestIdx].connection; - for (uint64_t h = localHeight; h < bestHeight; ++h) { - uint64_t req = h; - if (Node_SendPacket(node, peerConn, PACKET_TYPE_FETCH_BLOCK, &req, sizeof(req)) != 0) { - printf("failed to send FETCH_BLOCK for %" PRIu64 "\n", req); - break; + + // Windowed parallel fetch + uint64_t start = localHeight; + uint64_t end = adjustedPeerHeight; // exclusive target height + uint64_t nextReq = start; + + const int maxInFlight = MAX_PARALLEL_FETCHES; + uint64_t requestedHeights[64]; + int retryCount[64]; + uint64_t sentAtMs[64]; + int inFlight = 0; + + if (maxInFlight > (int)(sizeof(requestedHeights)/sizeof(requestedHeights[0]))) { + printf("MAX_PARALLEL_FETCHES too large for local buffers\n"); + continue; + } + + // Keep track of expected last-hash to detect reorgs. Initialize to our current tip. + uint8_t expectedPrevHash[32]; + if (localHeight > 0) { + block_t* lastBlock = NULL; + if (Chain_GetBlockCopy(chain, localHeight - 1, &lastBlock)) { + Block_CalculateHash(lastBlock, expectedPrevHash); + Block_Destroy(lastBlock); + } else { + memset(expectedPrevHash, 0, sizeof(expectedPrevHash)); + } + } else { + memset(expectedPrevHash, 0, sizeof(expectedPrevHash)); + } + + while (nextReq < end || inFlight > 0) { + // Fill window + while (inFlight < maxInFlight && nextReq < end) { + uint64_t req = nextReq; + if (Node_SendPacket(node, peerConn, PACKET_TYPE_FETCH_BLOCK, &req, sizeof(req)) != 0) { + printf("failed to send FETCH_BLOCK for %" PRIu64 "\n", req); + break; + } + + requestedHeights[inFlight] = req; + retryCount[inFlight] = 0; + sentAtMs[inFlight] = get_current_time_ms(); + inFlight++; + nextReq++; } - // Wait up to 5 seconds for the block to be applied (Chain_Size to increase) - const int timeoutMs = 5000; - const int pollIntervalMs = 100; - int waited = 0; - uint64_t startSize = (uint64_t)Chain_Size(chain); - while ((uint64_t)Chain_Size(chain) == startSize && waited < timeoutMs) { - usleep(pollIntervalMs * 1000); - waited += pollIntervalMs; + // Poll for completions or timeouts + if (inFlight == 0) { + // nothing in flight; small sleep to avoid busy-loop + usleep(100 * 1000); + continue; } - if ((uint64_t)Chain_Size(chain) == startSize) { - printf("timed out waiting for block %" PRIu64 "\n", req); - break; + uint64_t now = get_current_time_ms(); + // Check earliest outstanding entry for completion/timeout + bool progressed = false; + for (int i = 0; i < inFlight; ++i) { + uint64_t h = requestedHeights[i]; + if ((uint64_t)Chain_Size(chain) > h) { + // A new block at height h was applied. Retrieve it and verify parent. + block_t* fetched = NULL; + if (!Chain_GetBlockCopy(chain, (size_t)h, &fetched) || !fetched) { + // Shouldn't happen, but be robust. + printf("fetched block %" PRIu64 " applied but not found\n", h); + // remove entry + for (int j = i; j < inFlight - 1; ++j) { + requestedHeights[j] = requestedHeights[j + 1]; + retryCount[j] = retryCount[j + 1]; + sentAtMs[j] = sentAtMs[j + 1]; + } + inFlight--; + progressed = true; + break; + } + + // Check whether this block builds on our expected tip. If not, it's a reorg. + if (memcmp(fetched->header.prevHash, expectedPrevHash, sizeof(expectedPrevHash)) != 0) { + // Find matching ancestor in our current chain (if any) + ssize_t matchIndex = -1; + size_t chainSz = Chain_Size(chain); + uint8_t tmpHash[32]; + for (size_t bi = 0; bi < chainSz; ++bi) { + block_t* b = NULL; + if (!Chain_GetBlockCopy(chain, bi, &b) || !b) continue; + Block_CalculateHash(b, tmpHash); + if (memcmp(tmpHash, fetched->header.prevHash, sizeof(tmpHash)) == 0) { + matchIndex = (ssize_t)bi; + Block_Destroy(b); + break; + } + Block_Destroy(b); + } + + uint64_t reorgDepth = 0ULL; + if (matchIndex >= 0) { + reorgDepth = (uint64_t)localHeight - ((uint64_t)matchIndex + 1ULL); + } else { + // No match found: treat as full reorg depth equal to localHeight + reorgDepth = localHeight; + } + + if (!isInitialSync) { + uint64_t reorgPenalty = FetchScheduler_ComputeReorgPenaltyBlocks(reorgDepth); + printf("Reorg detected at height %" PRIu64 ": depth=%" PRIu64 " penalty=%" PRIu64 "\n", + h, reorgDepth, reorgPenalty); + + // Rollback our chain to the matching ancestor (or to 0 if none) + size_t rollbackTo = (matchIndex >= 0) ? (size_t)(matchIndex + 1) : 0; + if (!Chain_RollbackToHeight(chain, rollbackTo)) { + printf("Failed to rollback to height %zu during reorg handling\n", rollbackTo); + inFlight = 0; // abort sync + break; + } + + // Apply additional penalty by shrinking end and restart window from current Chain_Size + if (peerHeight > reorgPenalty) { + end = peerHeight - reorgPenalty; + } else { + end = start; + } + } else { + printf("Initial sync: reorg-like divergence ignored (height=%" PRIu64 ")\n", h); + } + + // Free fetched block and reset window to pick up new adjusted end and expectedPrevHash + Block_Destroy(fetched); + nextReq = Chain_Size(chain); + inFlight = 0; + // Recompute expectedPrevHash to current tip + if (Chain_Size(chain) > 0) { + block_t* tip = NULL; + if (Chain_GetBlockCopy(chain, Chain_Size(chain) - 1, &tip) && tip) { + Block_CalculateHash(tip, expectedPrevHash); + Block_Destroy(tip); + } + } else { + memset(expectedPrevHash, 0, sizeof(expectedPrevHash)); + } + + progressed = true; + break; // restart loop + } + + printf("fetched block %" PRIu64 "\n", h); + // Update expectedPrevHash to this fetched block's hash (for next block) + Block_CalculateHash(fetched, expectedPrevHash); + // remove entry i by shifting left + for (int j = i; j < inFlight - 1; ++j) { + requestedHeights[j] = requestedHeights[j + 1]; + retryCount[j] = retryCount[j + 1]; + sentAtMs[j] = sentAtMs[j + 1]; + } + inFlight--; + progressed = true; + Block_Destroy(fetched); + break; // restart loop to re-evaluate + } + + uint64_t elapsed = (now > sentAtMs[i]) ? (now - sentAtMs[i]) : 0ULL; + if (elapsed > SYNC_REQUEST_TIMEOUT_MS) { + if (retryCount[i] < MAX_SYNC_RETRIES) { + // retry with exponential backoff + retryCount[i]++; + uint64_t backoff = SYNC_BACKOFF_BASE_MS * (1ULL << (retryCount[i] - 1)); + usleep((useconds_t)(backoff * 1000ULL)); + + uint64_t req = requestedHeights[i]; + if (Node_SendPacket(node, peerConn, PACKET_TYPE_FETCH_BLOCK, &req, sizeof(req)) != 0) { + printf("retry: failed to send FETCH_BLOCK for %" PRIu64 "\n", req); + } else { + sentAtMs[i] = get_current_time_ms(); + progressed = true; + } + } else { + printf("timed out fetching block %" PRIu64 ", giving up\n", requestedHeights[i]); + inFlight = 0; // abort sync on persistent failures + break; + } + } + } + + if (!progressed) { + // small sleep to avoid spinning + usleep(50 * 1000); } - printf("fetched block %" PRIu64 "\n", req); } printf("sync complete: localHeight=%zu\n", Chain_Size(chain)); @@ -938,5 +1153,8 @@ int main(int argc, char* argv[]) { Chain_Destroy(chain); BalanceSheet_Destroy(); + pthread_mutex_destroy(&balanceSheetLock); + pthread_rwlock_destroy(&chainLock); + return 0; } diff --git a/src/nets/fetch_scheduler.c b/src/nets/fetch_scheduler.c new file mode 100644 index 0000000..3c1b5a3 --- /dev/null +++ b/src/nets/fetch_scheduler.c @@ -0,0 +1,24 @@ +#include +#include +#include + +// Note: floating point is used intentionally here for readability and +// because the final penalty is rounded to whole blocks. This keeps the +// implementation straightforward while avoiding subtle integer overflow +// for large exponents. If desired, replace with fixed-point arithmetic. +uint64_t FetchScheduler_ComputeReorgPenaltyBlocks(uint64_t delayBlocks) { + if (delayBlocks <= REORG_PENALTY_GRACE_BLOCKS) { + return 0ULL; + } + + double B = (double)delayBlocks; + double factor = REORG_PENALTY_FACTOR; + double exp = REORG_PENALTY_EXPONENT; + double timeScale = ((double)TARGET_BLOCK_TIME) / REORG_PENALTY_REF_BLOCK_TIME; + + double raw = factor * pow(B, exp) * timeScale; + if (raw < 0.0) raw = 0.0; + + uint64_t penalty = (uint64_t)ceil(raw); + return penalty; +} diff --git a/src/nets/net_node.c b/src/nets/net_node.c index fbae2db..5f3dd39 100644 --- a/src/nets/net_node.c +++ b/src/nets/net_node.c @@ -7,6 +7,10 @@ #include #include #include +#include +#include +#include +#include static net_node_t* Node_FromConnection(tcp_connection_t* conn) { if (!conn) { @@ -24,6 +28,23 @@ static uint64_t Node_GetCurrentBlockHeight(void) { return currentBlockHeight; } +static void* Node_MaintenanceThread(void* arg) { + net_node_t* n = (net_node_t*)arg; + if (!n) return NULL; + while (n->maintenanceRunning) { + if (currentChain) { + size_t attached = OrphanPool_AttemptAttach(currentChain); + if (attached > 0) { + printf("Maintenance: attached %zu orphan(s)\n", attached); + Chain_SaveToFile(currentChain, chainDataDir, currentSupply, currentReward); + BalanceSheet_SaveToFile(chainDataDir); + } + } + usleep((useconds_t)(n->maintenanceIntervalMs * 1000)); + } + return NULL; +} + static int Node_DecodePacket(const tcp_connection_t* conn, packet_type_t* outType, const unsigned char** outPayload, size_t* outPayloadLen) { if (!conn || !outType || !outPayload || !outPayloadLen || conn->dataBufLen < 1 || !conn->dataBuf) { return -1; @@ -92,6 +113,19 @@ static bool Node_ParseAndAcceptBlock(const unsigned char* payload, size_t payloa return false; } + // If parent is missing, insert into orphan pool instead of rejecting immediately. + if (blk->header.blockNumber > 0) { + uint64_t parentIndex = blk->header.blockNumber - 1; + block_t* parentCopy = NULL; + if (parentIndex >= Chain_Size(currentChain) || !Chain_GetBlockCopy(currentChain, (size_t)parentIndex, &parentCopy) || !parentCopy) { + // Insert into orphan pool and take ownership of blk + OrphanPool_Insert(blk, blockHeight); + if (parentCopy) Block_Destroy(parentCopy); + return false; + } + Block_Destroy(parentCopy); + } + if (!Chain_AddBlock(currentChain, blk)) { // Chain_AddBlock failed; cleanup if (blk->transactions) { @@ -109,6 +143,14 @@ static bool Node_ParseAndAcceptBlock(const unsigned char* payload, size_t payloa // Chain_AddBlock copied the block into the chain; free our temporary wrapper but do NOT destroy transactions (they are freed by Chain_SaveToFile when persisted) free(blk); + // Attempt to attach any orphans that may now have their parents present. + size_t attached = OrphanPool_AttemptAttach(currentChain); + if (attached > 0) { + printf("Attached %zu orphan(s) after accepting block\n", attached); + // Persist after attaching orphans + Chain_SaveToFile(currentChain, chainDataDir, currentSupply, currentReward); + BalanceSheet_SaveToFile(chainDataDir); + } return true; } @@ -160,6 +202,16 @@ net_node_t* Node_Create() { TcpServer_Start(node->server, MAX_CONS); + OrphanPool_Init(); + + // Start maintenance thread + node->maintenanceRunning = 1; + node->maintenanceIntervalMs = 1000; // 1s + if (pthread_create(&node->maintenanceThread, NULL, Node_MaintenanceThread, node) != 0) { + // Failed to start maintenance thread; continue without it + node->maintenanceRunning = 0; + } + return node; } @@ -178,6 +230,14 @@ void Node_Destroy(net_node_t* node) { TcpServer_Destroy(node->server); } + // Stop maintenance thread + if (node->maintenanceRunning) { + node->maintenanceRunning = 0; + pthread_join(node->maintenanceThread, NULL); + } + + OrphanPool_Destroy(); + free(node); } @@ -346,45 +406,53 @@ void Node_Server_OnData(tcp_connection_t* client) { return; } - // Find the block - block_t* block = Chain_GetBlock(currentChain, (size_t)requestedHeight); - if (!block) { - printf("Requested block height %" PRIu64 " not found, ignoring\n", requestedHeight); - const char* msg = "Requested block not found!"; - Node_SendPacket(Node_FromConnection(client), client, PACKET_TYPE_ERROR, msg, strlen(msg)); - return; - } - - if (block->transactions == NULL) { - // Just reload from disk pure - Chain_LoadBlockFromFile(chainDataDir, requestedHeight - 1, true, &block, NULL); // blockNumber = height - 1 because of 0-indexing + // Find the block (deep-copy it for safe access) + block_t* block = NULL; + if (!Chain_GetBlockCopy(currentChain, (size_t)requestedHeight, &block) || !block) { + // Try loading from disk + if (!Chain_LoadBlockFromFile(chainDataDir, requestedHeight, true, &block, NULL) || !block) { + printf("Requested block height %" PRIu64 " not found, ignoring\n", requestedHeight); + const char* msg = "Requested block not found!"; + Node_SendPacket(Node_FromConnection(client), client, PACKET_TYPE_ERROR, msg, strlen(msg)); + return; + } } // Serialize into a BLOCK_DATA packet [block header][tx count - 8 bytes][transactions...] - size_t blockDataSize = sizeof(block_header_t) + sizeof(uint64_t) + (block->transactions ? block->transactions->size * sizeof(signed_transaction_t) : 0); + size_t txCount = block->transactions ? DynArr_size(block->transactions) : 0; + size_t blockDataSize = sizeof(uint64_t) + sizeof(block_header_t) + sizeof(uint64_t) + (txCount * sizeof(signed_transaction_t)); unsigned char* blockData = (unsigned char*)malloc(blockDataSize); if (!blockData) { // Generic error response printf("Failed to allocate memory for block data response to node %u\n", client ? client->connectionId : 0U); const char* msg = "Generic error for block data!"; Node_SendPacket(Node_FromConnection(client), client, PACKET_TYPE_ERROR, msg, strlen(msg)); + Block_Destroy(block); return; } size_t offset = 0; + // Write height first + uint64_t heightLE = requestedHeight; + memcpy(blockData + offset, &heightLE, sizeof(heightLE)); + offset += sizeof(heightLE); memcpy(blockData + offset, &block->header, sizeof(block_header_t)); offset += sizeof(block_header_t); - uint64_t txCount = block->transactions ? (uint64_t)block->transactions->size : 0; - memcpy(blockData + offset, &txCount, sizeof(txCount)); - offset += sizeof(txCount); - if (block->transactions && block->transactions->size > 0) { - memcpy(blockData + offset, block->transactions->data, block->transactions->size * sizeof(signed_transaction_t)); - offset += block->transactions->size * sizeof(signed_transaction_t); + uint64_t txCount64 = (uint64_t)txCount; + memcpy(blockData + offset, &txCount64, sizeof(txCount64)); + offset += sizeof(txCount64); + if (block->transactions && txCount > 0) { + for (size_t ti = 0; ti < txCount; ++ti) { + signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(block->transactions, ti); + memcpy(blockData + offset, tx, sizeof(signed_transaction_t)); + offset += sizeof(signed_transaction_t); + } } // Send the block data Node_SendPacket(Node_FromConnection(client), client, PACKET_TYPE_BLOCK_DATA, blockData, offset); free(blockData); + Block_Destroy(block); } case PACKET_TYPE_BLOCK_DATA: { // Server can't receive these! diff --git a/src/nets/orphan_pool.c b/src/nets/orphan_pool.c new file mode 100644 index 0000000..b87600c --- /dev/null +++ b/src/nets/orphan_pool.c @@ -0,0 +1,94 @@ +#include +#include +#include +#include +#include + +typedef struct { + block_t* block; + uint64_t height; +} orphan_entry_t; + +static DynArr* g_orphans = NULL; + +void OrphanPool_Init(void) { + if (g_orphans) return; + g_orphans = DYNARR_CREATE(orphan_entry_t, 16); +} + +void OrphanPool_Destroy(void) { + if (!g_orphans) return; + size_t n = DynArr_size(g_orphans); + for (size_t i = 0; i < n; ++i) { + orphan_entry_t* e = (orphan_entry_t*)DynArr_at(g_orphans, i); + if (e && e->block) { + Block_Destroy(e->block); + } + } + DynArr_destroy(g_orphans); + g_orphans = NULL; +} + +void OrphanPool_Insert(block_t* block, uint64_t height) { + if (!block) return; + if (!g_orphans) OrphanPool_Init(); + orphan_entry_t e; + e.block = block; + e.height = height; + (void)DynArr_push_back(g_orphans, &e); +} + +size_t OrphanPool_AttemptAttach(blockchain_t* chain) { + if (!g_orphans || !chain) return 0; + size_t attached = 0; + bool madeProgress = true; + + // Attempt repeatedly while progress is made (to handle chained orphans) + while (madeProgress) { + madeProgress = false; + size_t n = DynArr_size(g_orphans); + for (size_t i = 0; i < n; ++i) { + orphan_entry_t* e = (orphan_entry_t*)DynArr_at(g_orphans, i); + if (!e || !e->block) continue; + + uint64_t parentIndex = (e->height == 0) ? (uint64_t)-1 : (e->height - 1); + bool parentExists = false; + if (e->height == 0) { + // genesis-style block: parent is zero-hash; accept if chain empty + parentExists = (Chain_Size(chain) == 0); + } else if (parentIndex < Chain_Size(chain)) { + block_t* parent = NULL; + if (Chain_GetBlockCopy(chain, (size_t)parentIndex, &parent) && parent) { + parentExists = true; + Block_Destroy(parent); + } else { + parentExists = false; + } + } + + if (parentExists) { + // Try to add to chain + if (Chain_AddBlock(chain, e->block)) { + attached++; + madeProgress = true; + // remove this entry + DynArr_remove(g_orphans, i); + // adjust indices + n = DynArr_size(g_orphans); + i = (size_t)-1; // reset outer loop + break; + } else { + // Chain_AddBlock rejected it (maybe invalid). Drop it. + Block_Destroy(e->block); + DynArr_remove(g_orphans, i); + n = DynArr_size(g_orphans); + i = (size_t)-1; + madeProgress = true; + break; + } + } + } + } + + return attached; +}