Compare commits

...

10 Commits

Author SHA1 Message Date
4f10f013f6 orphans reorg test 2026-05-15 22:38:18 +02:00
f94655a0ed segfaults and orphans 2026-05-15 22:32:34 +02:00
58ff36b218 cli fix 2026-05-15 19:54:48 +02:00
8f3559b3f6 segfault fix 2026-05-15 19:46:57 +02:00
971a4d9e49 auto resync on penalties 2026-05-15 19:33:57 +02:00
f9c94876d9 reorg bugs 2026-05-15 19:30:58 +02:00
9405801f6b con timeout 2026-05-15 19:28:21 +02:00
0fb2615d4c sync errors 2026-05-15 19:01:51 +02:00
55ca03f4ff orphan test 2026-05-15 18:49:49 +02:00
ce27dafaba todo update, forward block broadcasts, optional echo connect 2026-05-15 18:37:50 +02:00
11 changed files with 526 additions and 67 deletions

View File

@@ -1,6 +1,4 @@
TODO: TODO:
Implement Horizen's "Reorg Penalty" system to make it harder for the young chain to be attacked by a powerful miner.
Make transactions private. A bit more work, but it's a challenge worth taking on. Make transactions private. A bit more work, but it's a challenge worth taking on.
I want to make an "optional privacy" system, where the TX can be public or private. Of course private TXs need more bytes, so the fees (although low) will be higher for them. I want to make an "optional privacy" system, where the TX can be public or private. Of course private TXs need more bytes, so the fees (although low) will be higher for them.
I need to figure out a way to make the privacy work without a UTXO system, and instead, with a "Balance Sheet" approach. I need to figure out a way to make the privacy work without a UTXO system, and instead, with a "Balance Sheet" approach.
@@ -10,6 +8,15 @@ Maybe move the node system to an async event loop instead of spawning threads.
A potential race could occur if the P2P node receives a new block, or flushes a new block to disk while the user is running a full verify. A potential race could occur if the P2P node receives a new block, or flushes a new block to disk while the user is running a full verify.
Maybe think about how block broadcasting works. Instead of unsolicited broadcasting, maybe only advertise a new height and have peers request the block if they want it. This would reduce bandwidth usage, but it also means that blocks won't propagate as fast, which could lead to more orphaned blocks. It's a tradeoff.
Check if Block FullVerify is actually verifying fully (not missing any conditions).
A loophole in the reorg penalty system could potentially exist where someone broadcasts blocks one-at-a-time. Determine a solution to this.
TO TEST:
Implement Horizen's "Reorg Penalty" system to make it harder for the young chain to be attacked by a powerful miner.
DONE: DONE:
I want to move away from the Monero emission. I want to do something a bit radical for cryptocurrency, but I feel like it's necessary to make it more like money: I want to move away from the Monero emission. I want to do something a bit radical for cryptocurrency, but I feel like it's necessary to make it more like money:
a constant inflation rate of 1.5% per year. It's lower than fiat (USD is ~2.8% per year), and it additionally doesn't fluctuate during crisis. It's constant. a constant inflation rate of 1.5% per year. It's lower than fiat (USD is ~2.8% per year), and it additionally doesn't fluctuate during crisis. It's constant.

View File

@@ -40,6 +40,7 @@ bool Block_IsFullyValid(const block_t* block);
void Block_ShutdownPowContext(void); void Block_ShutdownPowContext(void);
void Block_Destroy(block_t* block); void Block_Destroy(block_t* block);
void Block_Print(const block_t* block); void Block_Print(const block_t* block);
void Block_ShortPrint(const block_t* block);
// Deep-copy a block (allocates a new `block_t*`). Caller must call `Block_Destroy`. // Deep-copy a block (allocates a new `block_t*`). Caller must call `Block_Destroy`.
block_t* Block_Copy(const block_t* src); block_t* Block_Copy(const block_t* src);

View File

@@ -12,6 +12,7 @@
// Nets // Nets
#define MAX_CONS 32 // Some baseline for now #define MAX_CONS 32 // Some baseline for now
#define LISTEN_PORT 9393 #define LISTEN_PORT 9393
#define ECHO_PEERS 1 // If non-zero, automatically attempt to connect back to any inbound peers (helps form bidirectional peering)
#define TCP_THREAD_STACK_SIZE (512 * 1024) // 512 KB. We could get away with like 128 KB since it's mostly just recv bufs, but it's good having some breathing room. #define TCP_THREAD_STACK_SIZE (512 * 1024) // 512 KB. We could get away with like 128 KB since it's mostly just recv bufs, but it's good having some breathing room.
// This is also for client threads. The server has the default (~8 MB on POSIX). // This is also for client threads. The server has the default (~8 MB on POSIX).

View File

@@ -14,6 +14,7 @@
#include <stddef.h> #include <stddef.h>
#include <dynarr.h> #include <dynarr.h>
#include <dynset.h>
#include <pthread.h> #include <pthread.h>
@@ -25,6 +26,12 @@ typedef struct {
tcp_server_t* server; tcp_server_t* server;
tcp_client_t outboundClients[MAX_CONS]; tcp_client_t outboundClients[MAX_CONS];
size_t outboundCount; size_t outboundCount;
// Dedup cache for recently seen block hashes (canonical 32-byte hash)
DynSet* seenBlocks;
// Protects seenBlocks
pthread_mutex_t seenLock;
// Protects outboundClients snapshots and peerBlockHeight writes
pthread_mutex_t outboundLock;
void (*on_connect)(tcp_connection_t* conn, void* user); void (*on_connect)(tcp_connection_t* conn, void* user);
void (*on_data)(tcp_connection_t* conn, const unsigned char* data, size_t len, void* user); void (*on_data)(tcp_connection_t* conn, const unsigned char* data, size_t len, void* user);
void (*on_disconnect)(tcp_connection_t* conn, void* user); void (*on_disconnect)(tcp_connection_t* conn, void* user);
@@ -51,6 +58,10 @@ int Node_ConnectStartupPeers(net_node_t* node, const char** ips, const unsigned
int Node_SendPacket(net_node_t* node, tcp_connection_t* conn, packet_type_t packetType, const void* payload, size_t payloadLen); int Node_SendPacket(net_node_t* node, tcp_connection_t* conn, packet_type_t packetType, const void* payload, size_t payloadLen);
// Helpers for outbound peer selection and block broadcast
int Node_GetBestOutboundPeer(net_node_t* node, tcp_connection_t** outConn, uint64_t* outHeight);
void Node_BroadcastChainRange(net_node_t* node, size_t startHeightInclusive, tcp_connection_t* sourceConn);
// Callback logic // Callback logic
void Node_Server_OnConnect(tcp_connection_t* client); void Node_Server_OnConnect(tcp_connection_t* client);
void Node_Server_OnData(tcp_connection_t* client); void Node_Server_OnData(tcp_connection_t* client);

0
myeasylog.log Normal file
View File

View File

@@ -286,6 +286,20 @@ void Block_Print(const block_t* block) {
} }
} }
void Block_ShortPrint(const block_t* block) {
if (!block) return;
printf("Block #%llu: Timestamp %llu, Nonce %llu, DiffTarget 0x%08x, Version %u, PrevHash %02x%02x...%02x%02x, MerkleRoot %02x%02x...%02x%02x, TxCount %zu\n",
(unsigned long long)block->header.blockNumber,
(unsigned long long)block->header.timestamp,
(unsigned long long)block->header.nonce,
block->header.difficultyTarget,
block->header.version,
block->header.prevHash[0], block->header.prevHash[1], block->header.prevHash[30], block->header.prevHash[31],
block->header.merkleRoot[0], block->header.merkleRoot[1], block->header.merkleRoot[30], block->header.merkleRoot[31],
block->transactions ? DynArr_size(block->transactions) : 0);
}
block_t* Block_Copy(const block_t* src) { block_t* Block_Copy(const block_t* src) {
if (!src) return NULL; if (!src) return NULL;
block_t* dst = (block_t*)malloc(sizeof(block_t)); block_t* dst = (block_t*)malloc(sizeof(block_t));

View File

@@ -151,6 +151,15 @@ bool Chain_AddBlock(blockchain_t* chain, block_t* block) {
pthread_rwlock_wrlock(&chainLock); pthread_rwlock_wrlock(&chainLock);
pthread_mutex_lock(&balanceSheetLock); pthread_mutex_lock(&balanceSheetLock);
// Ensure the incoming block's header.blockNumber matches the index it will be appended at.
size_t expectedIndex = DynArr_size(chain->blocks);
if (block->header.blockNumber != expectedIndex) {
// Mismatched block number; reject to avoid duplicate indices or inconsistent headers.
pthread_mutex_unlock(&balanceSheetLock);
pthread_rwlock_unlock(&chainLock);
return false;
}
do { do {
// First pass: ensure all non-coinbase senders can cover the full spend // First pass: ensure all non-coinbase senders can cover the full spend
// (amount1 + amount2 + fee) before mutating the chain or balance sheet. // (amount1 + amount2 + fee) before mutating the chain or balance sheet.
@@ -230,6 +239,9 @@ bool Chain_AddBlock(blockchain_t* chain, block_t* block) {
pthread_mutex_unlock(&balanceSheetLock); pthread_mutex_unlock(&balanceSheetLock);
pthread_rwlock_unlock(&chainLock); pthread_rwlock_unlock(&chainLock);
printf("Added new block to chain:\n");
Block_ShortPrint(block);
return ok; return ok;
} }
@@ -518,9 +530,12 @@ bool Chain_SaveToFile(blockchain_t* chain, const char* dirpath, uint256_t curren
uint64_t byteCount = (uint64_t)pos; // Get the size uint64_t byteCount = (uint64_t)pos; // Get the size
// Save blocks that are not yet saved // Save blocks that are not yet saved
// Acquire write lock to protect block transaction pointers from concurrent freeing.
pthread_rwlock_wrlock(&chainLock);
for (size_t i = savedSize; i < DynArr_size(chain->blocks); i++) { for (size_t i = savedSize; i < DynArr_size(chain->blocks); i++) {
block_t* blk = (block_t*)DynArr_at(chain->blocks, i); block_t* blk = (block_t*)DynArr_at(chain->blocks, i);
if (!blk) { if (!blk) {
pthread_rwlock_unlock(&chainLock);
fclose(metaFile); fclose(metaFile);
fclose(chainFile); fclose(chainFile);
fclose(tableFile); fclose(tableFile);
@@ -539,6 +554,7 @@ bool Chain_SaveToFile(blockchain_t* chain, const char* dirpath, uint256_t curren
for (size_t j = 0; j < txSize; j++) { for (size_t j = 0; j < txSize; j++) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(blk->transactions, j); signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(blk->transactions, j);
if (fwrite(tx, sizeof(signed_transaction_t), 1, chainFile) != 1) { if (fwrite(tx, sizeof(signed_transaction_t), 1, chainFile) != 1) {
pthread_rwlock_unlock(&chainLock);
fclose(chainFile); fclose(chainFile);
fclose(metaFile); fclose(metaFile);
fclose(tableFile); fclose(tableFile);
@@ -559,6 +575,8 @@ bool Chain_SaveToFile(blockchain_t* chain, const char* dirpath, uint256_t curren
blk->transactions = NULL; // Clear transactions to save memory since they're now saved on disk blk->transactions = NULL; // Clear transactions to save memory since they're now saved on disk
} }
pthread_rwlock_unlock(&chainLock);
// Update metadata with new size and last block hash // Update metadata with new size and last block hash
size_t newSize = DynArr_size(chain->blocks); size_t newSize = DynArr_size(chain->blocks);
fseek(metaFile, 0, SEEK_SET); fseek(metaFile, 0, SEEK_SET);

View File

@@ -11,6 +11,7 @@
#include <signal.h> #include <signal.h>
#include <balance_sheet.h> #include <balance_sheet.h>
#include <unistd.h> #include <unistd.h>
#include <errno.h>
#include <constants.h> #include <constants.h>
@@ -651,6 +652,11 @@ int main(int argc, char* argv[]) {
free(block); // Chain stores block by value and owns copied transaction array. free(block); // Chain stores block by value and owns copied transaction array.
// Broadcast newly mined block to outbound peers
if (node) {
Node_BroadcastChainRange(node, Chain_Size(chain) - 1, NULL);
}
if (i % 50 == 0) { if (i % 50 == 0) {
// Mid-mine flush // Mid-mine flush
(void)FlushChainAndSheet(chain, chainDataDir, currentSupply, currentReward); (void)FlushChainAndSheet(chain, chainDataDir, currentSupply, currentReward);
@@ -733,6 +739,9 @@ int main(int argc, char* argv[]) {
FlushChainAndSheet(chain, chainDataDir, currentSupply, currentReward); FlushChainAndSheet(chain, chainDataDir, currentSupply, currentReward);
free(block); free(block);
if (node) {
Node_BroadcastChainRange(node, Chain_Size(chain) - 1, NULL);
}
printf("send committed in mined block\n"); printf("send committed in mined block\n");
continue; continue;
} }
@@ -744,46 +753,45 @@ int main(int argc, char* argv[]) {
} }
// Choose the best outbound peer by advertised height // Choose the best outbound peer by advertised height
int bestIdx = -1; tcp_connection_t* peerConn = NULL;
uint64_t bestHeight = 0; uint64_t peerHeight = 0;
for (size_t i = 0; i < MAX_CONS; ++i) { if (Node_GetBestOutboundPeer(node, &peerConn, &peerHeight) != 0 || !peerConn) {
if (node->outboundClients[i].connection) {
if (node->outboundClients[i].peerBlockHeight > bestHeight) {
bestHeight = node->outboundClients[i].peerBlockHeight;
bestIdx = (int)i;
}
}
}
if (bestIdx < 0) {
printf("no outbound peers to sync from\n"); printf("no outbound peers to sync from\n");
continue; continue;
} }
uint64_t localHeight = (uint64_t)Chain_Size(chain); // Continue syncing in a loop until we've caught up to the peer or no progress is made.
uint64_t peerHeight = node->outboundClients[bestIdx].peerBlockHeight; bool madeProgressOverall = false;
while (true) {
uint64_t localHeight = (uint64_t)Chain_Size(chain);
// Determine if this is an initial sync. If so, do not apply penalty. // Only penalize small near-tip gaps. Large gaps are treated as normal catch-up,
bool isInitialSync = (localHeight == 0); // because a much taller peer on the same chain is not evidence of a reorg. TODO: Maybe look at this again some other day.
bool isInitialSync = (localHeight == 0) || ((peerHeight > localHeight) && ((peerHeight - localHeight) > INITIAL_SYNC_HEIGHT_DIFF));
// Compute penalty and adjusted peer height (skip penalty for initial sync) // Compute penalty and adjusted peer height.
uint64_t delay = (peerHeight > localHeight) ? (peerHeight - localHeight) : 0ULL; uint64_t delay = (peerHeight > localHeight) ? (peerHeight - localHeight) : 0ULL;
uint64_t penalty = isInitialSync ? 0ULL : FetchScheduler_ComputeReorgPenaltyBlocks(delay); uint64_t penalty = isInitialSync ? 0ULL : FetchScheduler_ComputeReorgPenaltyBlocks(delay);
uint64_t adjustedPeerHeight = (peerHeight > penalty) ? (peerHeight - penalty) : 0ULL; uint64_t adjustedPeerHeight = (peerHeight > penalty) ? (peerHeight - penalty) : 0ULL;
// Ensure we always make forward progress: if the penalty would reduce the
// target below our current height, fetch at least the next block. This
// lets us apply penalties for near-tip reorg risk while still allowing
// normal syncing when the peer is ahead by a small amount.
if (adjustedPeerHeight <= localHeight) { if (adjustedPeerHeight <= localHeight) {
printf("already synced (local=%" PRIu64 ", peer=%" PRIu64 ", penalty=%" PRIu64 ")\n", localHeight, peerHeight, penalty); adjustedPeerHeight = localHeight + 1;
continue;
} }
printf("syncing from peer %d: peerHeight=%" PRIu64 " adjusted=%" PRIu64 " local=%" PRIu64 " penalty=%" PRIu64 "\n", if (adjustedPeerHeight > peerHeight) {
bestIdx, peerHeight, adjustedPeerHeight, localHeight, penalty); adjustedPeerHeight = peerHeight;
}
tcp_connection_t* peerConn = node->outboundClients[bestIdx].connection; printf("syncing: peerHeight=%" PRIu64 " adjusted=%" PRIu64 " local=%" PRIu64 " penalty=%" PRIu64 "\n",
peerHeight, adjustedPeerHeight, localHeight, penalty);
// Windowed parallel fetch // Windowed parallel fetch
uint64_t start = localHeight; uint64_t start = localHeight;
uint64_t end = adjustedPeerHeight; // exclusive target height uint64_t end = adjustedPeerHeight; // exclusive target height
uint64_t nextReq = start; uint64_t nextReq = start;
const int maxInFlight = MAX_PARALLEL_FETCHES; const int maxInFlight = MAX_PARALLEL_FETCHES;
@@ -968,10 +976,36 @@ int main(int argc, char* argv[]) {
} }
} }
printf("sync complete: localHeight=%zu\n", Chain_Size(chain)); // After the window completes, check progress and possibly refresh peer height
uint64_t newLocal = (uint64_t)Chain_Size(chain);
if (newLocal > localHeight) madeProgressOverall = true;
printf("sync complete: localHeight=%" PRIu64 "\n", newLocal);
// If we've caught up to the peer, stop. Otherwise refresh peerHeight and loop again.
if (newLocal >= peerHeight) break;
// Refresh advertised peer height for this connection (it may have been updated during fetch)
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) {
if (node->outboundClients[i].connection == peerConn) {
peerHeight = node->outboundClients[i].peerBlockHeight;
break;
}
}
pthread_mutex_unlock(&node->outboundLock);
// If no progress was made in this iteration, stop to avoid tight loop
if (!madeProgressOverall) {
break;
}
// Re-evaluate loop condition: continue while local < peerHeight
if ((uint64_t)Chain_Size(chain) >= peerHeight) break;
continue; continue;
} }
}
if (strcmp(cmd, "blockdetail") == 0) { if (strcmp(cmd, "blockdetail") == 0) {
char* blockNumberStr = strtok(NULL, " \t"); char* blockNumberStr = strtok(NULL, " \t");
char* extra = strtok(NULL, " \t"); char* extra = strtok(NULL, " \t");
@@ -1062,7 +1096,11 @@ int main(int argc, char* argv[]) {
} }
if (Node_ConnectPeer(node, ipStr, LISTEN_PORT) != 0) { if (Node_ConnectPeer(node, ipStr, LISTEN_PORT) != 0) {
printf("failed to connect to %s:%u\n", ipStr, (unsigned int)LISTEN_PORT); if (errno == ETIMEDOUT) {
printf("failed to connect to %s:%u (timeout)\n", ipStr, (unsigned int)LISTEN_PORT);
} else {
printf("failed to connect to %s:%u\n", ipStr, (unsigned int)LISTEN_PORT);
}
continue; continue;
} }

View File

@@ -28,6 +28,12 @@ static uint64_t Node_GetCurrentBlockHeight(void) {
return currentBlockHeight; return currentBlockHeight;
} }
typedef enum {
NODE_BLOCK_REJECTED = 0,
NODE_BLOCK_ORPHAN_QUEUED = 1,
NODE_BLOCK_ACCEPTED = 2
} node_block_accept_result_t;
static void* Node_MaintenanceThread(void* arg) { static void* Node_MaintenanceThread(void* arg) {
net_node_t* n = (net_node_t*)arg; net_node_t* n = (net_node_t*)arg;
if (!n) return NULL; if (!n) return NULL;
@@ -61,18 +67,18 @@ static int Node_DecodePacket(const tcp_connection_t* conn, packet_type_t* outTyp
return 0; return 0;
} }
static bool Node_ParseAndAcceptBlock(const unsigned char* payload, size_t payloadLen, bool persist) { static node_block_accept_result_t Node_ParseAndAcceptBlock(const unsigned char* payload, size_t payloadLen, bool persist) {
if (!payload) { return false; } if (!payload) { return NODE_BLOCK_REJECTED; }
size_t offset = 0; size_t offset = 0;
if (payloadLen < sizeof(uint64_t) + sizeof(block_header_t) + sizeof(uint64_t)) { return false; } if (payloadLen < sizeof(uint64_t) + sizeof(block_header_t) + sizeof(uint64_t)) { return NODE_BLOCK_REJECTED; }
uint64_t blockHeight = 0; uint64_t blockHeight = 0;
memcpy(&blockHeight, payload + offset, sizeof(blockHeight)); memcpy(&blockHeight, payload + offset, sizeof(blockHeight));
offset += sizeof(blockHeight); offset += sizeof(blockHeight);
block_t* blk = (block_t*)calloc(1, sizeof(block_t)); block_t* blk = (block_t*)calloc(1, sizeof(block_t));
if (!blk) { return false; } if (!blk) { return NODE_BLOCK_REJECTED; }
memcpy(&blk->header, payload + offset, sizeof(blk->header)); memcpy(&blk->header, payload + offset, sizeof(blk->header));
blk->header.blockNumber = blockHeight; blk->header.blockNumber = blockHeight;
@@ -83,13 +89,13 @@ static bool Node_ParseAndAcceptBlock(const unsigned char* payload, size_t payloa
offset += sizeof(txCount); offset += sizeof(txCount);
blk->transactions = DYNARR_CREATE(signed_transaction_t, txCount == 0 ? 1 : (size_t)txCount); blk->transactions = DYNARR_CREATE(signed_transaction_t, txCount == 0 ? 1 : (size_t)txCount);
if (!blk->transactions) { free(blk); return false; } if (!blk->transactions) { free(blk); return NODE_BLOCK_REJECTED; }
for (uint64_t i = 0; i < txCount; ++i) { for (uint64_t i = 0; i < txCount; ++i) {
if (offset + sizeof(signed_transaction_t) > payloadLen) { if (offset + sizeof(signed_transaction_t) > payloadLen) {
DynArr_destroy(blk->transactions); DynArr_destroy(blk->transactions);
free(blk); free(blk);
return false; return NODE_BLOCK_REJECTED;
} }
signed_transaction_t tx; signed_transaction_t tx;
memcpy(&tx, payload + offset, sizeof(tx)); memcpy(&tx, payload + offset, sizeof(tx));
@@ -97,7 +103,7 @@ static bool Node_ParseAndAcceptBlock(const unsigned char* payload, size_t payloa
if (!DynArr_push_back(blk->transactions, &tx)) { if (!DynArr_push_back(blk->transactions, &tx)) {
DynArr_destroy(blk->transactions); DynArr_destroy(blk->transactions);
free(blk); free(blk);
return false; return NODE_BLOCK_REJECTED;
} }
} }
@@ -106,28 +112,51 @@ static bool Node_ParseAndAcceptBlock(const unsigned char* payload, size_t payloa
printf("Rejected BLOCK_DATA at height %" PRIu64 " during validation\n", blockHeight); printf("Rejected BLOCK_DATA at height %" PRIu64 " during validation\n", blockHeight);
DynArr_destroy(blk->transactions); DynArr_destroy(blk->transactions);
free(blk); free(blk);
return false; return NODE_BLOCK_REJECTED;
} }
if (!currentChain) { if (!currentChain) {
printf("Rejected BLOCK_DATA at height %" PRIu64 ": no active chain\n", blockHeight); printf("Rejected BLOCK_DATA at height %" PRIu64 ": no active chain\n", blockHeight);
DynArr_destroy(blk->transactions); DynArr_destroy(blk->transactions);
free(blk); free(blk);
return false; return NODE_BLOCK_REJECTED;
} }
// If parent is missing, insert into orphan pool instead of rejecting immediately. // If parent is missing, insert into orphan pool instead of rejecting immediately.
if (blk->header.blockNumber > 0) { uint64_t chainSize = Chain_Size(currentChain);
uint64_t parentIndex = blk->header.blockNumber - 1; if (blk->header.blockNumber > chainSize) {
block_t* parentCopy = NULL; // Parent(s) missing; queue as orphan
if (parentIndex >= Chain_Size(currentChain) || !Chain_GetBlockCopy(currentChain, (size_t)parentIndex, &parentCopy) || !parentCopy) { OrphanPool_Insert(blk, blockHeight);
// Insert into orphan pool and take ownership of blk printf("Queued orphan BLOCK_DATA at height %" PRIu64 "\n", blockHeight);
OrphanPool_Insert(blk, blockHeight); return NODE_BLOCK_ORPHAN_QUEUED;
if (parentCopy) Block_Destroy(parentCopy); } else if (blk->header.blockNumber < chainSize) {
printf("Queued orphan BLOCK_DATA at height %" PRIu64 "\n", blockHeight); // Older block than current chain tip: reject
return true; printf("Rejected BLOCK_DATA at height %" PRIu64 ": older than current chain\n", blockHeight);
DynArr_destroy(blk->transactions);
free(blk);
return NODE_BLOCK_REJECTED;
} else {
// blk->header.blockNumber == chainSize -> candidate to append. Ensure prevHash matches current tip.
if (chainSize > 0) {
block_t* last = NULL;
if (!Chain_GetBlockCopy(currentChain, (size_t)(chainSize - 1), &last) || !last) {
// Can't verify parent; queue as orphan conservatively
OrphanPool_Insert(blk, blockHeight);
printf("Queued orphan BLOCK_DATA at height %" PRIu64 " (unable to verify parent)\n", blockHeight);
if (last) Block_Destroy(last);
return NODE_BLOCK_ORPHAN_QUEUED;
}
uint8_t lastHash[32];
Block_CalculateHash(last, lastHash);
if (memcmp(lastHash, blk->header.prevHash, 32) != 0) {
// Conflicting block at same height; queue as orphan until resolved by a subsequent extension.
OrphanPool_Insert(blk, blockHeight);
Block_Destroy(last);
printf("Queued conflicting BLOCK_DATA at same height %" PRIu64 " as orphan\n", blockHeight);
return NODE_BLOCK_ORPHAN_QUEUED;
}
Block_Destroy(last);
} }
Block_Destroy(parentCopy);
} }
if (!Chain_AddBlock(currentChain, blk)) { if (!Chain_AddBlock(currentChain, blk)) {
@@ -137,7 +166,7 @@ static bool Node_ParseAndAcceptBlock(const unsigned char* payload, size_t payloa
DynArr_destroy(blk->transactions); DynArr_destroy(blk->transactions);
} }
free(blk); free(blk);
return false; return NODE_BLOCK_REJECTED;
} }
// Persist on accept if requested // Persist on accept if requested
@@ -156,7 +185,7 @@ static bool Node_ParseAndAcceptBlock(const unsigned char* payload, size_t payloa
Chain_SaveToFile(currentChain, chainDataDir, currentSupply, currentReward); Chain_SaveToFile(currentChain, chainDataDir, currentSupply, currentReward);
BalanceSheet_SaveToFile(chainDataDir); BalanceSheet_SaveToFile(chainDataDir);
} }
return true; return NODE_BLOCK_ACCEPTED;
} }
static void Node_ForwardConnect(net_node_t* node, tcp_connection_t* conn) { static void Node_ForwardConnect(net_node_t* node, tcp_connection_t* conn) {
@@ -198,6 +227,11 @@ net_node_t* Node_Create() {
} }
} }
// Initialize outbound lock and seen-block cache
pthread_mutex_init(&node->seenLock, NULL);
pthread_mutex_init(&node->outboundLock, NULL);
node->seenBlocks = DynSet_Create(32); // 32-byte canonical hashes
TcpServer_Init(node->server, LISTEN_PORT, "0.0.0.0"); TcpServer_Init(node->server, LISTEN_PORT, "0.0.0.0");
node->server->owner = node; node->server->owner = node;
@@ -243,6 +277,13 @@ void Node_Destroy(net_node_t* node) {
OrphanPool_Destroy(); OrphanPool_Destroy();
if (node->seenBlocks) {
DynSet_Destroy(node->seenBlocks);
node->seenBlocks = NULL;
}
pthread_mutex_destroy(&node->seenLock);
pthread_mutex_destroy(&node->outboundLock);
free(node); free(node);
} }
@@ -340,6 +381,36 @@ void Node_Server_OnConnect(tcp_connection_t* client) {
net_node_t* node = Node_FromConnection(client); net_node_t* node = Node_FromConnection(client);
Node_ForwardConnect(node, client); Node_ForwardConnect(node, client);
printf("Inbound node connected: %u\n", client ? client->connectionId : 0U); printf("Inbound node connected: %u\n", client ? client->connectionId : 0U);
#if ECHO_PEERS
if (node && client) {
// Attempt to create an outbound connection back to the peer's IP on our LISTEN_PORT.
// We avoid connecting if we already have an outbound to the same IP.
char ipbuf[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, &client->peerAddr.sin_addr, ipbuf, sizeof(ipbuf))) {
// Use LISTEN_PORT as target port for peer's listening service, not the ephemeral source port.
unsigned short targetPort = LISTEN_PORT;
int shouldConnect = 1;
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) {
if (node->outboundClients[i].connection) {
struct in_addr otherAddr = node->outboundClients[i].connection->peerAddr.sin_addr;
if (otherAddr.s_addr == client->peerAddr.sin_addr.s_addr) {
shouldConnect = 0;
break;
}
}
}
pthread_mutex_unlock(&node->outboundLock);
if (shouldConnect) {
// Try to connect; ignore failure silently
(void)Node_ConnectPeer(node, ipbuf, targetPort);
}
}
}
#endif
} }
void Node_Server_OnData(tcp_connection_t* client) { void Node_Server_OnData(tcp_connection_t* client) {
@@ -498,10 +569,21 @@ void Node_Server_OnData(tcp_connection_t* client) {
} }
case PACKET_TYPE_BROADCAST_BLOCK: { case PACKET_TYPE_BROADCAST_BLOCK: {
// Accept broadcast blocks from peers and try to append // Accept broadcast blocks from peers and try to append
if (Node_ParseAndAcceptBlock(payload, payloadLen, true)) { if (payloadLen >= sizeof(uint64_t)) {
printf("Accepted BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U); uint64_t blockHeight = 0;
} else { memcpy(&blockHeight, payload, sizeof(blockHeight));
printf("Rejected BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U); node_block_accept_result_t result = Node_ParseAndAcceptBlock(payload, payloadLen, true);
if (result == NODE_BLOCK_ACCEPTED) {
printf("Accepted BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
net_node_t* node = Node_FromConnection(client);
if (node) {
Node_BroadcastChainRange(node, (size_t)blockHeight, client);
}
} else if (result == NODE_BLOCK_ORPHAN_QUEUED) {
printf("Queued orphan BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
} else {
printf("Rejected BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
}
} }
break; break;
} }
@@ -594,12 +676,14 @@ void Node_Client_OnData(tcp_connection_t* client) {
// Store peer-advertised height on matching outbound client // Store peer-advertised height on matching outbound client
net_node_t* node = Node_FromConnection(client); net_node_t* node = Node_FromConnection(client);
if (node) { if (node) {
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) { for (size_t i = 0; i < MAX_CONS; ++i) {
if (node->outboundClients[i].connection == client) { if (node->outboundClients[i].connection == client) {
node->outboundClients[i].peerBlockHeight = blockHeight; node->outboundClients[i].peerBlockHeight = blockHeight;
break; break;
} }
} }
pthread_mutex_unlock(&node->outboundLock);
} }
break; break;
} }
@@ -614,18 +698,64 @@ void Node_Client_OnData(tcp_connection_t* client) {
return; return;
} }
case PACKET_TYPE_BLOCK_DATA: { case PACKET_TYPE_BLOCK_DATA: {
if (Node_ParseAndAcceptBlock(payload, payloadLen, true)) { if (payloadLen >= sizeof(uint64_t)) {
printf("Accepted BLOCK_DATA from node %u\n", client ? client->connectionId : 0U); uint64_t blockHeight = 0;
} else { memcpy(&blockHeight, payload, sizeof(blockHeight));
printf("Rejected BLOCK_DATA from node %u\n", client ? client->connectionId : 0U); node_block_accept_result_t result = Node_ParseAndAcceptBlock(payload, payloadLen, true);
if (result == NODE_BLOCK_ACCEPTED) {
printf("Accepted BLOCK_DATA from node %u\n", client ? client->connectionId : 0U);
net_node_t* node = Node_FromConnection(client);
if (node) {
// Update peer advertised height
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) {
if (node->outboundClients[i].connection == client) {
if (node->outboundClients[i].peerBlockHeight < blockHeight) {
node->outboundClients[i].peerBlockHeight = blockHeight;
}
break;
}
}
pthread_mutex_unlock(&node->outboundLock);
Node_BroadcastChainRange(node, (size_t)blockHeight, client);
}
} else if (result == NODE_BLOCK_ORPHAN_QUEUED) {
printf("Queued orphan BLOCK_DATA from node %u\n", client ? client->connectionId : 0U);
} else {
printf("Rejected BLOCK_DATA from node %u\n", client ? client->connectionId : 0U);
}
} }
break; break;
} }
case PACKET_TYPE_BROADCAST_BLOCK: { case PACKET_TYPE_BROADCAST_BLOCK: {
if (Node_ParseAndAcceptBlock(payload, payloadLen, true)) { if (payloadLen >= sizeof(uint64_t)) {
printf("Accepted BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U); uint64_t blockHeight = 0;
} else { memcpy(&blockHeight, payload, sizeof(blockHeight));
printf("Rejected BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U); node_block_accept_result_t result = Node_ParseAndAcceptBlock(payload, payloadLen, true);
if (result == NODE_BLOCK_ACCEPTED) {
printf("Accepted BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
net_node_t* node = Node_FromConnection(client);
if (node) {
// Update peer advertised height
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) {
if (node->outboundClients[i].connection == client) {
if (node->outboundClients[i].peerBlockHeight < blockHeight) {
node->outboundClients[i].peerBlockHeight = blockHeight;
}
break;
}
}
pthread_mutex_unlock(&node->outboundLock);
Node_BroadcastChainRange(node, (size_t)blockHeight, client);
}
} else if (result == NODE_BLOCK_ORPHAN_QUEUED) {
printf("Queued orphan BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
} else {
printf("Rejected BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
}
} }
break; break;
} }
@@ -656,10 +786,128 @@ void Node_Client_OnData(tcp_connection_t* client) {
void Node_Client_OnDisconnect(tcp_connection_t* client) { void Node_Client_OnDisconnect(tcp_connection_t* client) {
net_node_t* node = Node_FromConnection(client); net_node_t* node = Node_FromConnection(client);
if (node && node->outboundCount > 0) { if (node) {
node->outboundCount--; // Clear peer advertised height for this outbound slot
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) {
if (node->outboundClients[i].connection == client) {
node->outboundClients[i].peerBlockHeight = 0;
break;
}
}
pthread_mutex_unlock(&node->outboundLock);
if (node->outboundCount > 0) {
node->outboundCount--;
}
} }
Node_ForwardDisconnect(node, client); Node_ForwardDisconnect(node, client);
printf("Outbound node disconnected: %u\n", client ? client->connectionId : 0U); printf("Outbound node disconnected: %u\n", client ? client->connectionId : 0U);
} }
int Node_GetBestOutboundPeer(net_node_t* node, tcp_connection_t** outConn, uint64_t* outHeight) {
if (!node || !outConn || !outHeight) return -1;
tcp_connection_t* best = NULL;
uint64_t bestH = 0;
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) {
if (node->outboundClients[i].connection) {
if (node->outboundClients[i].peerBlockHeight > bestH || best == NULL) {
best = node->outboundClients[i].connection;
bestH = node->outboundClients[i].peerBlockHeight;
}
}
}
pthread_mutex_unlock(&node->outboundLock);
if (!best) return -1;
*outConn = best;
*outHeight = bestH;
return 0;
}
void Node_BroadcastChainRange(net_node_t* node, size_t startHeightInclusive, tcp_connection_t* sourceConn) {
if (!node || !currentChain) return;
size_t chainSize = Chain_Size(currentChain);
if (startHeightInclusive >= chainSize) return;
uint32_t sourceIp = 0;
if (sourceConn) {
sourceIp = sourceConn->peerAddr.sin_addr.s_addr;
}
for (size_t h = startHeightInclusive; h < chainSize; ++h) {
block_t* blk = NULL;
if (!Chain_GetBlockCopy(currentChain, h, &blk) || !blk) {
if (!Chain_LoadBlockFromFile(chainDataDir, h, true, &blk, NULL) || !blk) {
continue;
}
} else if (!blk->transactions) {
block_t* full = NULL;
if (Chain_LoadBlockFromFile(chainDataDir, h, true, &full, NULL) && full) {
Block_Destroy(blk);
blk = full;
}
}
if (!blk || !blk->transactions) {
if (blk) Block_Destroy(blk);
continue;
}
unsigned char hash[32];
Block_CalculateHash(blk, hash);
// Dedupe using seenBlocks
int seen = 0;
pthread_mutex_lock(&node->seenLock);
if (DynSet_Contains(node->seenBlocks, hash)) {
seen = 1;
} else {
DynSet_Insert(node->seenBlocks, hash);
}
pthread_mutex_unlock(&node->seenLock);
if (seen) {
Block_Destroy(blk);
continue;
}
// Serialize payload: [uint64_t height][block_header_t][uint64_t txCount][transactions...]
size_t txCount = DynArr_size(blk->transactions);
size_t payloadLen = sizeof(uint64_t) + sizeof(block_header_t) + sizeof(uint64_t) + (txCount * sizeof(signed_transaction_t));
unsigned char* payload = (unsigned char*)malloc(payloadLen);
if (!payload) {
Block_Destroy(blk);
continue;
}
size_t off = 0;
uint64_t h64 = (uint64_t)h;
memcpy(payload + off, &h64, sizeof(h64)); off += sizeof(h64);
memcpy(payload + off, &blk->header, sizeof(block_header_t)); off += sizeof(block_header_t);
uint64_t txCount64 = (uint64_t)txCount;
memcpy(payload + off, &txCount64, sizeof(txCount64)); off += sizeof(txCount64);
for (size_t ti = 0; ti < txCount; ++ti) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(blk->transactions, ti);
memcpy(payload + off, tx, sizeof(signed_transaction_t)); off += sizeof(signed_transaction_t);
}
// Snapshot outbound clients and send
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) {
tcp_connection_t* conn = node->outboundClients[i].connection;
if (!conn) continue;
if (conn == sourceConn) continue;
if (sourceIp != 0 && conn->peerAddr.sin_addr.s_addr == sourceIp) continue;
Node_SendPacket(node, conn, PACKET_TYPE_BROADCAST_BLOCK, payload, off);
}
pthread_mutex_unlock(&node->outboundLock);
free(payload);
Block_Destroy(blk);
}
}

View File

@@ -67,6 +67,84 @@ size_t OrphanPool_AttemptAttach(blockchain_t* chain) {
} }
if (parentExists) { if (parentExists) {
// Verify that the parent's hash matches the orphan's prevHash before attaching.
bool parentMatches = false;
if (e->height == 0) {
parentMatches = (Chain_Size(chain) == 0);
} else {
block_t* parent = NULL;
if (Chain_GetBlockCopy(chain, (size_t)parentIndex, &parent) && parent) {
uint8_t parentHash[32];
Block_CalculateHash(parent, parentHash);
parentMatches = (memcmp(parentHash, e->block->header.prevHash, 32) == 0);
Block_Destroy(parent);
} else {
parentMatches = false;
}
}
if (!parentMatches) {
// Parent exists but does not match this orphan's prevHash.
// Attempt to detect a longer alternate chain in the orphan pool starting at this height.
// Build a consecutive sequence of orphans from this height upward.
DynArr* seq = DYNARR_CREATE(block_t*, 8);
size_t h = e->height;
while (1) {
bool found = false;
size_t gn = DynArr_size(g_orphans);
for (size_t gi = 0; gi < gn; ++gi) {
orphan_entry_t* oe = (orphan_entry_t*)DynArr_at(g_orphans, gi);
if (!oe || !oe->block) continue;
if (oe->height == h) {
(void)DynArr_push_back(seq, &oe->block);
found = true;
break;
}
}
if (!found) break;
h++;
}
size_t seqCount = DynArr_size(seq);
if (seqCount > 0) {
size_t seqTopHeight = e->height + seqCount - 1;
if (seqTopHeight >= Chain_Size(chain)) {
// Found a candidate longer branch. Perform rollback to fork height and attach sequence.
if (Chain_RollbackToHeight(chain, (size_t)e->height)) {
// Attach in-order
for (size_t si = 0; si < seqCount; ++si) {
block_t* bptr = *(block_t**)DynArr_at(seq, si);
if (!Chain_AddBlock(chain, bptr)) {
// failed to add; stop attempting further
break;
}
// Remove the attached orphan from pool but keep the block object to preserve transactions in-memory (consistent with existing behavior)
// Find and remove corresponding orphan entry
size_t gn2 = DynArr_size(g_orphans);
for (size_t gi2 = 0; gi2 < gn2; ++gi2) {
orphan_entry_t* oe2 = (orphan_entry_t*)DynArr_at(g_orphans, gi2);
if (oe2 && oe2->block == bptr) {
DynArr_remove(g_orphans, gi2);
gn2 = DynArr_size(g_orphans);
gi2 = (size_t)-1; // restart search if needed
}
}
}
attached += seqCount;
madeProgress = true;
DynArr_destroy(seq);
// reset outer loop
n = DynArr_size(g_orphans);
i = (size_t)-1;
break;
}
}
}
DynArr_destroy(seq);
// If we didn't perform a reorg/attach, skip for now.
continue;
}
// Try to add to chain // Try to add to chain
if (Chain_AddBlock(chain, e->block)) { if (Chain_AddBlock(chain, e->block)) {
attached++; attached++;

View File

@@ -9,6 +9,8 @@
#include <string.h> #include <string.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <unistd.h> #include <unistd.h>
#include <fcntl.h>
#include <sys/select.h>
static void* TcpClient_ThreadProc(void* arg) { static void* TcpClient_ThreadProc(void* arg) {
tcp_client_t* client = (tcp_client_t*)arg; tcp_client_t* client = (tcp_client_t*)arg;
@@ -95,11 +97,52 @@ int TcpClient_Connect(
return -1; return -1;
} }
if (connect(sockFd, (struct sockaddr*)&peerAddr, sizeof(peerAddr)) < 0) { // Use non-blocking connect with a timeout to avoid long blocking in the CLI.
close(sockFd); int flags = fcntl(sockFd, F_GETFL, 0);
return -1; if (flags == -1) flags = 0;
fcntl(sockFd, F_SETFL, flags | O_NONBLOCK);
int rc = connect(sockFd, (struct sockaddr*)&peerAddr, sizeof(peerAddr));
if (rc < 0) {
if (errno != EINPROGRESS) {
close(sockFd);
return -1;
}
// Wait up to 5 seconds for the socket to become writable (connected)
struct timeval tv;
tv.tv_sec = 5;
tv.tv_usec = 0;
fd_set wfds;
FD_ZERO(&wfds);
FD_SET(sockFd, &wfds);
int sel = select(sockFd + 1, NULL, &wfds, NULL, &tv);
if (sel <= 0) {
// timeout or error
if (sel == 0) {
errno = ETIMEDOUT;
}
close(sockFd);
return -1;
}
// Check for socket error
int so_error = 0;
socklen_t len = sizeof(so_error);
if (getsockopt(sockFd, SOL_SOCKET, SO_ERROR, &so_error, &len) < 0) {
close(sockFd);
return -1;
}
if (so_error != 0) {
errno = so_error;
close(sockFd);
return -1;
}
} }
// Restore blocking mode
fcntl(sockFd, F_SETFL, flags & ~O_NONBLOCK);
tcp_connection_t* conn = (tcp_connection_t*)malloc(sizeof(*conn)); tcp_connection_t* conn = (tcp_connection_t*)malloc(sizeof(*conn));
if (!conn) { if (!conn) {
close(sockFd); close(sockFd);