Compare commits

..

19 Commits

Author SHA1 Message Date
da50b4e8c1 Start IPv6 - Lord help me 2026-06-03 11:20:19 +02:00
00bd711501 remove proof 2026-05-29 14:31:24 +02:00
17ef3b74fd remove from mempool on mine; TEMPORARY log math proof of fee inclusion in coinbase tx 2026-05-29 14:28:27 +02:00
c1914dc3e7 add optional fee argument to send command 2026-05-29 14:00:58 +02:00
39293029c5 recompute state bug fixed 2026-05-29 13:51:34 +02:00
763aeb648f add fee-aware mining, coinbase validation, and reorg-safe orphan handling
Mining: blocks now include mempool txs, select spendable txs by fee, and pay coinbase as base reward + fees in main.c.
 - Consensus: block validation now enforces coinbase accounting and rejects invalid coinbase placement, including coinbase on amount2, in block.c and transaction.c.
 - Chain state: rollback now rebuilds currentSupply/currentReward, and block addition preflights spendability before mutating balances in chain.c.
 - Orphans/reorgs: orphan retry is safer, rollback-triggered sync reattaches orphans immediately, and transient orphan failures no longer drop blocks in orphan_pool.c and main.c.
 - Networking/mempool: node lifecycle now initializes the mempool, broadcasts can exclude one peer, and mempool snapshotting supports mining selection in net_node.c and txmempool.c.
 - Ledger simulation: added non-mutating spendable-transaction selection for block assembly in balance_sheet.c.
2026-05-29 13:44:15 +02:00
41a154a9fd fix pushing to txmempool 2026-05-29 12:49:51 +02:00
91d7bfa4e7 start adding tx system - test broadcast 2026-05-29 12:45:22 +02:00
4cfe85f6f2 orphans and wallet files 2026-05-28 13:01:23 +02:00
4f10f013f6 orphans reorg test 2026-05-15 22:38:18 +02:00
f94655a0ed segfaults and orphans 2026-05-15 22:32:34 +02:00
58ff36b218 cli fix 2026-05-15 19:54:48 +02:00
8f3559b3f6 segfault fix 2026-05-15 19:46:57 +02:00
971a4d9e49 auto resync on penalties 2026-05-15 19:33:57 +02:00
f9c94876d9 reorg bugs 2026-05-15 19:30:58 +02:00
9405801f6b con timeout 2026-05-15 19:28:21 +02:00
0fb2615d4c sync errors 2026-05-15 19:01:51 +02:00
55ca03f4ff orphan test 2026-05-15 18:49:49 +02:00
ce27dafaba todo update, forward block broadcasts, optional echo connect 2026-05-15 18:37:50 +02:00
23 changed files with 1773 additions and 209 deletions

View File

@@ -1,6 +1,4 @@
TODO:
Implement Horizen's "Reorg Penalty" system to make it harder for the young chain to be attacked by a powerful miner.
Make transactions private. A bit more work, but it's a challenge worth taking on.
I want to make an "optional privacy" system, where the TX can be public or private. Of course private TXs need more bytes, so the fees (although low) will be higher for them.
I need to figure out a way to make the privacy work without a UTXO system, and instead, with a "Balance Sheet" approach.
@@ -10,6 +8,23 @@ Maybe move the node system to an async event loop instead of spawning threads.
A potential race could occur if the P2P node receives a new block, or flushes a new block to disk while the user is running a full verify.
Maybe think about how block broadcasting works. Instead of unsolicited broadcasting, maybe only advertise a new height and have peers request the block if they want it. This would reduce bandwidth usage, but it also means that blocks won't propagate as fast, which could lead to more orphaned blocks. It's a tradeoff.
Check if Block FullVerify is actually verifying fully (not missing any conditions).
A loophole in the reorg penalty system could potentially exist where someone broadcasts blocks one-at-a-time. Determine a solution to this.
IPv6 support for the P2P node. Come on guys, it's 2026. RFC 2460 was in 1998. It's about time.
Like if someone is behind NAT, fine, workable. CGNAT? Lmao good luck.
TO TEST:
Implement Horizen's "Reorg Penalty" system to make it harder for the young chain to be attacked by a powerful miner.
NOTE:
Because tx sizes are currently fixed, mining can use raw fee ordering for now. If tx sizes ever become dynamic, revisit selection to consider fee/byte instead.
Mempool snapshotting for mining should hold the lock only long enough to copy pending txs, but if the mempool grows very large that copy may still be non-trivial.
DONE:
I want to move away from the Monero emission. I want to do something a bit radical for cryptocurrency, but I feel like it's necessary to make it more like money:
a constant inflation rate of 1.5% per year. It's lower than fiat (USD is ~2.8% per year), and it additionally doesn't fluctuate during crisis. It's constant.

View File

@@ -8,6 +8,7 @@
#include <stdio.h>
#include <khash/khash.h>
#include <crypto/crypto.h>
#include <block/transaction.h>
#include <string.h>
#include <utils.h>
#include <uint256.h>
@@ -29,4 +30,12 @@ bool BalanceSheet_LoadFromFile(const char* inPath);
void BalanceSheet_Print();
void BalanceSheet_Destroy();
bool BalanceSheet_SelectSpendableTransactions(
const signed_transaction_t* candidates,
size_t candidateCount,
signed_transaction_t** outAccepted,
size_t* outAcceptedCount,
uint64_t* outTotalFees
);
#endif

View File

@@ -36,10 +36,12 @@ void Block_AddTransaction(block_t* block, signed_transaction_t* tx);
void Block_RemoveTransaction(block_t* block, uint8_t* txHash);
bool Block_HasValidProofOfWork(const block_t* block);
bool Block_AllTransactionsValid(const block_t* block);
bool Block_ValidateCoinbaseAndFees(const block_t* block, uint64_t expectedCoinbaseAmount, uint64_t* outTotalFees);
bool Block_IsFullyValid(const block_t* block);
void Block_ShutdownPowContext(void);
void Block_Destroy(block_t* block);
void Block_Print(const block_t* block);
void Block_ShortPrint(const block_t* block);
// Deep-copy a block (allocates a new `block_t*`). Caller must call `Block_Destroy`.
block_t* Block_Copy(const block_t* src);

View File

@@ -28,6 +28,10 @@ void Chain_Wipe(blockchain_t* chain);
// Returns true on success.
bool Chain_RollbackToHeight(blockchain_t* chain, size_t height);
// Recompute `currentSupply` and `currentReward` from the in-memory chain blocks.
// Returns true on success and updates runtime state globals.
bool Chain_RecomputeRuntimeState(blockchain_t* chain);
// Retrieve a deep copy of the block at `index`. Caller must free with `Block_Destroy`.
bool Chain_GetBlockCopy(blockchain_t* chain, size_t index, block_t** outCopy);

View File

@@ -12,6 +12,7 @@
// Nets
#define MAX_CONS 32 // Some baseline for now
#define LISTEN_PORT 9393
#define ECHO_PEERS 1 // If non-zero, automatically attempt to connect back to any inbound peers (helps form bidirectional peering)
#define TCP_THREAD_STACK_SIZE (512 * 1024) // 512 KB. We could get away with like 128 KB since it's mostly just recv bufs, but it's good having some breathing room.
// This is also for client threads. The server has the default (~8 MB on POSIX).

View File

@@ -14,6 +14,7 @@
#include <stddef.h>
#include <dynarr.h>
#include <dynset.h>
#include <pthread.h>
@@ -25,6 +26,12 @@ typedef struct {
tcp_server_t* server;
tcp_client_t outboundClients[MAX_CONS];
size_t outboundCount;
// Dedup cache for recently seen block hashes (canonical 32-byte hash)
DynSet* seenBlocks;
// Protects seenBlocks
pthread_mutex_t seenLock;
// Protects outboundClients snapshots and peerBlockHeight writes
pthread_mutex_t outboundLock;
void (*on_connect)(tcp_connection_t* conn, void* user);
void (*on_data)(tcp_connection_t* conn, const unsigned char* data, size_t len, void* user);
void (*on_disconnect)(tcp_connection_t* conn, void* user);
@@ -50,6 +57,11 @@ int Node_ConnectPeer(net_node_t* node, const char* ip, unsigned short port);
int Node_ConnectStartupPeers(net_node_t* node, const char** ips, const unsigned short* ports, size_t peersCount);
int Node_SendPacket(net_node_t* node, tcp_connection_t* conn, packet_type_t packetType, const void* payload, size_t payloadLen);
int Node_BroadcastTransaction(net_node_t* node, signed_transaction_t* tx, tcp_connection_t* excludeNode);
// Helpers for outbound peer selection and block broadcast
int Node_GetBestOutboundPeer(net_node_t* node, tcp_connection_t** outConn, uint64_t* outHeight);
void Node_BroadcastChainRange(net_node_t* node, size_t startHeightInclusive, tcp_connection_t* sourceConn);
// Callback logic
void Node_Server_OnConnect(tcp_connection_t* client);

View File

@@ -14,6 +14,9 @@ extern uint256_t currentSupply;
extern uint64_t currentReward;
extern uint32_t difficultyTarget;
extern const char* chainDataDir;
extern unsigned short listenPort;
extern bool echoPeersEnabled;
extern bool forceOrphanReorgEnabled;
// Global synchronization primitives for runtime state
extern pthread_rwlock_t chainLock; // protects chain structure and related mutations

View File

@@ -19,8 +19,17 @@ typedef enum {
typedef struct tcp_connection_t tcp_connection_t;
struct tcp_connection_t {
// TODO: We should make it so only ONE of this needs to be available.
// Because of my temporary "I just need something that works" horseshit that I'm about to write, you'll need IPv4 and IPv6 is optional.
// Note to self: Don't pull an IETF and some "NAT exists, we're fine" bullshit, because if we end up with our eqvivalent of Teredo or CGNAT, I'm gonna be fucking pissed.
// And no, the solution isn't "eh, just bind to 0.0.0.0 and ignore it", because if we do that, we'll inevitably end up with a host that only has IPv6 and then we'll be fucked.
// Honestly, I'm proud of whoever runs IPv6-only. Brave soul.
int sockFd;
struct sockaddr_in peerAddr;
#ifdef USE_IPV6
int sockFd6; // For IPv6 support
struct sockaddr_in6 peerAddr6; // For IPv6 support
#endif
uint32_t connectionId;
tcp_connection_role_t role;
@@ -54,6 +63,7 @@ int TcpConnection_SetDataBuffer(tcp_connection_t* conn, const unsigned char* dat
void TcpConnection_ResetFramingState(tcp_connection_t* conn);
int TcpConnection_FeedFramedData(tcp_connection_t* conn, const unsigned char* input, size_t inputLen);
// This just takes a socket ID, so it's independent from the v4/v6 stuff. It works for both.
int TcpConnection_SendRaw(int sockFd, const void* data, size_t len);
int TcpConnection_SendFramed(tcp_connection_t* conn, const void* payload, size_t payloadLen);

View File

@@ -11,7 +11,12 @@
typedef struct {
int sockFd;
struct sockaddr_in addr;
#ifdef USE_IPV6
int sockFd6; // IPv6 support
struct sockaddr_in6 addr6; // IPv6 support
#endif
int opt;
int opt6; // IPv6 support
int isRunning;
void* owner;

View File

@@ -13,7 +13,10 @@ void TxMempool_Init();
// Assumed that the transation was confirmed to be valid
int TxMempool_Insert(signed_transaction_t tx);
bool TxMempool_Lookup(uint8_t* txHash, signed_transaction_t* out);
bool TxMempool_Snapshot(signed_transaction_t** outTxs, size_t* outCount);
void TxMempool_Print();
// Remove a transaction from the mempool by its hash. Returns true if removed.
bool TxMempool_Remove(const uint8_t* txHash);
void TxMempool_Destroy();
#endif

View File

@@ -167,7 +167,7 @@ static inline bool GenerateTestMinerIdentity(uint8_t privateKey[32], uint8_t com
return false;
}
static inline bool GenerateRandomTestAddress(uint8_t outAddress[32]) {
static inline bool GenerateRandomTestAddress(uint8_t outAddress[32], uint8_t outPrivateKey[32], uint8_t outCompressedPubkey[33]) {
if (!outAddress) {
return false;
}
@@ -200,11 +200,18 @@ static inline bool GenerateRandomTestAddress(uint8_t outAddress[32]) {
}
AddressFromCompressedPubkey(compressedPubkey, outAddress);
if (outPrivateKey) {
memcpy(outPrivateKey, privateKey, 32);
}
if (outCompressedPubkey) {
memcpy(outCompressedPubkey, compressedPubkey, 33);
}
secp256k1_context_destroy(ctx);
return true;
}
secp256k1_context_destroy(ctx);
return false;
}

0
myeasylog.log Normal file
View File

View File

@@ -4,6 +4,140 @@
khash_t(balance_sheet_map_m)* sheetMap = NULL;
static pthread_mutex_t g_sheetLock;
static bool BalanceSheet_GetSimEntry(
khash_t(balance_sheet_map_m)* simMap,
const uint8_t address[32],
balance_sheet_entry_t* out
) {
if (!simMap || !address || !out) {
return false;
}
key32_t key;
memcpy(key.bytes, address, 32);
khiter_t k = kh_get(balance_sheet_map_m, simMap, key);
if (k != kh_end(simMap)) {
*out = kh_value(simMap, k);
return true;
}
if (BalanceSheet_Lookup((uint8_t*)address, out)) {
int ret = 0;
k = kh_put(balance_sheet_map_m, simMap, key, &ret);
if (k == kh_end(simMap)) {
return false;
}
kh_value(simMap, k) = *out;
return true;
}
memset(out, 0, sizeof(*out));
memcpy(out->address, address, 32);
out->balance = uint256_from_u64(0);
int ret = 0;
k = kh_put(balance_sheet_map_m, simMap, key, &ret);
if (k == kh_end(simMap)) {
return false;
}
kh_value(simMap, k) = *out;
return true;
}
static bool BalanceSheet_StoreSimEntry(
khash_t(balance_sheet_map_m)* simMap,
const balance_sheet_entry_t* entry
) {
if (!simMap || !entry) {
return false;
}
key32_t key;
memcpy(key.bytes, entry->address, 32);
int ret = 0;
khiter_t k = kh_put(balance_sheet_map_m, simMap, key, &ret);
if (k == kh_end(simMap)) {
return false;
}
kh_value(simMap, k) = *entry;
return true;
}
static bool BalanceSheet_ApplyCandidateTransaction(
khash_t(balance_sheet_map_m)* simMap,
const signed_transaction_t* tx,
uint64_t* outFee
) {
if (!simMap || !tx) {
return false;
}
if (Address_IsCoinbase(tx->transaction.senderAddress)) {
return true;
}
if (!Transaction_Verify(tx)) {
return false;
}
balance_sheet_entry_t senderEntry;
if (!BalanceSheet_GetSimEntry(simMap, tx->transaction.senderAddress, &senderEntry)) {
return false;
}
uint256_t spend = uint256_from_u64(0);
if (uint256_add_u64(&spend, tx->transaction.amount1) ||
uint256_add_u64(&spend, tx->transaction.amount2) ||
uint256_add_u64(&spend, tx->transaction.fee)) {
return false;
}
if (uint256_cmp(&senderEntry.balance, &spend) < 0) {
return false;
}
if (!uint256_subtract(&senderEntry.balance, &spend)) {
return false;
}
if (!BalanceSheet_StoreSimEntry(simMap, &senderEntry)) {
return false;
}
balance_sheet_entry_t recipient1Entry;
if (!BalanceSheet_GetSimEntry(simMap, tx->transaction.recipientAddress1, &recipient1Entry)) {
return false;
}
if (uint256_add_u64(&recipient1Entry.balance, tx->transaction.amount1)) {
return false;
}
if (!BalanceSheet_StoreSimEntry(simMap, &recipient1Entry)) {
return false;
}
if (tx->transaction.amount2 > 0) {
balance_sheet_entry_t recipient2Entry;
if (!BalanceSheet_GetSimEntry(simMap, tx->transaction.recipientAddress2, &recipient2Entry)) {
return false;
}
if (uint256_add_u64(&recipient2Entry.balance, tx->transaction.amount2)) {
return false;
}
if (!BalanceSheet_StoreSimEntry(simMap, &recipient2Entry)) {
return false;
}
}
if (outFee) {
*outFee = tx->transaction.fee;
}
return true;
}
static int BalanceSheet_InsertLocked(balance_sheet_entry_t entry) {
if (!sheetMap) {
return -1;
@@ -143,3 +277,64 @@ void BalanceSheet_Destroy() {
sheetMap = NULL;
pthread_mutex_destroy(&g_sheetLock);
}
bool BalanceSheet_SelectSpendableTransactions(
const signed_transaction_t* candidates,
size_t candidateCount,
signed_transaction_t** outAccepted,
size_t* outAcceptedCount,
uint64_t* outTotalFees
) {
if (!outAccepted || !outAcceptedCount || !outTotalFees) {
return false;
}
*outAccepted = NULL;
*outAcceptedCount = 0;
*outTotalFees = 0;
if (!candidates || candidateCount == 0) {
return true;
}
signed_transaction_t* accepted = (signed_transaction_t*)calloc(candidateCount, sizeof(signed_transaction_t));
if (!accepted) {
return false;
}
khash_t(balance_sheet_map_m)* simMap = kh_init(balance_sheet_map_m);
if (!simMap) {
free(accepted);
return false;
}
size_t acceptedCount = 0;
uint64_t totalFees = 0;
for (size_t i = 0; i < candidateCount; ++i) {
const signed_transaction_t* tx = &candidates[i];
if (Address_IsCoinbase(tx->transaction.senderAddress)) {
continue;
}
uint64_t fee = 0;
if (!BalanceSheet_ApplyCandidateTransaction(simMap, tx, &fee)) {
continue;
}
accepted[acceptedCount++] = *tx;
totalFees += fee;
}
kh_destroy(balance_sheet_map_m, simMap);
if (acceptedCount == 0) {
free(accepted);
accepted = NULL;
}
*outAccepted = accepted;
*outAcceptedCount = acceptedCount;
*outTotalFees = totalFees;
return true;
}

View File

@@ -214,23 +214,88 @@ bool Block_AllTransactionsValid(const block_t* block) {
for (size_t i = 0; i < DynArr_size(block->transactions); i++) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(block->transactions, i);
if (!Transaction_Verify(tx)) {
return false;
}
if (tx && Address_IsCoinbase(tx->transaction.senderAddress)) {
if (hasCoinbase) {
return false; // More than one coinbase transaction
return false;
}
hasCoinbase = true;
continue; // Coinbase transactions are valid since the miner has the right to create coins. Only rule is one per block.
}
if (!Transaction_Verify(tx)) {
return false;
}
}
return true && hasCoinbase && DynArr_size(block->transactions) > 0; // Every block must have at least one transaction (the coinbase)
}
bool Block_ValidateCoinbaseAndFees(const block_t* block, uint64_t expectedCoinbaseAmount, uint64_t* outTotalFees) {
if (!block || !block->transactions) {
return false;
}
bool hasCoinbase = false;
uint64_t totalFees = 0;
uint8_t zeroAddress[32] = {0};
for (size_t i = 0; i < DynArr_size(block->transactions); ++i) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(block->transactions, i);
if (!tx) {
return false;
}
if (Address_IsCoinbase(tx->transaction.senderAddress)) {
if (hasCoinbase) {
return false;
}
hasCoinbase = true;
if (!Transaction_Verify(tx)) {
return false;
}
if (tx->transaction.fee != 0 || tx->transaction.amount2 != 0) {
return false;
}
if (tx->transaction.amount1 != expectedCoinbaseAmount) {
return false;
}
if (Address_IsCoinbase(tx->transaction.recipientAddress1)) {
return false;
}
if (memcmp(tx->transaction.recipientAddress2, zeroAddress, sizeof(zeroAddress)) != 0) {
return false;
}
continue;
}
if (!Transaction_Verify(tx)) {
return false;
}
if (UINT64_MAX - totalFees < tx->transaction.fee) {
return false;
}
totalFees += tx->transaction.fee;
}
if (!hasCoinbase) {
return false;
}
if (outTotalFees) {
*outTotalFees = totalFees;
}
return true;
}
bool Block_IsFullyValid(const block_t* block) {
bool merkleValid = false;
uint8_t calculatedMerkleRoot[32];
@@ -286,6 +351,20 @@ void Block_Print(const block_t* block) {
}
}
void Block_ShortPrint(const block_t* block) {
if (!block) return;
printf("Block #%llu: Timestamp %llu, Nonce %llu, DiffTarget 0x%08x, Version %u, PrevHash %02x%02x...%02x%02x, MerkleRoot %02x%02x...%02x%02x, TxCount %zu\n",
(unsigned long long)block->header.blockNumber,
(unsigned long long)block->header.timestamp,
(unsigned long long)block->header.nonce,
block->header.difficultyTarget,
block->header.version,
block->header.prevHash[0], block->header.prevHash[1], block->header.prevHash[30], block->header.prevHash[31],
block->header.merkleRoot[0], block->header.merkleRoot[1], block->header.merkleRoot[30], block->header.merkleRoot[31],
block->transactions ? DynArr_size(block->transactions) : 0);
}
block_t* Block_Copy(const block_t* src) {
if (!src) return NULL;
block_t* dst = (block_t*)malloc(sizeof(block_t));

View File

@@ -1,6 +1,7 @@
#include <block/chain.h>
#include <constants.h>
#include <runtime_state.h>
#include <txmempool.h>
#include <errno.h>
#include <limits.h>
#include <sys/stat.h>
@@ -97,6 +98,37 @@ static bool DebitAddress(const uint8_t address[32], const uint256_t* amount) {
return BalanceSheet_Insert(entry) >= 0;
}
bool Chain_RecomputeRuntimeState(blockchain_t* chain) {
if (!chain) {
return false;
}
uint256_t rebuiltSupply = uint256_from_u64(0);
for (size_t i = 0; i < chain->size; ++i) {
block_t* blk = (block_t*)DynArr_at(chain->blocks, i);
if (!blk || !blk->transactions) {
return false;
}
for (size_t j = 0; j < DynArr_size(blk->transactions); ++j) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(blk->transactions, j);
if (!tx) {
return false;
}
if (Address_IsCoinbase(tx->transaction.senderAddress)) {
if (uint256_add_u64(&rebuiltSupply, tx->transaction.amount1)) {
return false;
}
}
}
}
currentSupply = rebuiltSupply;
currentReward = CalculateBlockReward(currentSupply, chain);
return true;
}
static void Chain_ClearBlocks(blockchain_t* chain) {
if (!chain || !chain->blocks) {
return;
@@ -151,35 +183,100 @@ bool Chain_AddBlock(blockchain_t* chain, block_t* block) {
pthread_rwlock_wrlock(&chainLock);
pthread_mutex_lock(&balanceSheetLock);
// Ensure the incoming block's header.blockNumber matches the index it will be appended at.
size_t expectedIndex = DynArr_size(chain->blocks);
if (block->header.blockNumber != expectedIndex) {
// Mismatched block number; reject to avoid duplicate indices or inconsistent headers.
pthread_mutex_unlock(&balanceSheetLock);
pthread_rwlock_unlock(&chainLock);
return false;
}
do {
// First pass: ensure all non-coinbase senders can cover the full spend
// (amount1 + amount2 + fee) before mutating the chain or balance sheet.
size_t txCount = DynArr_size(block->transactions);
signed_transaction_t* candidateTxs = (signed_transaction_t*)calloc(txCount, sizeof(signed_transaction_t));
if (!candidateTxs) {
ok = false;
break;
}
size_t nonCoinbaseCount = 0;
for (size_t i = 0; i < txCount; ++i) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(block->transactions, i);
if (!tx) {
ok = false; break;
ok = false;
break;
}
if (Address_IsCoinbase(tx->transaction.senderAddress)) {
continue;
}
uint256_t spend;
if (!BuildSpendAmount(tx, &spend)) { ok = false; break; }
balance_sheet_entry_t senderEntry;
if (!BalanceSheet_Lookup(tx->transaction.senderAddress, &senderEntry)) {
fprintf(stderr, "Error: Sender address not found in balance sheet during block addition. Bailing!\n");
ok = false; break;
}
if (uint256_cmp(&senderEntry.balance, &spend) < 0) {
fprintf(stderr, "Error: Sender balance insufficient for block transaction. Bailing!\n");
ok = false; break;
candidateTxs[i] = *tx;
if (!Address_IsCoinbase(tx->transaction.senderAddress)) {
++nonCoinbaseCount;
}
}
if (!ok) break;
if (!ok) {
free(candidateTxs);
break;
}
signed_transaction_t* spendableTxs = NULL;
size_t spendableCount = 0;
uint64_t totalFees = 0;
if (!BalanceSheet_SelectSpendableTransactions(candidateTxs, txCount, &spendableTxs, &spendableCount, &totalFees)) {
free(candidateTxs);
ok = false;
break;
}
free(candidateTxs);
if (spendableCount != nonCoinbaseCount) {
free(spendableTxs);
ok = false;
break;
}
uint64_t expectedCoinbaseAmount = currentReward;
if (UINT64_MAX - expectedCoinbaseAmount < totalFees) {
free(spendableTxs);
ok = false;
break;
}
expectedCoinbaseAmount += totalFees;
// Debug: log expected coinbase and fees to aid diagnosis when nodes disagree
{
uint64_t cbAmount = 0;
if (block->transactions && DynArr_size(block->transactions) > 0) {
signed_transaction_t* firstTx = (signed_transaction_t*)DynArr_at(block->transactions, 0);
if (firstTx && Address_IsCoinbase(firstTx->transaction.senderAddress)) {
cbAmount = firstTx->transaction.amount1;
}
}
char supplyStr[80];
Uint256ToDecimal(&currentSupply, supplyStr, sizeof(supplyStr));
printf("Chain_AddBlock: blockIndex=%zu expectedCoinbase=%llu totalFees=%llu observedBlockCoinbase=%llu currentReward=%llu currentSupply=%s\n",
expectedIndex,
(unsigned long long)expectedCoinbaseAmount,
(unsigned long long)totalFees,
(unsigned long long)cbAmount,
(unsigned long long)currentReward,
supplyStr);
}
uint64_t observedFees = 0;
if (!Block_ValidateCoinbaseAndFees(block, expectedCoinbaseAmount, &observedFees) || observedFees != totalFees) {
// Log mismatch details for debugging
printf("Chain_AddBlock: validation failed: expectedCoinbase=%llu totalFees=%llu observedFees=%llu\n",
(unsigned long long)expectedCoinbaseAmount,
(unsigned long long)totalFees,
(unsigned long long)observedFees);
free(spendableTxs);
ok = false;
break;
}
free(spendableTxs);
// Push the block only after validation succeeds.
block_t* blk = (block_t*)DynArr_push_back(chain->blocks, block);
@@ -223,6 +320,21 @@ bool Chain_AddBlock(blockchain_t* chain, block_t* block) {
}
}
}
// Remove mined non-coinbase transactions from the mempool so they are not re-mined or re-broadcast.
if (blk->transactions) {
for (size_t i = 0; i < DynArr_size(blk->transactions); ++i) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(blk->transactions, i);
if (!tx) continue;
if (Address_IsCoinbase(tx->transaction.senderAddress)) continue;
uint8_t txHash[32];
Transaction_CalculateHash(tx, txHash);
if (TxMempool_Remove(txHash)) {
// optional: log removal
// printf("TxMempool_Remove: removed tx from mempool: "); PrintHexBytes(txHash, 32); printf("\n");
}
}
}
// ok remains true if no failures
} while (0);
@@ -230,6 +342,11 @@ bool Chain_AddBlock(blockchain_t* chain, block_t* block) {
pthread_mutex_unlock(&balanceSheetLock);
pthread_rwlock_unlock(&chainLock);
printf("Added new block to chain:\n");
Block_ShortPrint(block);
/* Debug proof removed: coinbase == baseReward + totalFees was printed here during debugging. */
return ok;
}
@@ -422,6 +539,12 @@ bool Chain_RollbackToHeight(blockchain_t* chain, size_t height) {
}
}
if (!Chain_RecomputeRuntimeState(chain)) {
pthread_mutex_unlock(&balanceSheetLock);
pthread_rwlock_unlock(&chainLock);
return false;
}
pthread_mutex_unlock(&balanceSheetLock);
pthread_rwlock_unlock(&chainLock);
@@ -455,112 +578,134 @@ bool Chain_SaveToFile(blockchain_t* chain, const char* dirpath, uint256_t curren
if (!BuildPath(tablePath, sizeof(tablePath), dirpath, "chain.table")) {
return false;
}
// Find metadata file (create if not exists) to get the saved chain size (+ other things)
FILE* metaFile = fopen(metaPath, "rb+");
FILE* chainFile = fopen(chainPath, "rb+");
FILE* tableFile = fopen(tablePath, "rb+");
char metaTmpPath[512];
char chainTmpPath[512];
char tableTmpPath[512];
if (!BuildPath(metaTmpPath, sizeof(metaTmpPath), dirpath, "chain.meta.tmp") ||
!BuildPath(chainTmpPath, sizeof(chainTmpPath), dirpath, "chain.data.tmp") ||
!BuildPath(tableTmpPath, sizeof(tableTmpPath), dirpath, "chain.table.tmp")) {
return false;
}
pthread_rwlock_wrlock(&chainLock);
FILE* metaFile = fopen(metaTmpPath, "wb+");
FILE* chainFile = fopen(chainTmpPath, "wb+");
FILE* tableFile = fopen(tableTmpPath, "wb+");
if (!metaFile || !chainFile || !tableFile) {
// Just overwrite everything
metaFile = fopen(metaPath, "wb+");
if (!metaFile) { return false; }
// Initialize metadata with size 0
size_t initialSize = 0;
fwrite(&initialSize, sizeof(size_t), 1, metaFile);
// Write last block hash (32 bytes of zeros for now)
uint8_t zeroHash[32] = {0};
fwrite(zeroHash, sizeof(uint8_t), 32, metaFile);
uint256_t zeroSupply = {0};
fwrite(&zeroSupply, sizeof(uint256_t), 1, metaFile);
uint32_t initialTarget = INITIAL_DIFFICULTY;
fwrite(&initialTarget, sizeof(uint32_t), 1, metaFile);
uint64_t initialReward = 0;
fwrite(&initialReward, sizeof(uint64_t), 1, metaFile);
chainFile = fopen(chainPath, "wb+");
if (!chainFile) { return false; }
tableFile = fopen(tablePath, "wb+");
if (!tableFile) { return false; }
// TODO: Potentially some other things here, we'll see
}
// Read
size_t savedSize = 0;
fread(&savedSize, sizeof(size_t), 1, metaFile);
uint8_t lastSavedHash[32];
fread(lastSavedHash, sizeof(uint8_t), 32, metaFile);
// Assume chain saved is valid, and that the chain in memory is valid (as LoadFromFile will verify the saved one)
if (savedSize > DynArr_size(chain->blocks)) {
// Saved chain is longer than current chain, this should not happen if we are always saving the current chain, but just in case, fail to save to avoid overwriting a potentially valid longer chain with a shorter one.
fclose(metaFile);
fclose(chainFile);
fclose(tableFile);
if (metaFile) fclose(metaFile);
if (chainFile) fclose(chainFile);
if (tableFile) fclose(tableFile);
pthread_rwlock_unlock(&chainLock);
remove(metaTmpPath);
remove(chainTmpPath);
remove(tableTmpPath);
return false;
}
// Filename format: dirpath/chain.data
// File format: ([block_header][num_transactions][transactions...])[*length] - since block_header is fixed size, LoadFromFile will only read those by default
fseek(chainFile, 0, SEEK_END); // Seek to the end of those files
fseek(tableFile, 0, SEEK_END);
long pos = ftell(chainFile);
if (pos < 0) {
fclose(metaFile);
fclose(chainFile);
fclose(tableFile);
return false;
}
uint64_t byteCount = (uint64_t)pos; // Get the size
// Save blocks that are not yet saved
for (size_t i = savedSize; i < DynArr_size(chain->blocks); i++) {
const size_t chainSize = DynArr_size(chain->blocks);
uint64_t byteCount = 0;
for (size_t i = 0; i < chainSize; ++i) {
block_t* blk = (block_t*)DynArr_at(chain->blocks, i);
if (!blk) {
fclose(metaFile);
fclose(chainFile);
fclose(tableFile);
pthread_rwlock_unlock(&chainLock);
remove(metaTmpPath);
remove(chainTmpPath);
remove(tableTmpPath);
return false;
}
uint64_t preIncrementByteSize = byteCount;
// Construct file path
// Write block header
fwrite(&blk->header, sizeof(block_header_t), 1, chainFile);
size_t txSize = DynArr_size(blk->transactions);
fwrite(&txSize, sizeof(size_t), 1, chainFile); // Write number of transactions
byteCount += sizeof(block_header_t) + sizeof(size_t);
// Write transactions
for (size_t j = 0; j < txSize; j++) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(blk->transactions, j);
if (fwrite(tx, sizeof(signed_transaction_t), 1, chainFile) != 1) {
fclose(chainFile);
block_t* diskCopy = blk;
bool loadedTemp = false;
if (!diskCopy->transactions) {
if (!Chain_LoadBlockFromFile(dirpath, (uint64_t)i, true, &diskCopy, NULL) || !diskCopy || !diskCopy->transactions) {
if (loadedTemp && diskCopy) {
Block_Destroy(diskCopy);
}
fclose(metaFile);
fclose(chainFile);
fclose(tableFile);
pthread_rwlock_unlock(&chainLock);
remove(metaTmpPath);
remove(chainTmpPath);
remove(tableTmpPath);
return false;
}
loadedTemp = true;
}
const uint64_t blockStart = byteCount;
if (fwrite(&diskCopy->header, sizeof(block_header_t), 1, chainFile) != 1) {
if (loadedTemp) Block_Destroy(diskCopy);
fclose(metaFile);
fclose(chainFile);
fclose(tableFile);
pthread_rwlock_unlock(&chainLock);
remove(metaTmpPath);
remove(chainTmpPath);
remove(tableTmpPath);
return false;
}
const size_t txSize = DynArr_size(diskCopy->transactions);
if (fwrite(&txSize, sizeof(size_t), 1, chainFile) != 1) {
if (loadedTemp) Block_Destroy(diskCopy);
fclose(metaFile);
fclose(chainFile);
fclose(tableFile);
pthread_rwlock_unlock(&chainLock);
remove(metaTmpPath);
remove(chainTmpPath);
remove(tableTmpPath);
return false;
}
byteCount += sizeof(block_header_t) + sizeof(size_t);
for (size_t j = 0; j < txSize; ++j) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(diskCopy->transactions, j);
if (!tx || fwrite(tx, sizeof(signed_transaction_t), 1, chainFile) != 1) {
if (loadedTemp) Block_Destroy(diskCopy);
fclose(metaFile);
fclose(chainFile);
fclose(tableFile);
pthread_rwlock_unlock(&chainLock);
remove(metaTmpPath);
remove(chainTmpPath);
remove(tableTmpPath);
return false;
}
byteCount += sizeof(signed_transaction_t);
}
// Create an entry in the block table
block_table_entry_t entry;
entry.blockNumber = i;
entry.byteNumber = preIncrementByteSize;
entry.blockSize = byteCount - preIncrementByteSize;
fwrite(&entry, sizeof(block_table_entry_t), 1, tableFile);
entry.byteNumber = blockStart;
entry.blockSize = byteCount - blockStart;
if (fwrite(&entry, sizeof(block_table_entry_t), 1, tableFile) != 1) {
if (loadedTemp) Block_Destroy(diskCopy);
fclose(metaFile);
fclose(chainFile);
fclose(tableFile);
pthread_rwlock_unlock(&chainLock);
remove(metaTmpPath);
remove(chainTmpPath);
remove(tableTmpPath);
return false;
}
DynArr_destroy(blk->transactions);
blk->transactions = NULL; // Clear transactions to save memory since they're now saved on disk
if (loadedTemp) {
Block_Destroy(diskCopy);
} else if (blk->transactions) {
DynArr_destroy(blk->transactions);
blk->transactions = NULL;
}
}
// Update metadata with new size and last block hash
size_t newSize = DynArr_size(chain->blocks);
size_t newSize = chainSize;
fseek(metaFile, 0, SEEK_SET);
fwrite(&newSize, sizeof(size_t), 1, metaFile);
uint32_t difficultyTarget = INITIAL_DIFFICULTY;
@@ -578,16 +723,23 @@ bool Chain_SaveToFile(blockchain_t* chain, const char* dirpath, uint256_t curren
fwrite(&difficultyTarget, sizeof(uint32_t), 1, metaFile);
fwrite(&currentReward, sizeof(uint64_t), 1, metaFile);
// Safety
fflush(metaFile);
fflush(chainFile);
fflush(tableFile);
// Close all pointers
fclose(metaFile);
fclose(chainFile);
fclose(tableFile);
if (rename(metaTmpPath, metaPath) != 0 || rename(chainTmpPath, chainPath) != 0 || rename(tableTmpPath, tablePath) != 0) {
pthread_rwlock_unlock(&chainLock);
remove(metaTmpPath);
remove(chainTmpPath);
remove(tableTmpPath);
return false;
}
pthread_rwlock_unlock(&chainLock);
return true;
}

View File

@@ -42,7 +42,23 @@ bool Transaction_Verify(const signed_transaction_t* tx) {
}
if (Address_IsCoinbase(tx->transaction.senderAddress)) {
// Coinbase transactions are valid if the signature is correct for the block (handled in Block_Verify)
if (tx->transaction.amount1 == 0) {
return false;
}
if (tx->transaction.amount2 != 0) {
return false;
}
if (Address_IsCoinbase(tx->transaction.recipientAddress1) || Address_IsCoinbase(tx->transaction.recipientAddress2)) {
return false;
}
uint8_t zeroAddress[32] = {0};
if (memcmp(tx->transaction.recipientAddress2, zeroAddress, 32) != 0) {
return false;
}
return true;
}

View File

@@ -11,7 +11,8 @@
#include <signal.h>
#include <balance_sheet.h>
#include <unistd.h>
#include <errno.h>
#include <txmempool.h>
#include <constants.h>
#include <runtime_state.h>
@@ -27,6 +28,9 @@
blockchain_t* currentChain = NULL;
const char* chainDataDir = CHAIN_DATA_DIR;
unsigned short listenPort = LISTEN_PORT;
bool echoPeersEnabled = ECHO_PEERS != 0;
bool forceOrphanReorgEnabled = false;
uint256_t currentSupply = {{0, 0, 0, 0}};
uint64_t currentReward = 750000000000ULL;
@@ -41,6 +45,32 @@ void handle_sigint(int sig) {
exit(0);
}
static void ApplyRuntimeConfigFromEnv(void) {
const char* dataDir = getenv("SKALACOIN_CHAIN_DATA_DIR");
if (dataDir && dataDir[0] != '\0') {
chainDataDir = dataDir;
}
const char* portStr = getenv("SKALACOIN_LISTEN_PORT");
if (portStr && portStr[0] != '\0') {
char* end = NULL;
long parsed = strtol(portStr, &end, 10);
if (end != portStr && *end == '\0' && parsed > 0 && parsed <= 65535) {
listenPort = (unsigned short)parsed;
}
}
const char* echoStr = getenv("SKALACOIN_ECHO_PEERS");
if (echoStr && echoStr[0] != '\0') {
echoPeersEnabled = (strcmp(echoStr, "0") != 0);
}
const char* forceOrphanStr = getenv("SKALACOIN_FORCE_ORPHAN_REORG");
if (forceOrphanStr && forceOrphanStr[0] != '\0') {
forceOrphanReorgEnabled = (strcmp(forceOrphanStr, "0") != 0);
}
}
uint32_t difficultyTarget = INITIAL_DIFFICULTY;
static bool MineBlock(block_t* block) {
@@ -117,6 +147,63 @@ static void AddCoinbaseTransaction(block_t* block, const uint8_t minerAddress[32
Block_AddTransaction(block, &coinbaseTx);
}
static int CompareTransactionPriority(const void* lhs, const void* rhs) {
const signed_transaction_t* left = (const signed_transaction_t*)lhs;
const signed_transaction_t* right = (const signed_transaction_t*)rhs;
if (left->transaction.fee > right->transaction.fee) {
return -1;
}
if (left->transaction.fee < right->transaction.fee) {
return 1;
}
uint8_t leftHash[32];
uint8_t rightHash[32];
Transaction_CalculateHash(left, leftHash);
Transaction_CalculateHash(right, rightHash);
return memcmp(leftHash, rightHash, sizeof(leftHash));
}
static bool BuildSpendableMempoolSelection(
signed_transaction_t** outAcceptedTxs,
size_t* outAcceptedCount,
uint64_t* outTotalFees
) {
if (!outAcceptedTxs || !outAcceptedCount || !outTotalFees) {
return false;
}
*outAcceptedTxs = NULL;
*outAcceptedCount = 0;
*outTotalFees = 0;
signed_transaction_t* snapshot = NULL;
size_t snapshotCount = 0;
if (!TxMempool_Snapshot(&snapshot, &snapshotCount)) {
return false;
}
if (snapshot && snapshotCount > 1) {
qsort(snapshot, snapshotCount, sizeof(signed_transaction_t), CompareTransactionPriority);
}
signed_transaction_t* acceptedTxs = NULL;
size_t acceptedCount = 0;
uint64_t totalFees = 0;
bool ok = BalanceSheet_SelectSpendableTransactions(snapshot, snapshotCount, &acceptedTxs, &acceptedCount, &totalFees);
free(snapshot);
if (!ok) {
free(acceptedTxs);
return false;
}
*outAcceptedTxs = acceptedTxs;
*outAcceptedCount = acceptedCount;
*outTotalFees = totalFees;
return true;
}
static void PrintBlockDetail(const block_t* block, size_t txCount, const uint8_t canonicalHash[32], const uint8_t powHash[32]) {
if (!block) {
return;
@@ -271,6 +358,46 @@ static bool ComputeHistoricalAutolykosHashFromDisk(const char* chainDataDir, uin
return ok;
}
static bool Block_GetCoinbaseAndFeeTotals(const block_t* block, uint64_t* outCoinbaseAmount, uint64_t* outTotalFees) {
if (!block || !block->transactions || !outCoinbaseAmount || !outTotalFees) {
return false;
}
bool hasCoinbase = false;
uint64_t coinbaseAmount = 0;
uint64_t totalFees = 0;
for (size_t i = 0; i < DynArr_size(block->transactions); ++i) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(block->transactions, i);
if (!tx) {
return false;
}
if (Address_IsCoinbase(tx->transaction.senderAddress)) {
if (hasCoinbase) {
return false;
}
hasCoinbase = true;
coinbaseAmount = tx->transaction.amount1;
continue;
}
if (UINT64_MAX - totalFees < tx->transaction.fee) {
return false;
}
totalFees += tx->transaction.fee;
}
if (!hasCoinbase) {
return false;
}
*outCoinbaseAmount = coinbaseAmount;
*outTotalFees = totalFees;
return true;
}
static bool MineAndAppendBlock(blockchain_t* chain,
block_t* block,
uint256_t* currentSupply,
@@ -294,6 +421,16 @@ static bool MineAndAppendBlock(blockchain_t* chain,
return false;
}
uint64_t coinbaseAmount = 0;
if (block->transactions && DynArr_size(block->transactions) > 0) {
signed_transaction_t* firstTx = (signed_transaction_t*)DynArr_at(block->transactions, 0);
if (firstTx && Address_IsCoinbase(firstTx->transaction.senderAddress)) {
coinbaseAmount = firstTx->transaction.amount1;
}
}
/* Debug proof removed: miner printed proof that coinbase == baseReward + totalFees during debugging. */
// After successfully appending a block, attempt to attach any orphans.
size_t attached = OrphanPool_AttemptAttach(chain);
if (attached > 0) {
@@ -303,14 +440,6 @@ static bool MineAndAppendBlock(blockchain_t* chain,
BalanceSheet_SaveToFile(chainDataDir);
}
uint64_t coinbaseAmount = 0;
if (block->transactions && DynArr_size(block->transactions) > 0) {
signed_transaction_t* firstTx = (signed_transaction_t*)DynArr_at(block->transactions, 0);
if (firstTx && Address_IsCoinbase(firstTx->transaction.senderAddress)) {
coinbaseAmount = firstTx->transaction.amount1;
}
}
(void)uint256_add_u64(currentSupply, coinbaseAmount);
uint8_t canonicalHash[32];
@@ -374,6 +503,7 @@ static bool VerifyChainFully(blockchain_t* chain) {
blockchain_t* prevChain = Chain_Create();
if (!prevChain) { return false; }
uint256_t replaySupply = uint256_from_u64(0);
uint32_t expectedDifficulty = INITIAL_DIFFICULTY;
for (size_t i = 0; i < chainSize; ++i) {
block_t* blk = NULL;
@@ -450,12 +580,31 @@ static bool VerifyChainFully(blockchain_t* chain) {
return false;
}
uint64_t expectedReward = 0;
uint64_t savedReward = currentReward;
expectedReward = CalculateBlockReward(replaySupply, prevChain);
currentReward = savedReward;
if (!Block_AllTransactionsValid(blk)) {
Block_Destroy(blk);
Chain_Destroy(prevChain);
return false;
}
uint64_t coinbaseAmount = 0;
uint64_t totalFees = 0;
if (!Block_GetCoinbaseAndFeeTotals(blk, &coinbaseAmount, &totalFees)) {
Block_Destroy(blk);
Chain_Destroy(prevChain);
return false;
}
if (UINT64_MAX - expectedReward < totalFees || coinbaseAmount != (expectedReward + totalFees)) {
Block_Destroy(blk);
Chain_Destroy(prevChain);
return false;
}
uint8_t expectedMerkle[32];
Block_CalculateMerkleRoot(blk, expectedMerkle);
if (memcmp(blk->header.merkleRoot, expectedMerkle, sizeof(expectedMerkle)) != 0) {
@@ -478,6 +627,8 @@ static bool VerifyChainFully(blockchain_t* chain) {
headerOnly.transactions = NULL;
(void)DynArr_push_back(prevChain->blocks, &headerOnly);
(void)uint256_add_u64(&replaySupply, coinbaseAmount);
Block_Destroy(blk);
}
@@ -485,6 +636,16 @@ static bool VerifyChainFully(blockchain_t* chain) {
return true;
}
// Use when error
void KillEverythingAndExit(net_node_t* node, blockchain_t* chain) {
Node_Destroy(node);
currentChain = NULL;
Chain_Destroy(chain);
Block_ShutdownPowContext();
BalanceSheet_Destroy();
exit(1);
}
int main(int argc, char* argv[]) {
//(void)argc;
//(void)argv;
@@ -509,6 +670,8 @@ int main(int argc, char* argv[]) {
}
}
ApplyRuntimeConfigFromEnv();
signal(SIGINT, handle_sigint);
srand((unsigned int)time(NULL));
@@ -537,6 +700,11 @@ int main(int argc, char* argv[]) {
uint8_t lastSavedHash[32] = {0};
if (!Chain_LoadFromFile(chain, chainDataDir, &currentSupply, &difficultyTarget, &currentReward, lastSavedHash, false)) {
printf("No existing chain loaded from %s\n", chainDataDir);
} else {
// Recompute runtime supply/reward from loaded blocks to avoid trusting stale meta values.
if (!Chain_RecomputeRuntimeState(chain)) {
fprintf(stderr, "Failed to recompute runtime state from loaded chain\n");
}
}
if (!BalanceSheet_LoadFromFile(chainDataDir)) {
@@ -576,9 +744,79 @@ int main(int argc, char* argv[]) {
}
}
// TODO: Separate loading into its own header
// Load the wallet from disk or generate new random identity
uint8_t minerAddress[32];
uint8_t minerPrivateKey[32];
uint8_t minerCompressedPubkey[33];
bool loadedWallet = false;
// Attempt load
char* path = "chain_data/wallet.data"; // TODO: Don't hardcode path
FILE* walletFile = fopen(path, "rb");
if (walletFile) {
size_t read = fread(minerPrivateKey, 1, 32, walletFile);
if (read != 32) {
fprintf(stderr, "failed to read wallet file\n");
fclose(walletFile);
}
read = fread(minerCompressedPubkey, 1, 33, walletFile);
if (read != 33) {
fprintf(stderr, "failed to read wallet file\n");
fclose(walletFile);
}
read = fread(minerAddress, 1, 32, walletFile);
if (read != 32) {
fprintf(stderr, "failed to read wallet file\n");
fclose(walletFile);
}
fclose(walletFile);
loadedWallet = true;
} else if (errno != ENOENT || errno != EISDIR || errno != EACCES || errno != EROFS || !loadedWallet) {
fprintf(stderr, "failed to open wallet file: %s\n generating new wallet...\n", strerror(errno));
if (!GenerateRandomTestAddress(minerAddress, minerPrivateKey, minerCompressedPubkey)) {
fprintf(stderr, "failed to generate test miner keypair\n");
KillEverythingAndExit(node, chain);
}
// Save the generated wallet to disk for future runs
walletFile = fopen(path, "wb");
if (!walletFile) {
fprintf(stderr, "failed to create wallet file: %s\n", strerror(errno));
KillEverythingAndExit(node, chain);
}
size_t written = fwrite(minerPrivateKey, 1, 32, walletFile);
if (written != 32) {
fprintf(stderr, "failed to write wallet file\n");
fclose(walletFile);
KillEverythingAndExit(node, chain);
}
written = fwrite(minerCompressedPubkey, 1, 33, walletFile);
if (written != 33) {
fprintf(stderr, "failed to write wallet file\n");
fclose(walletFile);
KillEverythingAndExit(node, chain);
}
written = fwrite(minerAddress, 1, 32, walletFile);
if (written != 32) {
fprintf(stderr, "failed to write wallet file\n");
fclose(walletFile);
KillEverythingAndExit(node, chain);
}
fclose(walletFile);
}
/*uint8_t minerAddress[32];
uint8_t minerPrivateKey[32];
uint8_t minerCompressedPubkey[33];
if (!GenerateTestMinerIdentity(minerPrivateKey, minerCompressedPubkey, minerAddress)) {
fprintf(stderr, "failed to generate test miner keypair\n");
Node_Destroy(node);
@@ -587,7 +825,7 @@ int main(int argc, char* argv[]) {
Block_ShutdownPowContext();
BalanceSheet_Destroy();
return 1;
}
}*/
char minerAddressHex[65];
AddressToHexString(minerAddress, minerAddressHex);
@@ -596,7 +834,7 @@ int main(int argc, char* argv[]) {
char supplyStr[80];
Uint256ToDecimal(&currentSupply, supplyStr, sizeof(supplyStr));
printf("Current chain has %zu blocks, total supply %s\n", Chain_Size(chain), supplyStr);
printf("Commands: mine <x>, send <address> <amount>, balance [address], connect <ipv4>, sync (requires nodes), flushchain, fullverify, blockdetail <block number>, wipechain, genaddr, exit\n");
printf("Commands: mine <x>, send <address> <amount> [fee], txpooldetail <txhash>, balance [address], connect <ipv4>, sync (requires nodes), flushchain, fullverify, blockdetail <block number>, wipechain, genaddr, exit\n");
char line[1024];
while (true) {
@@ -634,14 +872,38 @@ int main(int argc, char* argv[]) {
printf("Mining %llu block(s)...\n", requested);
bool minedAll = true;
for (unsigned long long i = 0; i < requested; ++i) {
block_t* block = BuildNextBlock(chain, difficultyTarget);
if (!block) {
fprintf(stderr, "failed to create block\n");
signed_transaction_t* acceptedTxs = NULL;
size_t acceptedTxCount = 0;
uint64_t totalFees = 0;
if (!BuildSpendableMempoolSelection(&acceptedTxs, &acceptedTxCount, &totalFees)) {
fprintf(stderr, "failed to select spendable transactions from mempool\n");
minedAll = false;
break;
}
AddCoinbaseTransaction(block, minerAddress, currentReward);
block_t* block = BuildNextBlock(chain, difficultyTarget);
if (!block) {
fprintf(stderr, "failed to create block\n");
free(acceptedTxs);
minedAll = false;
break;
}
uint64_t coinbaseAmount = currentReward;
if (UINT64_MAX - coinbaseAmount < totalFees) {
free(acceptedTxs);
Block_Destroy(block);
minedAll = false;
break;
}
coinbaseAmount += totalFees;
AddCoinbaseTransaction(block, minerAddress, coinbaseAmount);
for (size_t txIndex = 0; txIndex < acceptedTxCount; ++txIndex) {
Block_AddTransaction(block, &acceptedTxs[txIndex]);
}
free(acceptedTxs);
if (!MineAndAppendBlock(chain, block, &currentSupply, &currentReward, &difficultyTarget)) {
Block_Destroy(block);
@@ -651,6 +913,11 @@ int main(int argc, char* argv[]) {
free(block); // Chain stores block by value and owns copied transaction array.
// Broadcast newly mined block to outbound peers
if (node) {
Node_BroadcastChainRange(node, Chain_Size(chain) - 1, NULL);
}
if (i % 50 == 0) {
// Mid-mine flush
(void)FlushChainAndSheet(chain, chainDataDir, currentSupply, currentReward);
@@ -669,6 +936,7 @@ int main(int argc, char* argv[]) {
if (strcmp(cmd, "send") == 0) {
char* addressStr = strtok(NULL, " \t");
char* amountStr = strtok(NULL, " \t");
char* feeStr = strtok(NULL, " \t");
if (!addressStr || !amountStr) {
printf("usage: send <address> <amount>\n");
continue;
@@ -687,6 +955,21 @@ int main(int argc, char* argv[]) {
continue;
}
unsigned long long fee = 0;
if (feeStr) {
char* endptr2 = NULL;
fee = strtoull(feeStr, &endptr2, 10);
if (*feeStr == '\0' || feeStr[0] == '-' || (endptr2 && *endptr2 != '\0')) {
printf("invalid fee\n");
continue;
}
}
if (fee > UINT64_MAX - amount) {
printf("invalid fee: overflow\n");
continue;
}
balance_sheet_entry_t senderEntry;
if (!BalanceSheet_Lookup(minerAddress, &senderEntry)) {
printf("send failed: miner address has no balance\n");
@@ -705,12 +988,13 @@ int main(int argc, char* argv[]) {
continue;
}
AddCoinbaseTransaction(block, minerAddress, currentReward);
uint64_t coinbaseAmount = currentReward;
AddCoinbaseTransaction(block, minerAddress, coinbaseAmount);
signed_transaction_t spendTx;
Transaction_Init(&spendTx);
spendTx.transaction.version = 1;
spendTx.transaction.fee = 0;
spendTx.transaction.fee = (uint64_t)fee;
spendTx.transaction.amount1 = (uint64_t)amount;
spendTx.transaction.amount2 = 0;
memcpy(spendTx.transaction.senderAddress, minerAddress, sizeof(minerAddress));
@@ -719,6 +1003,7 @@ int main(int argc, char* argv[]) {
memcpy(spendTx.transaction.compressedPublicKey, minerCompressedPubkey, sizeof(minerCompressedPubkey));
Transaction_Sign(&spendTx, minerPrivateKey);
/*
Block_AddTransaction(block, &spendTx);
printf("Created transaction sending %llu pebble(s) to ", (unsigned long long)amount);
char recipientHex[65];
@@ -733,7 +1018,26 @@ int main(int argc, char* argv[]) {
FlushChainAndSheet(chain, chainDataDir, currentSupply, currentReward);
free(block);
if (node) {
Node_BroadcastChainRange(node, Chain_Size(chain) - 1, NULL);
}
printf("send committed in mined block\n");
*/
// Insert into txmempool
if (TxMempool_Insert(spendTx) < 0) {
printf("failed to add transaction to mempool, transaction rejected\n");
continue;
}
printf("transaction added to mempool, broadcasting...\n");
if (Node_BroadcastTransaction(node, &spendTx, NULL) == 0) {
printf("transaction broadcast to peers\n");
} else {
printf("failed to broadcast transaction to peers\n");
}
continue;
}
@@ -744,46 +1048,45 @@ int main(int argc, char* argv[]) {
}
// Choose the best outbound peer by advertised height
int bestIdx = -1;
uint64_t bestHeight = 0;
for (size_t i = 0; i < MAX_CONS; ++i) {
if (node->outboundClients[i].connection) {
if (node->outboundClients[i].peerBlockHeight > bestHeight) {
bestHeight = node->outboundClients[i].peerBlockHeight;
bestIdx = (int)i;
}
}
}
if (bestIdx < 0) {
tcp_connection_t* peerConn = NULL;
uint64_t peerHeight = 0;
if (Node_GetBestOutboundPeer(node, &peerConn, &peerHeight) != 0 || !peerConn) {
printf("no outbound peers to sync from\n");
continue;
}
uint64_t localHeight = (uint64_t)Chain_Size(chain);
uint64_t peerHeight = node->outboundClients[bestIdx].peerBlockHeight;
// Continue syncing in a loop until we've caught up to the peer or no progress is made.
bool madeProgressOverall = false;
while (true) {
uint64_t localHeight = (uint64_t)Chain_Size(chain);
// Determine if this is an initial sync. If so, do not apply penalty.
bool isInitialSync = (localHeight == 0);
// Only penalize small near-tip gaps. Large gaps are treated as normal catch-up,
// because a much taller peer on the same chain is not evidence of a reorg. TODO: Maybe look at this again some other day.
bool isInitialSync = (localHeight == 0) || ((peerHeight > localHeight) && ((peerHeight - localHeight) > INITIAL_SYNC_HEIGHT_DIFF));
// Compute penalty and adjusted peer height (skip penalty for initial sync)
// Compute penalty and adjusted peer height.
uint64_t delay = (peerHeight > localHeight) ? (peerHeight - localHeight) : 0ULL;
uint64_t penalty = isInitialSync ? 0ULL : FetchScheduler_ComputeReorgPenaltyBlocks(delay);
uint64_t adjustedPeerHeight = (peerHeight > penalty) ? (peerHeight - penalty) : 0ULL;
// Ensure we always make forward progress: if the penalty would reduce the
// target below our current height, fetch at least the next block. This
// lets us apply penalties for near-tip reorg risk while still allowing
// normal syncing when the peer is ahead by a small amount.
if (adjustedPeerHeight <= localHeight) {
printf("already synced (local=%" PRIu64 ", peer=%" PRIu64 ", penalty=%" PRIu64 ")\n", localHeight, peerHeight, penalty);
continue;
adjustedPeerHeight = localHeight + 1;
}
printf("syncing from peer %d: peerHeight=%" PRIu64 " adjusted=%" PRIu64 " local=%" PRIu64 " penalty=%" PRIu64 "\n",
bestIdx, peerHeight, adjustedPeerHeight, localHeight, penalty);
if (adjustedPeerHeight > peerHeight) {
adjustedPeerHeight = peerHeight;
}
tcp_connection_t* peerConn = node->outboundClients[bestIdx].connection;
printf("syncing: peerHeight=%" PRIu64 " adjusted=%" PRIu64 " local=%" PRIu64 " penalty=%" PRIu64 "\n",
peerHeight, adjustedPeerHeight, localHeight, penalty);
// Windowed parallel fetch
uint64_t start = localHeight;
uint64_t end = adjustedPeerHeight; // exclusive target height
// Windowed parallel fetch
uint64_t start = localHeight;
uint64_t end = adjustedPeerHeight; // exclusive target height
uint64_t nextReq = start;
const int maxInFlight = MAX_PARALLEL_FETCHES;
@@ -895,6 +1198,11 @@ int main(int argc, char* argv[]) {
break;
}
size_t reattached = OrphanPool_AttemptAttach(chain);
if (reattached > 0) {
printf("Reorg rollback attached %zu orphan(s)\n", reattached);
}
// Apply additional penalty by shrinking end and restart window from current Chain_Size
if (peerHeight > reorgPenalty) {
end = peerHeight - reorgPenalty;
@@ -968,10 +1276,76 @@ int main(int argc, char* argv[]) {
}
}
printf("sync complete: localHeight=%zu\n", Chain_Size(chain));
// After the window completes, check progress and possibly refresh peer height
uint64_t newLocal = (uint64_t)Chain_Size(chain);
if (newLocal > localHeight) madeProgressOverall = true;
printf("sync complete: localHeight=%" PRIu64 "\n", newLocal);
// If we've caught up to the peer, stop. Otherwise refresh peerHeight and loop again.
if (newLocal >= peerHeight) break;
// Refresh advertised peer height for this connection (it may have been updated during fetch)
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) {
if (node->outboundClients[i].connection == peerConn) {
peerHeight = node->outboundClients[i].peerBlockHeight;
break;
}
}
pthread_mutex_unlock(&node->outboundLock);
// If no progress was made in this iteration, stop to avoid tight loop
if (!madeProgressOverall) {
break;
}
// Re-evaluate loop condition: continue while local < peerHeight
if ((uint64_t)Chain_Size(chain) >= peerHeight) break;
continue;
}
if (strcmp(cmd, "txpooldetail") == 0) {
char* hashStr = strtok(NULL, " \t");
if (!hashStr) {
printf("usage: txpooldetail <txhash>\n");
continue;
}
uint8_t txHash[32];
if (!ParseHexAddress32(hashStr, txHash)) {
printf("invalid tx hash: expected 64 hex chars\n");
continue;
}
signed_transaction_t tx;
if (!TxMempool_Lookup(txHash, &tx)) {
printf("transaction not found in mempool\n");
continue;
}
char senderHex[65];
char recip1Hex[65];
char recip2Hex[65];
AddressToHexString(tx.transaction.senderAddress, senderHex);
AddressToHexString(tx.transaction.recipientAddress1, recip1Hex);
AddressToHexString(tx.transaction.recipientAddress2, recip2Hex);
uint8_t calcHash[32];
Transaction_CalculateHash(&tx, calcHash);
printf("Transaction details:\n");
printf(" TxHash: "); PrintHexBytes(calcHash, 32); printf("\n");
printf(" Sender: %s%s\n", senderHex, Address_IsCoinbase(tx.transaction.senderAddress) ? " (coinbase)" : "");
printf(" Recipient1: %s\n", recip1Hex);
printf(" Recipient2: %s\n", recip2Hex);
printf(" Amount1: %llu\n", (unsigned long long)tx.transaction.amount1);
printf(" Amount2: %llu\n", (unsigned long long)tx.transaction.amount2);
printf(" Fee: %llu\n", (unsigned long long)tx.transaction.fee);
continue;
}
}
if (strcmp(cmd, "blockdetail") == 0) {
char* blockNumberStr = strtok(NULL, " \t");
char* extra = strtok(NULL, " \t");
@@ -1050,9 +1424,10 @@ int main(int argc, char* argv[]) {
if (strcmp(cmd, "connect") == 0) {
char* ipStr = strtok(NULL, " \t");
char* portStr = strtok(NULL, " \t");
char* extra = strtok(NULL, " \t");
if (!ipStr || extra) {
printf("usage: connect <ipv4>\n");
printf("usage: connect <ipv4> [port]\n");
continue;
}
@@ -1061,12 +1436,31 @@ int main(int argc, char* argv[]) {
continue;
}
if (Node_ConnectPeer(node, ipStr, LISTEN_PORT) != 0) {
printf("failed to connect to %s:%u\n", ipStr, (unsigned int)LISTEN_PORT);
unsigned short peerPort = listenPort;
if (portStr) {
char* end = NULL;
long parsedPort = strtol(portStr, &end, 10);
if (*portStr == '\0' || portStr[0] == '-' || (end && *end != '\0') || parsedPort <= 0 || parsedPort > 65535) {
printf("invalid port\n");
continue;
}
peerPort = (unsigned short)parsedPort;
if (strtok(NULL, " \t")) {
printf("usage: connect <ipv4> [port]\n");
continue;
}
}
if (Node_ConnectPeer(node, ipStr, peerPort) != 0) {
if (errno == ETIMEDOUT) {
printf("failed to connect to %s:%u (timeout)\n", ipStr, (unsigned int)peerPort);
} else {
printf("failed to connect to %s:%u\n", ipStr, (unsigned int)peerPort);
}
continue;
}
printf("connect requested to %s:%u\n", ipStr, (unsigned int)LISTEN_PORT);
printf("connect requested to %s:%u\n", ipStr, (unsigned int)peerPort);
continue;
}
@@ -1128,7 +1522,7 @@ int main(int argc, char* argv[]) {
if (strcmp(cmd, "genaddr") == 0) {
uint8_t testAddress[32];
if (!GenerateRandomTestAddress(testAddress)) {
if (!GenerateRandomTestAddress(testAddress, NULL, NULL)) {
printf("failed to generate address\n");
continue;
}

View File

@@ -11,6 +11,7 @@
#include <inttypes.h>
#include <pthread.h>
#include <unistd.h>
#include <txmempool.h>
static net_node_t* Node_FromConnection(tcp_connection_t* conn) {
if (!conn) {
@@ -28,6 +29,12 @@ static uint64_t Node_GetCurrentBlockHeight(void) {
return currentBlockHeight;
}
typedef enum {
NODE_BLOCK_REJECTED = 0,
NODE_BLOCK_ORPHAN_QUEUED = 1,
NODE_BLOCK_ACCEPTED = 2
} node_block_accept_result_t;
static void* Node_MaintenanceThread(void* arg) {
net_node_t* n = (net_node_t*)arg;
if (!n) return NULL;
@@ -61,18 +68,18 @@ static int Node_DecodePacket(const tcp_connection_t* conn, packet_type_t* outTyp
return 0;
}
static bool Node_ParseAndAcceptBlock(const unsigned char* payload, size_t payloadLen, bool persist) {
if (!payload) { return false; }
static node_block_accept_result_t Node_ParseAndAcceptBlock(const unsigned char* payload, size_t payloadLen, bool persist) {
if (!payload) { return NODE_BLOCK_REJECTED; }
size_t offset = 0;
if (payloadLen < sizeof(uint64_t) + sizeof(block_header_t) + sizeof(uint64_t)) { return false; }
if (payloadLen < sizeof(uint64_t) + sizeof(block_header_t) + sizeof(uint64_t)) { return NODE_BLOCK_REJECTED; }
uint64_t blockHeight = 0;
memcpy(&blockHeight, payload + offset, sizeof(blockHeight));
offset += sizeof(blockHeight);
block_t* blk = (block_t*)calloc(1, sizeof(block_t));
if (!blk) { return false; }
if (!blk) { return NODE_BLOCK_REJECTED; }
memcpy(&blk->header, payload + offset, sizeof(blk->header));
blk->header.blockNumber = blockHeight;
@@ -83,13 +90,13 @@ static bool Node_ParseAndAcceptBlock(const unsigned char* payload, size_t payloa
offset += sizeof(txCount);
blk->transactions = DYNARR_CREATE(signed_transaction_t, txCount == 0 ? 1 : (size_t)txCount);
if (!blk->transactions) { free(blk); return false; }
if (!blk->transactions) { free(blk); return NODE_BLOCK_REJECTED; }
for (uint64_t i = 0; i < txCount; ++i) {
if (offset + sizeof(signed_transaction_t) > payloadLen) {
DynArr_destroy(blk->transactions);
free(blk);
return false;
return NODE_BLOCK_REJECTED;
}
signed_transaction_t tx;
memcpy(&tx, payload + offset, sizeof(tx));
@@ -97,7 +104,7 @@ static bool Node_ParseAndAcceptBlock(const unsigned char* payload, size_t payloa
if (!DynArr_push_back(blk->transactions, &tx)) {
DynArr_destroy(blk->transactions);
free(blk);
return false;
return NODE_BLOCK_REJECTED;
}
}
@@ -106,28 +113,58 @@ static bool Node_ParseAndAcceptBlock(const unsigned char* payload, size_t payloa
printf("Rejected BLOCK_DATA at height %" PRIu64 " during validation\n", blockHeight);
DynArr_destroy(blk->transactions);
free(blk);
return false;
return NODE_BLOCK_REJECTED;
}
if (!currentChain) {
printf("Rejected BLOCK_DATA at height %" PRIu64 ": no active chain\n", blockHeight);
DynArr_destroy(blk->transactions);
free(blk);
return false;
return NODE_BLOCK_REJECTED;
}
// Temporary debug mode: force network-received blocks through the orphan pool to exercise reorg handling.
if (forceOrphanReorgEnabled && blk->header.blockNumber > 0) {
OrphanPool_Insert(blk, blockHeight);
printf("Forced orphan BLOCK_DATA at height %" PRIu64 "\n", blockHeight);
return NODE_BLOCK_ORPHAN_QUEUED;
}
// If parent is missing, insert into orphan pool instead of rejecting immediately.
if (blk->header.blockNumber > 0) {
uint64_t parentIndex = blk->header.blockNumber - 1;
block_t* parentCopy = NULL;
if (parentIndex >= Chain_Size(currentChain) || !Chain_GetBlockCopy(currentChain, (size_t)parentIndex, &parentCopy) || !parentCopy) {
// Insert into orphan pool and take ownership of blk
OrphanPool_Insert(blk, blockHeight);
if (parentCopy) Block_Destroy(parentCopy);
printf("Queued orphan BLOCK_DATA at height %" PRIu64 "\n", blockHeight);
return true;
uint64_t chainSize = Chain_Size(currentChain);
if (blk->header.blockNumber > chainSize) {
// Parent(s) missing; queue as orphan
OrphanPool_Insert(blk, blockHeight);
printf("Queued orphan BLOCK_DATA at height %" PRIu64 "\n", blockHeight);
return NODE_BLOCK_ORPHAN_QUEUED;
} else if (blk->header.blockNumber < chainSize) {
// Older block than current chain tip: reject
printf("Rejected BLOCK_DATA at height %" PRIu64 ": older than current chain\n", blockHeight);
DynArr_destroy(blk->transactions);
free(blk);
return NODE_BLOCK_REJECTED;
} else {
// blk->header.blockNumber == chainSize -> candidate to append. Ensure prevHash matches current tip.
if (chainSize > 0) {
block_t* last = NULL;
if (!Chain_GetBlockCopy(currentChain, (size_t)(chainSize - 1), &last) || !last) {
// Can't verify parent; queue as orphan conservatively
OrphanPool_Insert(blk, blockHeight);
printf("Queued orphan BLOCK_DATA at height %" PRIu64 " (unable to verify parent)\n", blockHeight);
if (last) Block_Destroy(last);
return NODE_BLOCK_ORPHAN_QUEUED;
}
uint8_t lastHash[32];
Block_CalculateHash(last, lastHash);
if (memcmp(lastHash, blk->header.prevHash, 32) != 0) {
// Conflicting block at same height; queue as orphan until resolved by a subsequent extension.
OrphanPool_Insert(blk, blockHeight);
Block_Destroy(last);
printf("Queued conflicting BLOCK_DATA at same height %" PRIu64 " as orphan\n", blockHeight);
return NODE_BLOCK_ORPHAN_QUEUED;
}
Block_Destroy(last);
}
Block_Destroy(parentCopy);
}
if (!Chain_AddBlock(currentChain, blk)) {
@@ -137,9 +174,23 @@ static bool Node_ParseAndAcceptBlock(const unsigned char* payload, size_t payloa
DynArr_destroy(blk->transactions);
}
free(blk);
return false;
return NODE_BLOCK_REJECTED;
}
uint64_t coinbaseAmount = 0;
if (blk->transactions) {
for (size_t i = 0; i < DynArr_size(blk->transactions); ++i) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(blk->transactions, i);
if (tx && Address_IsCoinbase(tx->transaction.senderAddress)) {
coinbaseAmount = tx->transaction.amount1;
break;
}
}
}
(void)uint256_add_u64(&currentSupply, coinbaseAmount);
currentReward = CalculateBlockReward(currentSupply, currentChain);
// Persist on accept if requested
if (persist) {
Chain_SaveToFile(currentChain, chainDataDir, currentSupply, currentReward);
@@ -156,7 +207,7 @@ static bool Node_ParseAndAcceptBlock(const unsigned char* payload, size_t payloa
Chain_SaveToFile(currentChain, chainDataDir, currentSupply, currentReward);
BalanceSheet_SaveToFile(chainDataDir);
}
return true;
return NODE_BLOCK_ACCEPTED;
}
static void Node_ForwardConnect(net_node_t* node, tcp_connection_t* conn) {
@@ -198,7 +249,13 @@ net_node_t* Node_Create() {
}
}
TcpServer_Init(node->server, LISTEN_PORT, "0.0.0.0");
// Initialize outbound lock and seen-block cache
pthread_mutex_init(&node->seenLock, NULL);
pthread_mutex_init(&node->outboundLock, NULL);
node->seenBlocks = DynSet_Create(32); // 32-byte canonical hashes
TxMempool_Init();
TcpServer_Init(node->server, listenPort, "0.0.0.0");
node->server->owner = node;
node->server->on_connect = Node_Server_OnConnect;
@@ -242,6 +299,14 @@ void Node_Destroy(net_node_t* node) {
}
OrphanPool_Destroy();
TxMempool_Destroy();
if (node->seenBlocks) {
DynSet_Destroy(node->seenBlocks);
node->seenBlocks = NULL;
}
pthread_mutex_destroy(&node->seenLock);
pthread_mutex_destroy(&node->outboundLock);
free(node);
}
@@ -336,10 +401,65 @@ int Node_SendPacket(net_node_t* node, tcp_connection_t* conn, packet_type_t pack
return rc;
}
int Node_BroadcastTransaction(net_node_t* node, signed_transaction_t* tx, tcp_connection_t* excludeNode) {
if (!node || !tx) {
return -1;
}
// Serialize transaction into payload
size_t payloadLen = sizeof(signed_transaction_t);
unsigned char* payload = (unsigned char*)malloc(payloadLen);
if (!payload) {
return -1;
}
memcpy(payload, tx, sizeof(signed_transaction_t));
// Broadcast to all outbound peers
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) {
tcp_connection_t* connection = node->outboundClients[i].connection;
if (connection && connection != excludeNode) {
(void)Node_SendPacket(node, connection, PACKET_TYPE_BROADCAST_TX, payload, payloadLen);
}
}
pthread_mutex_unlock(&node->outboundLock);
free(payload);
return 0;
}
void Node_Server_OnConnect(tcp_connection_t* client) {
net_node_t* node = Node_FromConnection(client);
Node_ForwardConnect(node, client);
printf("Inbound node connected: %u\n", client ? client->connectionId : 0U);
if (echoPeersEnabled && node && client) {
// Attempt to create an outbound connection back to the peer's IP on our configured port.
// We avoid connecting if we already have an outbound to the same IP.
char ipbuf[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, &client->peerAddr.sin_addr, ipbuf, sizeof(ipbuf))) {
// Use the configured port as the target port for the peer's listening service.
unsigned short targetPort = listenPort;
int shouldConnect = 1;
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) {
if (node->outboundClients[i].connection) {
struct in_addr otherAddr = node->outboundClients[i].connection->peerAddr.sin_addr;
if (otherAddr.s_addr == client->peerAddr.sin_addr.s_addr) {
shouldConnect = 0;
break;
}
}
}
pthread_mutex_unlock(&node->outboundLock);
if (shouldConnect) {
// Try to connect; ignore failure silently
(void)Node_ConnectPeer(node, ipbuf, targetPort);
}
}
}
}
void Node_Server_OnData(tcp_connection_t* client) {
@@ -498,15 +618,67 @@ void Node_Server_OnData(tcp_connection_t* client) {
}
case PACKET_TYPE_BROADCAST_BLOCK: {
// Accept broadcast blocks from peers and try to append
if (Node_ParseAndAcceptBlock(payload, payloadLen, true)) {
printf("Accepted BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
} else {
printf("Rejected BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
if (payloadLen >= sizeof(uint64_t)) {
uint64_t blockHeight = 0;
memcpy(&blockHeight, payload, sizeof(blockHeight));
node_block_accept_result_t result = Node_ParseAndAcceptBlock(payload, payloadLen, true);
if (result == NODE_BLOCK_ACCEPTED) {
printf("Accepted BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
net_node_t* node = Node_FromConnection(client);
if (node) {
Node_BroadcastChainRange(node, (size_t)blockHeight, client);
}
} else if (result == NODE_BLOCK_ORPHAN_QUEUED) {
printf("Queued orphan BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
} else {
printf("Rejected BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
}
}
break;
}
case PACKET_TYPE_ACK_BLOCK:
case PACKET_TYPE_BROADCAST_TX:
case PACKET_TYPE_BROADCAST_TX: {
// Decode the block or transaction data inside
if (payloadLen == sizeof(signed_transaction_t)) {
signed_transaction_t tx;
memcpy(&tx, payload, sizeof(tx));
uint8_t txHash[32];
char txHashHex[65];
Transaction_CalculateHash(&tx, txHash);
to_hex(txHash, txHashHex);
printf("Received packet type %u from node %u with transaction sending %llu pebble(s)\n",
(unsigned int)packetType, client ? client->connectionId : 0U, (unsigned long long)tx.transaction.amount1);
if (!Transaction_Verify(&tx)) {
printf("Received invalid transaction from node %u\n", client ? client->connectionId : 0U);
return;
}
// Push to mempool if it's not already present
if (!TxMempool_Lookup(txHash, &tx)) {
if (TxMempool_Insert(tx) >= 0) {
printf("Added transaction %s from node %u to mempool\n", txHashHex, client ? client->connectionId : 0U);
// Broadcast to other peers
net_node_t* node = Node_FromConnection(client);
if (node) {
Node_BroadcastTransaction(node, &tx, client);
}
} else {
printf("Failed to add transaction %s from node %u to mempool\n", txHashHex, client ? client->connectionId : 0U);
}
} else {
printf("Transaction %s from node %u already seen!\n", txHashHex, client ? client->connectionId : 0U);
}
} else {
printf("Received packet type %u from node %u with invalid payload length %zu\n",
(unsigned int)packetType, client ? client->connectionId : 0U, payloadLen);
// TODO: Ignoring for now, might error node later if we want to be strict about malformed messages
}
break;
}
case PACKET_TYPE_ACK_TX:
case PACKET_TYPE_ERROR: {
// Decode the message inside as text
@@ -594,12 +766,14 @@ void Node_Client_OnData(tcp_connection_t* client) {
// Store peer-advertised height on matching outbound client
net_node_t* node = Node_FromConnection(client);
if (node) {
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) {
if (node->outboundClients[i].connection == client) {
node->outboundClients[i].peerBlockHeight = blockHeight;
break;
}
}
pthread_mutex_unlock(&node->outboundLock);
}
break;
}
@@ -614,23 +788,74 @@ void Node_Client_OnData(tcp_connection_t* client) {
return;
}
case PACKET_TYPE_BLOCK_DATA: {
if (Node_ParseAndAcceptBlock(payload, payloadLen, true)) {
printf("Accepted BLOCK_DATA from node %u\n", client ? client->connectionId : 0U);
} else {
printf("Rejected BLOCK_DATA from node %u\n", client ? client->connectionId : 0U);
if (payloadLen >= sizeof(uint64_t)) {
uint64_t blockHeight = 0;
memcpy(&blockHeight, payload, sizeof(blockHeight));
node_block_accept_result_t result = Node_ParseAndAcceptBlock(payload, payloadLen, true);
if (result == NODE_BLOCK_ACCEPTED) {
printf("Accepted BLOCK_DATA from node %u\n", client ? client->connectionId : 0U);
net_node_t* node = Node_FromConnection(client);
if (node) {
// Update peer advertised height
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) {
if (node->outboundClients[i].connection == client) {
if (node->outboundClients[i].peerBlockHeight < blockHeight) {
node->outboundClients[i].peerBlockHeight = blockHeight;
}
break;
}
}
pthread_mutex_unlock(&node->outboundLock);
Node_BroadcastChainRange(node, (size_t)blockHeight, client);
}
} else if (result == NODE_BLOCK_ORPHAN_QUEUED) {
printf("Queued orphan BLOCK_DATA from node %u\n", client ? client->connectionId : 0U);
} else {
printf("Rejected BLOCK_DATA from node %u\n", client ? client->connectionId : 0U);
}
}
break;
}
case PACKET_TYPE_BROADCAST_BLOCK: {
if (Node_ParseAndAcceptBlock(payload, payloadLen, true)) {
printf("Accepted BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
} else {
printf("Rejected BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
if (payloadLen >= sizeof(uint64_t)) {
uint64_t blockHeight = 0;
memcpy(&blockHeight, payload, sizeof(blockHeight));
node_block_accept_result_t result = Node_ParseAndAcceptBlock(payload, payloadLen, true);
if (result == NODE_BLOCK_ACCEPTED) {
printf("Accepted BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
net_node_t* node = Node_FromConnection(client);
if (node) {
// Update peer advertised height
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) {
if (node->outboundClients[i].connection == client) {
if (node->outboundClients[i].peerBlockHeight < blockHeight) {
node->outboundClients[i].peerBlockHeight = blockHeight;
}
break;
}
}
pthread_mutex_unlock(&node->outboundLock);
Node_BroadcastChainRange(node, (size_t)blockHeight, client);
}
} else if (result == NODE_BLOCK_ORPHAN_QUEUED) {
printf("Queued orphan BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
} else {
printf("Rejected BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
}
}
break;
}
case PACKET_TYPE_ACK_BLOCK:
case PACKET_TYPE_BROADCAST_TX:
case PACKET_TYPE_BROADCAST_TX: {
// Client can't receive these!
printf("Received unexpected packet type %u from node %u\n", (unsigned int)packetType, client ? client->connectionId : 0U);
break;
}
case PACKET_TYPE_ACK_TX:
case PACKET_TYPE_ERROR: {
// Decode the message inside as text
@@ -656,10 +881,128 @@ void Node_Client_OnData(tcp_connection_t* client) {
void Node_Client_OnDisconnect(tcp_connection_t* client) {
net_node_t* node = Node_FromConnection(client);
if (node && node->outboundCount > 0) {
node->outboundCount--;
if (node) {
// Clear peer advertised height for this outbound slot
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) {
if (node->outboundClients[i].connection == client) {
node->outboundClients[i].peerBlockHeight = 0;
break;
}
}
pthread_mutex_unlock(&node->outboundLock);
if (node->outboundCount > 0) {
node->outboundCount--;
}
}
Node_ForwardDisconnect(node, client);
printf("Outbound node disconnected: %u\n", client ? client->connectionId : 0U);
}
int Node_GetBestOutboundPeer(net_node_t* node, tcp_connection_t** outConn, uint64_t* outHeight) {
if (!node || !outConn || !outHeight) return -1;
tcp_connection_t* best = NULL;
uint64_t bestH = 0;
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) {
if (node->outboundClients[i].connection) {
if (node->outboundClients[i].peerBlockHeight > bestH || best == NULL) {
best = node->outboundClients[i].connection;
bestH = node->outboundClients[i].peerBlockHeight;
}
}
}
pthread_mutex_unlock(&node->outboundLock);
if (!best) return -1;
*outConn = best;
*outHeight = bestH;
return 0;
}
void Node_BroadcastChainRange(net_node_t* node, size_t startHeightInclusive, tcp_connection_t* sourceConn) {
if (!node || !currentChain) return;
size_t chainSize = Chain_Size(currentChain);
if (startHeightInclusive >= chainSize) return;
uint32_t sourceIp = 0;
if (sourceConn) {
sourceIp = sourceConn->peerAddr.sin_addr.s_addr;
}
for (size_t h = startHeightInclusive; h < chainSize; ++h) {
block_t* blk = NULL;
if (!Chain_GetBlockCopy(currentChain, h, &blk) || !blk) {
if (!Chain_LoadBlockFromFile(chainDataDir, h, true, &blk, NULL) || !blk) {
continue;
}
} else if (!blk->transactions) {
block_t* full = NULL;
if (Chain_LoadBlockFromFile(chainDataDir, h, true, &full, NULL) && full) {
Block_Destroy(blk);
blk = full;
}
}
if (!blk || !blk->transactions) {
if (blk) Block_Destroy(blk);
continue;
}
unsigned char hash[32];
Block_CalculateHash(blk, hash);
// Dedupe using seenBlocks
int seen = 0;
pthread_mutex_lock(&node->seenLock);
if (DynSet_Contains(node->seenBlocks, hash)) {
seen = 1;
} else {
DynSet_Insert(node->seenBlocks, hash);
}
pthread_mutex_unlock(&node->seenLock);
if (seen) {
Block_Destroy(blk);
continue;
}
// Serialize payload: [uint64_t height][block_header_t][uint64_t txCount][transactions...]
size_t txCount = DynArr_size(blk->transactions);
size_t payloadLen = sizeof(uint64_t) + sizeof(block_header_t) + sizeof(uint64_t) + (txCount * sizeof(signed_transaction_t));
unsigned char* payload = (unsigned char*)malloc(payloadLen);
if (!payload) {
Block_Destroy(blk);
continue;
}
size_t off = 0;
uint64_t h64 = (uint64_t)h;
memcpy(payload + off, &h64, sizeof(h64)); off += sizeof(h64);
memcpy(payload + off, &blk->header, sizeof(block_header_t)); off += sizeof(block_header_t);
uint64_t txCount64 = (uint64_t)txCount;
memcpy(payload + off, &txCount64, sizeof(txCount64)); off += sizeof(txCount64);
for (size_t ti = 0; ti < txCount; ++ti) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(blk->transactions, ti);
memcpy(payload + off, tx, sizeof(signed_transaction_t)); off += sizeof(signed_transaction_t);
}
// Snapshot outbound clients and send
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) {
tcp_connection_t* conn = node->outboundClients[i].connection;
if (!conn) continue;
if (conn == sourceConn) continue;
if (sourceIp != 0 && conn->peerAddr.sin_addr.s_addr == sourceIp) continue;
Node_SendPacket(node, conn, PACKET_TYPE_BROADCAST_BLOCK, payload, off);
}
pthread_mutex_unlock(&node->outboundLock);
free(payload);
Block_Destroy(blk);
}
}

View File

@@ -38,6 +38,71 @@ void OrphanPool_Insert(block_t* block, uint64_t height) {
(void)DynArr_push_back(g_orphans, &e);
}
static size_t OrphanPool_TryAdoptBranch(blockchain_t* chain, uint64_t forkHeight) {
if (!g_orphans || !chain) return 0;
DynArr* seq = DYNARR_CREATE(block_t*, 8);
if (!seq) return 0;
size_t cursor = forkHeight;
while (1) {
bool found = false;
size_t count = DynArr_size(g_orphans);
for (size_t i = 0; i < count; ++i) {
orphan_entry_t* entry = (orphan_entry_t*)DynArr_at(g_orphans, i);
if (!entry || !entry->block) continue;
if (entry->height == cursor) {
(void)DynArr_push_back(seq, &entry->block);
found = true;
break;
}
}
if (!found) break;
cursor++;
}
size_t seqCount = DynArr_size(seq);
if (seqCount == 0) {
DynArr_destroy(seq);
return 0;
}
size_t currentTipHeight = Chain_Size(chain) == 0 ? 0 : Chain_Size(chain) - 1;
size_t seqTopHeight = forkHeight + seqCount - 1;
if (seqTopHeight <= currentTipHeight) {
DynArr_destroy(seq);
return 0;
}
size_t rollbackHeight = (forkHeight == 0) ? 0 : (forkHeight - 1);
if (!Chain_RollbackToHeight(chain, rollbackHeight)) {
DynArr_destroy(seq);
return 0;
}
size_t attached = 0;
for (size_t i = 0; i < seqCount; ++i) {
block_t* bptr = *(block_t**)DynArr_at(seq, i);
if (!bptr || !Chain_AddBlock(chain, bptr)) {
break;
}
size_t count = DynArr_size(g_orphans);
for (size_t j = 0; j < count; ++j) {
orphan_entry_t* entry = (orphan_entry_t*)DynArr_at(g_orphans, j);
if (entry && entry->block == bptr) {
DynArr_remove(g_orphans, j);
break;
}
}
attached++;
}
DynArr_destroy(seq);
return attached;
}
size_t OrphanPool_AttemptAttach(blockchain_t* chain) {
if (!g_orphans || !chain) return 0;
size_t attached = 0;
@@ -67,6 +132,60 @@ size_t OrphanPool_AttemptAttach(blockchain_t* chain) {
}
if (parentExists) {
if (e->height < Chain_Size(chain)) {
block_t* local = NULL;
if (Chain_GetBlockCopy(chain, (size_t)e->height, &local) && local) {
uint8_t localHash[32];
uint8_t orphanHash[32];
Block_CalculateHash(local, localHash);
Block_CalculateHash(e->block, orphanHash);
Block_Destroy(local);
if (memcmp(localHash, orphanHash, 32) != 0) {
size_t adopted = OrphanPool_TryAdoptBranch(chain, e->height);
if (adopted > 0) {
attached += adopted;
madeProgress = true;
n = DynArr_size(g_orphans);
i = (size_t)-1;
break;
}
}
} else if (local) {
Block_Destroy(local);
}
}
// Verify that the parent's hash matches the orphan's prevHash before attaching.
bool parentMatches = false;
if (e->height == 0) {
parentMatches = (Chain_Size(chain) == 0);
} else {
block_t* parent = NULL;
if (Chain_GetBlockCopy(chain, (size_t)parentIndex, &parent) && parent) {
uint8_t parentHash[32];
Block_CalculateHash(parent, parentHash);
parentMatches = (memcmp(parentHash, e->block->header.prevHash, 32) == 0);
Block_Destroy(parent);
} else {
parentMatches = false;
}
}
if (!parentMatches) {
// Parent exists but does not match this orphan's prevHash.
size_t adopted = OrphanPool_TryAdoptBranch(chain, e->height);
if (adopted > 0) {
attached += adopted;
madeProgress = true;
n = DynArr_size(g_orphans);
i = (size_t)-1;
break;
}
continue;
}
// Try to add to chain
if (Chain_AddBlock(chain, e->block)) {
attached++;
@@ -78,13 +197,8 @@ size_t OrphanPool_AttemptAttach(blockchain_t* chain) {
i = (size_t)-1; // reset outer loop
break;
} else {
// Chain_AddBlock rejected it (maybe invalid). Drop it.
Block_Destroy(e->block);
DynArr_remove(g_orphans, i);
n = DynArr_size(g_orphans);
i = (size_t)-1;
madeProgress = true;
break;
// Keep the orphan around; rejection may be temporary while the local tip is being reorged.
continue;
}
}
}

View File

@@ -9,6 +9,8 @@
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/select.h>
static void* TcpClient_ThreadProc(void* arg) {
tcp_client_t* client = (tcp_client_t*)arg;
@@ -95,11 +97,52 @@ int TcpClient_Connect(
return -1;
}
if (connect(sockFd, (struct sockaddr*)&peerAddr, sizeof(peerAddr)) < 0) {
close(sockFd);
return -1;
// Use non-blocking connect with a timeout to avoid long blocking in the CLI.
int flags = fcntl(sockFd, F_GETFL, 0);
if (flags == -1) flags = 0;
fcntl(sockFd, F_SETFL, flags | O_NONBLOCK);
int rc = connect(sockFd, (struct sockaddr*)&peerAddr, sizeof(peerAddr));
if (rc < 0) {
if (errno != EINPROGRESS) {
close(sockFd);
return -1;
}
// Wait up to 5 seconds for the socket to become writable (connected)
struct timeval tv;
tv.tv_sec = 5;
tv.tv_usec = 0;
fd_set wfds;
FD_ZERO(&wfds);
FD_SET(sockFd, &wfds);
int sel = select(sockFd + 1, NULL, &wfds, NULL, &tv);
if (sel <= 0) {
// timeout or error
if (sel == 0) {
errno = ETIMEDOUT;
}
close(sockFd);
return -1;
}
// Check for socket error
int so_error = 0;
socklen_t len = sizeof(so_error);
if (getsockopt(sockFd, SOL_SOCKET, SO_ERROR, &so_error, &len) < 0) {
close(sockFd);
return -1;
}
if (so_error != 0) {
errno = so_error;
close(sockFd);
return -1;
}
}
// Restore blocking mode
fcntl(sockFd, F_SETFL, flags & ~O_NONBLOCK);
tcp_connection_t* conn = (tcp_connection_t*)malloc(sizeof(*conn));
if (!conn) {
close(sockFd);

View File

@@ -214,10 +214,26 @@ int TcpConnection_SendFramed(tcp_connection_t* conn, const void* payload, size_t
pthread_mutex_lock(&conn->sendLock);
#ifdef USE_IPV6
int sock;
if (conn->sockFd6 >= 0) {
// IPv6 is available, attempt to send on it. If it fails, we'll fall back to IPv4 if available.
sock = conn->sockFd6;
} else {
// IPv4 fallback
sock = conn->sockFd;
}
int rc = TcpConnection_SendRaw(sock, &beLen, sizeof(beLen));
if (rc == 0 && payloadLen > 0) {
rc = TcpConnection_SendRaw(sock, payload, payloadLen);
}
#else
int rc = TcpConnection_SendRaw(conn->sockFd, &beLen, sizeof(beLen));
if (rc == 0 && payloadLen > 0) {
rc = TcpConnection_SendRaw(conn->sockFd, payload, payloadLen);
}
#endif
pthread_mutex_unlock(&conn->sendLock);
@@ -235,6 +251,11 @@ void TcpConnection_RequestClose(tcp_connection_t* conn) {
if (conn->sockFd >= 0) {
shutdown(conn->sockFd, SHUT_RDWR);
}
#ifdef USE_IPV6
if (conn->sockFd6 >= 0) {
shutdown(conn->sockFd6, SHUT_RDWR);
}
#endif
}
pthread_mutex_unlock(&conn->stateLock);
}

View File

@@ -172,6 +172,9 @@ tcp_server_t* TcpServer_Create() {
svr->isRunning = 0;
svr->maxClients = 0;
svr->clientsArrPtr = NULL;
#ifdef USE_IPV6
svr->sockFd6 = -1;
#endif
if (pthread_mutex_init(&svr->clientsMutex, NULL) != 0) {
free(svr);
@@ -217,6 +220,28 @@ void TcpServer_Init(tcp_server_t* ptr, unsigned short port, const char* addr) {
close(ptr->sockFd);
ptr->sockFd = -1;
}
#ifdef USE_IPV6
// IPv6 support
ptr->sockFd6 = socket(AF_INET6, SOCK_STREAM, 0);
if (ptr->sockFd6 >= 0) {
ptr->opt6 = 1;
setsockopt(ptr->sockFd6, SOL_SOCKET, SO_REUSEADDR, &ptr->opt6, sizeof(int));
memset(&ptr->addr6, 0, sizeof(ptr->addr6));
ptr->addr6.sin6_family = AF_INET6;
ptr->addr6.sin6_port = htons(port);
inet_pton(AF_INET6, addr, &ptr->addr6.sin6_addr);
if (bind(ptr->sockFd6, (struct sockaddr*)&ptr->addr6, sizeof(ptr->addr6)) < 0) {
close(ptr->sockFd6);
ptr->sockFd6 = -1;
}
} else {
ptr->sockFd6 = -1; // IPv6 is optional, so if it isn't available, we just set it to -1
}
#else
// Safety for my future "I forgot the ifdef guard" self
ptr->sockFd6 = -1; // IPv6 not supported in this build
#endif
}
void TcpServer_Start(tcp_server_t* ptr, int maxcons) {
@@ -228,6 +253,15 @@ void TcpServer_Start(tcp_server_t* ptr, int maxcons) {
return;
}
#ifdef USE_IPV6
if (ptr->sockFd6 >= 0) {
if (listen(ptr->sockFd6, maxcons) < 0) {
close(ptr->sockFd6);
ptr->sockFd6 = -1;
}
}
#endif
pthread_mutex_lock(&ptr->clientsMutex);
ptr->maxClients = (size_t)maxcons;
@@ -268,6 +302,14 @@ void TcpServer_Stop(tcp_server_t* ptr) {
ptr->sockFd = -1;
}
#ifdef USE_IPV6
if (ptr->sockFd6 >= 0) {
shutdown(ptr->sockFd6, SHUT_RDWR);
close(ptr->sockFd6);
ptr->sockFd6 = -1;
}
#endif
if (ptr->svrThread != 0 && !pthread_equal(ptr->svrThread, pthread_self())) {
pthread_join(ptr->svrThread, NULL);
}
@@ -339,6 +381,12 @@ void TcpServer_KillClient(tcp_server_t* ptr, tcp_connection_t* cli) {
so_linger.l_linger = 0;
setsockopt(cli->sockFd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger));
#ifdef USE_IPV6
if (cli->sockFd6 >= 0) {
setsockopt(cli->sockFd6, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger));
}
#endif
TcpServer_Disconnect(ptr, cli);
}

View File

@@ -1,14 +1,21 @@
#include <txmempool.h>
#include <pthread.h>
static pthread_mutex_t g_txMempoolLock;
static bool g_txMempoolLockInitialized = false;
khash_t(tx_mempool_map_m)* txMempool = NULL;
void TxMempool_Init() {
txMempool = kh_init(tx_mempool_map_m);
pthread_mutex_init(&g_txMempoolLock, NULL);
g_txMempoolLockInitialized = true;
}
int TxMempool_Insert(signed_transaction_t tx) {
if (!txMempool) { return -1; }
pthread_mutex_lock(&g_txMempoolLock);
uint8_t txHash[32];
Transaction_CalculateHash(&tx, txHash);
@@ -18,17 +25,21 @@ int TxMempool_Insert(signed_transaction_t tx) {
int ret;
khiter_t k = kh_put(tx_mempool_map_m, txMempool, key, &ret);
if (k == kh_end(txMempool)) {
pthread_mutex_unlock(&g_txMempoolLock);
return -1;
}
kh_value(txMempool, k) = tx;
pthread_mutex_unlock(&g_txMempoolLock);
return ret;
}
bool TxMempool_Lookup(uint8_t* txHash, signed_transaction_t* out) {
if (!txMempool || !txHash || !out) { return false; }
pthread_mutex_lock(&g_txMempoolLock);
key32_t key;
memcpy(key.bytes, txHash, 32);
@@ -36,15 +47,65 @@ bool TxMempool_Lookup(uint8_t* txHash, signed_transaction_t* out) {
if (k != kh_end(txMempool)) {
signed_transaction_t tx = kh_value(txMempool, k);
memcpy(out, &tx, sizeof(signed_transaction_t));
pthread_mutex_unlock(&g_txMempoolLock);
return true;
}
pthread_mutex_unlock(&g_txMempoolLock);
return false;
}
bool TxMempool_Snapshot(signed_transaction_t** outTxs, size_t* outCount) {
if (!outTxs || !outCount) {
return false;
}
*outTxs = NULL;
*outCount = 0;
if (!txMempool) {
return true;
}
pthread_mutex_lock(&g_txMempoolLock);
size_t count = 0;
khiter_t k;
for (k = kh_begin(txMempool); k != kh_end(txMempool); ++k) {
if (kh_exist(txMempool, k)) {
++count;
}
}
if (count == 0) {
pthread_mutex_unlock(&g_txMempoolLock);
return true;
}
signed_transaction_t* snapshot = (signed_transaction_t*)malloc(count * sizeof(signed_transaction_t));
if (!snapshot) {
pthread_mutex_unlock(&g_txMempoolLock);
return false;
}
size_t index = 0;
for (k = kh_begin(txMempool); k != kh_end(txMempool); ++k) {
if (kh_exist(txMempool, k)) {
snapshot[index++] = kh_value(txMempool, k);
}
}
pthread_mutex_unlock(&g_txMempoolLock);
*outTxs = snapshot;
*outCount = count;
return true;
}
void TxMempool_Print() {
if (!txMempool) { return; }
pthread_mutex_lock(&g_txMempoolLock);
khiter_t k;
for (k = kh_begin(txMempool); k != kh_end(txMempool); ++k) {
if (kh_exist(txMempool, k)) {
@@ -62,10 +123,37 @@ void TxMempool_Print() {
(unsigned long long)tx.transaction.fee);
}
}
pthread_mutex_unlock(&g_txMempoolLock);
}
void TxMempool_Destroy() {
if (txMempool) {
pthread_mutex_lock(&g_txMempoolLock);
kh_destroy(tx_mempool_map_m, txMempool);
txMempool = NULL;
pthread_mutex_unlock(&g_txMempoolLock);
}
if (g_txMempoolLockInitialized) {
pthread_mutex_destroy(&g_txMempoolLock);
g_txMempoolLockInitialized = false;
}
}
bool TxMempool_Remove(const uint8_t* txHash) {
if (!txMempool || !txHash) { return false; }
pthread_mutex_lock(&g_txMempoolLock);
key32_t key;
memcpy(key.bytes, txHash, 32);
khiter_t k = kh_get(tx_mempool_map_m, txMempool, key);
if (k == kh_end(txMempool)) {
pthread_mutex_unlock(&g_txMempoolLock);
return false;
}
kh_del(tx_mempool_map_m, txMempool, k);
pthread_mutex_unlock(&g_txMempoolLock);
return true;
}