Compare commits

..

27 Commits

Author SHA1 Message Date
da50b4e8c1 Start IPv6 - Lord help me 2026-06-03 11:20:19 +02:00
00bd711501 remove proof 2026-05-29 14:31:24 +02:00
17ef3b74fd remove from mempool on mine; TEMPORARY log math proof of fee inclusion in coinbase tx 2026-05-29 14:28:27 +02:00
c1914dc3e7 add optional fee argument to send command 2026-05-29 14:00:58 +02:00
39293029c5 recompute state bug fixed 2026-05-29 13:51:34 +02:00
763aeb648f add fee-aware mining, coinbase validation, and reorg-safe orphan handling
Mining: blocks now include mempool txs, select spendable txs by fee, and pay coinbase as base reward + fees in main.c.
 - Consensus: block validation now enforces coinbase accounting and rejects invalid coinbase placement, including coinbase on amount2, in block.c and transaction.c.
 - Chain state: rollback now rebuilds currentSupply/currentReward, and block addition preflights spendability before mutating balances in chain.c.
 - Orphans/reorgs: orphan retry is safer, rollback-triggered sync reattaches orphans immediately, and transient orphan failures no longer drop blocks in orphan_pool.c and main.c.
 - Networking/mempool: node lifecycle now initializes the mempool, broadcasts can exclude one peer, and mempool snapshotting supports mining selection in net_node.c and txmempool.c.
 - Ledger simulation: added non-mutating spendable-transaction selection for block assembly in balance_sheet.c.
2026-05-29 13:44:15 +02:00
41a154a9fd fix pushing to txmempool 2026-05-29 12:49:51 +02:00
91d7bfa4e7 start adding tx system - test broadcast 2026-05-29 12:45:22 +02:00
4cfe85f6f2 orphans and wallet files 2026-05-28 13:01:23 +02:00
4f10f013f6 orphans reorg test 2026-05-15 22:38:18 +02:00
f94655a0ed segfaults and orphans 2026-05-15 22:32:34 +02:00
58ff36b218 cli fix 2026-05-15 19:54:48 +02:00
8f3559b3f6 segfault fix 2026-05-15 19:46:57 +02:00
971a4d9e49 auto resync on penalties 2026-05-15 19:33:57 +02:00
f9c94876d9 reorg bugs 2026-05-15 19:30:58 +02:00
9405801f6b con timeout 2026-05-15 19:28:21 +02:00
0fb2615d4c sync errors 2026-05-15 19:01:51 +02:00
55ca03f4ff orphan test 2026-05-15 18:49:49 +02:00
ce27dafaba todo update, forward block broadcasts, optional echo connect 2026-05-15 18:37:50 +02:00
4201b5bcc6 linux errors 2026-05-15 16:21:33 +02:00
46ff16fc3e linux errors 2026-05-15 16:18:06 +02:00
6dd14ce087 linux errors 2026-05-15 16:12:09 +02:00
644695c018 linux errors 2026-05-15 16:07:16 +02:00
5e520d57f6 thread blocking DAG alloc fix 2026-05-15 13:17:24 +02:00
3337ac85ab reorgs, fetch batching (parallel fetch), orphans 2026-05-15 13:01:27 +02:00
ad339dc696 sync 2026-05-15 12:23:07 +02:00
361ac73e45 global externs refactor, some tcp methods 2026-05-14 17:36:40 +02:00
29 changed files with 2859 additions and 254 deletions

View File

@@ -160,21 +160,20 @@ if(SKALACOIN_ENABLE_AUTOLYKOS2_REF)
) )
endif() endif()
if(TARGET CURL::libcurl) if(TARGET CURL::libcurl)
set(_AUTOLYKOS2_CURL_LIB CURL::libcurl) target_link_libraries(autolykos2_ref PRIVATE
${CMAKE_THREAD_LIBS_INIT}
OpenSSL::SSL
OpenSSL::Crypto
CURL::libcurl
)
elseif(DEFINED CURL_LIBRARIES AND CURL_LIBRARIES) elseif(DEFINED CURL_LIBRARIES AND CURL_LIBRARIES)
set(_AUTOLYKOS2_CURL_LIB ${CURL_LIBRARIES}) target_link_libraries(autolykos2_ref PRIVATE
${CMAKE_THREAD_LIBS_INIT}
OpenSSL::SSL
OpenSSL::Crypto
${CURL_LIBRARIES}
)
else() else()
set(_AUTOLYKOS2_CURL_LIB "")
endif()
target_link_libraries(autolykos2_ref PRIVATE
${CMAKE_THREAD_LIBS_INIT}
OpenSSL::SSL
OpenSSL::Crypto
$<$<BOOL:$_AUTOLYKOS2_CURL_LIB>:$_AUTOLYKOS2_CURL_LIB>
)
if(NOT _AUTOLYKOS2_CURL_LIB)
message(FATAL_ERROR "autolykos2_ref requires libcurl (curl/curl.h). Install libcurl devel package or allow FetchContent to build it.") message(FATAL_ERROR "autolykos2_ref requires libcurl (curl/curl.h). Install libcurl devel package or allow FetchContent to build it.")
endif() endif()
set(SKALACOIN_AUTOLYKOS2_REF_AVAILABLE ON) set(SKALACOIN_AUTOLYKOS2_REF_AVAILABLE ON)
@@ -230,5 +229,6 @@ target_compile_options(node PRIVATE
target_compile_definitions(node PRIVATE target_compile_definitions(node PRIVATE
CHAIN_DATA_DIR="${CMAKE_BINARY_DIR}/chain_data" CHAIN_DATA_DIR="${CMAKE_BINARY_DIR}/chain_data"
$<$<BOOL:${SKALACOIN_AUTOLYKOS2_REF_AVAILABLE}>:SKALACOIN_AUTOLYKOS2_REF_AVAILABLE> $<$<BOOL:${SKALACOIN_AUTOLYKOS2_REF_AVAILABLE}>:SKALACOIN_AUTOLYKOS2_REF_AVAILABLE>
$<$<BOOL:1>:_POSIX_C_SOURCE=200809L>
) )
set_target_properties(node PROPERTIES OUTPUT_NAME "skalacoin_node") set_target_properties(node PROPERTIES OUTPUT_NAME "skalacoin_node")

View File

@@ -1,6 +1,4 @@
TODO: TODO:
Implement Horizen's "Reorg Penalty" system to make it harder for the young chain to be attacked by a powerful miner.
Make transactions private. A bit more work, but it's a challenge worth taking on. Make transactions private. A bit more work, but it's a challenge worth taking on.
I want to make an "optional privacy" system, where the TX can be public or private. Of course private TXs need more bytes, so the fees (although low) will be higher for them. I want to make an "optional privacy" system, where the TX can be public or private. Of course private TXs need more bytes, so the fees (although low) will be higher for them.
I need to figure out a way to make the privacy work without a UTXO system, and instead, with a "Balance Sheet" approach. I need to figure out a way to make the privacy work without a UTXO system, and instead, with a "Balance Sheet" approach.
@@ -10,6 +8,23 @@ Maybe move the node system to an async event loop instead of spawning threads.
A potential race could occur if the P2P node receives a new block, or flushes a new block to disk while the user is running a full verify. A potential race could occur if the P2P node receives a new block, or flushes a new block to disk while the user is running a full verify.
Maybe think about how block broadcasting works. Instead of unsolicited broadcasting, maybe only advertise a new height and have peers request the block if they want it. This would reduce bandwidth usage, but it also means that blocks won't propagate as fast, which could lead to more orphaned blocks. It's a tradeoff.
Check if Block FullVerify is actually verifying fully (not missing any conditions).
A loophole in the reorg penalty system could potentially exist where someone broadcasts blocks one-at-a-time. Determine a solution to this.
IPv6 support for the P2P node. Come on guys, it's 2026. RFC 2460 was in 1998. It's about time.
Like if someone is behind NAT, fine, workable. CGNAT? Lmao good luck.
TO TEST:
Implement Horizen's "Reorg Penalty" system to make it harder for the young chain to be attacked by a powerful miner.
NOTE:
Because tx sizes are currently fixed, mining can use raw fee ordering for now. If tx sizes ever become dynamic, revisit selection to consider fee/byte instead.
Mempool snapshotting for mining should hold the lock only long enough to copy pending txs, but if the mempool grows very large that copy may still be non-trivial.
DONE: DONE:
I want to move away from the Monero emission. I want to do something a bit radical for cryptocurrency, but I feel like it's necessary to make it more like money: I want to move away from the Monero emission. I want to do something a bit radical for cryptocurrency, but I feel like it's necessary to make it more like money:
a constant inflation rate of 1.5% per year. It's lower than fiat (USD is ~2.8% per year), and it additionally doesn't fluctuate during crisis. It's constant. a constant inflation rate of 1.5% per year. It's lower than fiat (USD is ~2.8% per year), and it additionally doesn't fluctuate during crisis. It's constant.

View File

@@ -8,6 +8,7 @@
#include <stdio.h> #include <stdio.h>
#include <khash/khash.h> #include <khash/khash.h>
#include <crypto/crypto.h> #include <crypto/crypto.h>
#include <block/transaction.h>
#include <string.h> #include <string.h>
#include <utils.h> #include <utils.h>
#include <uint256.h> #include <uint256.h>
@@ -29,4 +30,12 @@ bool BalanceSheet_LoadFromFile(const char* inPath);
void BalanceSheet_Print(); void BalanceSheet_Print();
void BalanceSheet_Destroy(); void BalanceSheet_Destroy();
bool BalanceSheet_SelectSpendableTransactions(
const signed_transaction_t* candidates,
size_t candidateCount,
signed_transaction_t** outAccepted,
size_t* outAcceptedCount,
uint64_t* outTotalFees
);
#endif #endif

View File

@@ -36,8 +36,13 @@ void Block_AddTransaction(block_t* block, signed_transaction_t* tx);
void Block_RemoveTransaction(block_t* block, uint8_t* txHash); void Block_RemoveTransaction(block_t* block, uint8_t* txHash);
bool Block_HasValidProofOfWork(const block_t* block); bool Block_HasValidProofOfWork(const block_t* block);
bool Block_AllTransactionsValid(const block_t* block); bool Block_AllTransactionsValid(const block_t* block);
bool Block_ValidateCoinbaseAndFees(const block_t* block, uint64_t expectedCoinbaseAmount, uint64_t* outTotalFees);
bool Block_IsFullyValid(const block_t* block);
void Block_ShutdownPowContext(void); void Block_ShutdownPowContext(void);
void Block_Destroy(block_t* block); void Block_Destroy(block_t* block);
void Block_Print(const block_t* block); void Block_Print(const block_t* block);
void Block_ShortPrint(const block_t* block);
// Deep-copy a block (allocates a new `block_t*`). Caller must call `Block_Destroy`.
block_t* Block_Copy(const block_t* src);
#endif #endif

View File

@@ -24,6 +24,17 @@ size_t Chain_Size(blockchain_t* chain);
bool Chain_IsValid(blockchain_t* chain); bool Chain_IsValid(blockchain_t* chain);
void Chain_Wipe(blockchain_t* chain); void Chain_Wipe(blockchain_t* chain);
// Roll back the chain to `height` (exclusive): after this call, Chain_Size(chain) == height
// Returns true on success.
bool Chain_RollbackToHeight(blockchain_t* chain, size_t height);
// Recompute `currentSupply` and `currentReward` from the in-memory chain blocks.
// Returns true on success and updates runtime state globals.
bool Chain_RecomputeRuntimeState(blockchain_t* chain);
// Retrieve a deep copy of the block at `index`. Caller must free with `Block_Destroy`.
bool Chain_GetBlockCopy(blockchain_t* chain, size_t index, block_t** outCopy);
// I/O // I/O
bool Chain_SaveToFile(blockchain_t* chain, const char* dirpath, uint256_t currentSupply, uint64_t currentReward); bool Chain_SaveToFile(blockchain_t* chain, const char* dirpath, uint256_t currentSupply, uint64_t currentReward);
bool Chain_LoadFromFile(blockchain_t* chain, const char* dirpath, uint256_t* outCurrentSupply, uint32_t* outDifficultyTarget, uint64_t* outCurrentReward, uint8_t* outLastSavedHash, bool loadTransactions); bool Chain_LoadFromFile(blockchain_t* chain, const char* dirpath, uint256_t* outCurrentSupply, uint32_t* outDifficultyTarget, uint64_t* outCurrentReward, uint8_t* outLastSavedHash, bool loadTransactions);

View File

@@ -7,11 +7,12 @@
#include <block/chain.h> #include <block/chain.h>
#include <block/block.h> #include <block/block.h>
extern uint64_t currentBlockHeight; #include <runtime_state.h>
// Nets // Nets
#define MAX_CONS 32 // Some baseline for now #define MAX_CONS 32 // Some baseline for now
#define LISTEN_PORT 9393 #define LISTEN_PORT 9393
#define ECHO_PEERS 1 // If non-zero, automatically attempt to connect back to any inbound peers (helps form bidirectional peering)
#define TCP_THREAD_STACK_SIZE (512 * 1024) // 512 KB. We could get away with like 128 KB since it's mostly just recv bufs, but it's good having some breathing room. #define TCP_THREAD_STACK_SIZE (512 * 1024) // 512 KB. We could get away with like 128 KB since it's mostly just recv bufs, but it's good having some breathing room.
// This is also for client threads. The server has the default (~8 MB on POSIX). // This is also for client threads. The server has the default (~8 MB on POSIX).
@@ -23,6 +24,22 @@ extern uint64_t currentBlockHeight;
//#define INITIAL_DIFFICULTY 0x1f0c1422 // Default compact target used by Autolykos2 PoW (This is ridiculously low) //#define INITIAL_DIFFICULTY 0x1f0c1422 // Default compact target used by Autolykos2 PoW (This is ridiculously low)
#define INITIAL_DIFFICULTY 0x1f1b7c51 // This takes 90s on my machine with a single thread, good for testing #define INITIAL_DIFFICULTY 0x1f1b7c51 // This takes 90s on my machine with a single thread, good for testing
// Sync / Reorg tuning constants
// Timeouts and retry/backoff behavior for block fetches during sync (milliseconds)
static const uint64_t SYNC_REQUEST_TIMEOUT_MS = 5000ULL; // 5s
static const int MAX_SYNC_RETRIES = 4; // retry attempts per block fetch
static const uint64_t SYNC_BACKOFF_BASE_MS = 200ULL; // base backoff in ms (exponential)
// Parallelism
static const int MAX_PARALLEL_FETCHES = 8; // concurrent block fetches during windowed sync
// Heuristic: if peer is this many blocks ahead, treat as initial sync
static const uint64_t INITIAL_SYNC_HEIGHT_DIFF = 50ULL;
// Reorg penalty configuration (used to penalize peers reporting higher heights but with delayed work)
static const uint64_t REORG_PENALTY_GRACE_BLOCKS = 3ULL; // allow small reorgs without penalty
static const double REORG_PENALTY_FACTOR = 1.0; // base scaling factor (theta)
static const double REORG_PENALTY_EXPONENT = 2.0; // exponent p in penalty ~ B^p
static const double REORG_PENALTY_REF_BLOCK_TIME = 150.0; // reference block time in seconds used by original scheme
// Reward schedule acceleration: 1 means normal-speed progression. // Reward schedule acceleration: 1 means normal-speed progression.
#define EMISSION_ACCELERATION_FACTOR 1ULL #define EMISSION_ACCELERATION_FACTOR 1ULL
@@ -56,11 +73,8 @@ extern uint64_t currentBlockHeight;
static const uint64_t M_CAP = 18446744073709551615ULL; // Max uint64 static const uint64_t M_CAP = 18446744073709551615ULL; // Max uint64
static const uint64_t TAIL_EMISSION = 750000000000ULL; // 0.75 coins per block floor static const uint64_t TAIL_EMISSION = 750000000000ULL; // 0.75 coins per block floor
static uint64_t currentReward = 750000000000ULL; // Epoch reward cache for phase 3
// No max supply. Instead of halving, it'll follow a more gradual, Monero-like emission curve. // No max supply. Instead of halving, it'll follow a more gradual, Monero-like emission curve.
static uint256_t currentSupply = {{0, 0, 0, 0}}; // Global variable to track total supply; updated with each block mined
// Phase 3: update once per effective epoch and keep a fixed per-block reward for that epoch. // Phase 3: update once per effective epoch and keep a fixed per-block reward for that epoch.
static inline uint64_t GetInflationRateReward(uint256_t currentSupply, blockchain_t* chain) { static inline uint64_t GetInflationRateReward(uint256_t currentSupply, blockchain_t* chain) {
if (!chain || !chain->blocks) { return 0x00; } // Invalid if (!chain || !chain->blocks) { return 0x00; } // Invalid
@@ -169,10 +183,16 @@ static inline size_t CalculateTargetDAGSize(blockchain_t* chain) {
} }
// Get the height - EPOCH_LENGTH block and the last block; // Get the height - EPOCH_LENGTH block and the last block;
block_t* lastBlock = Chain_GetBlock(chain, Chain_Size(chain) - 1); block_t* lastBlock = NULL;
block_t* epochStartBlock = Chain_GetBlock(chain, (size_t)(Chain_Size(chain) - 1 - EPOCH_LENGTH)); block_t* epochStartBlock = NULL;
if (!lastBlock || !epochStartBlock) { if (!Chain_GetBlockCopy(chain, Chain_Size(chain) - 1, &lastBlock) || !lastBlock) {
return 0; // Invalid if (lastBlock) Block_Destroy(lastBlock);
return 0;
}
if (!Chain_GetBlockCopy(chain, (size_t)(Chain_Size(chain) - 1 - EPOCH_LENGTH), &epochStartBlock) || !epochStartBlock) {
Block_Destroy(lastBlock);
if (epochStartBlock) Block_Destroy(epochStartBlock);
return 0;
} }
int64_t difficultyDelta = (int64_t)epochStartBlock->header.difficultyTarget - (int64_t)lastBlock->header.difficultyTarget; int64_t difficultyDelta = (int64_t)epochStartBlock->header.difficultyTarget - (int64_t)lastBlock->header.difficultyTarget;
@@ -193,10 +213,15 @@ static inline size_t CalculateTargetDAGSize(blockchain_t* chain) {
int64_t targetSize = (int64_t)DAG_BASE_SIZE + growth; int64_t targetSize = (int64_t)DAG_BASE_SIZE + growth;
if (targetSize <= 0) { if (targetSize <= 0) {
Block_Destroy(lastBlock);
Block_Destroy(epochStartBlock);
return 0; return 0;
} }
return (size_t)targetSize; size_t out = (size_t)targetSize;
Block_Destroy(lastBlock);
Block_Destroy(epochStartBlock);
return out;
} }
static inline void GetNextDAGSeed(blockchain_t* chain, uint8_t outSeed[32]) { static inline void GetNextDAGSeed(blockchain_t* chain, uint8_t outSeed[32]) {
@@ -208,13 +233,15 @@ static inline void GetNextDAGSeed(blockchain_t* chain, uint8_t outSeed[32]) {
return; return;
} }
block_t* prevBlock = Chain_GetBlock(chain, Chain_Size(chain) - 1); block_t* prevBlock = NULL;
if (!prevBlock) { if (!Chain_GetBlockCopy(chain, Chain_Size(chain) - 1, &prevBlock) || !prevBlock) {
memset(outSeed, 0x00, 32); // Fallback to zeroes if we can't get the previous block for some reason; The caller should treat this as an error if height >= EPOCH_LENGTH memset(outSeed, 0x00, 32); // Fallback to zeroes if we can't get the previous block for some reason; The caller should treat this as an error if height >= EPOCH_LENGTH
if (prevBlock) Block_Destroy(prevBlock);
return; return;
} }
Block_CalculateHash(prevBlock, outSeed); Block_CalculateHash(prevBlock, outSeed);
Block_Destroy(prevBlock);
} }
#endif #endif

View File

@@ -0,0 +1,10 @@
#ifndef FETCH_SCHEDULER_H
#define FETCH_SCHEDULER_H
#include <stdint.h>
// Compute penalty in blocks for a delayed/heavy reorg reported by a peer.
// Returns the number of penalty blocks to subtract from the peer's advertised work.
uint64_t FetchScheduler_ComputeReorgPenaltyBlocks(uint64_t delayBlocks);
#endif

View File

@@ -14,15 +14,32 @@
#include <stddef.h> #include <stddef.h>
#include <dynarr.h> #include <dynarr.h>
#include <dynset.h>
#include <pthread.h>
#include <block/block.h>
#include <block/chain.h>
#include <block/transaction.h>
typedef struct { typedef struct {
tcp_server_t* server; tcp_server_t* server;
tcp_client_t outboundClients[MAX_CONS]; tcp_client_t outboundClients[MAX_CONS];
size_t outboundCount; size_t outboundCount;
// Dedup cache for recently seen block hashes (canonical 32-byte hash)
DynSet* seenBlocks;
// Protects seenBlocks
pthread_mutex_t seenLock;
// Protects outboundClients snapshots and peerBlockHeight writes
pthread_mutex_t outboundLock;
void (*on_connect)(tcp_connection_t* conn, void* user); void (*on_connect)(tcp_connection_t* conn, void* user);
void (*on_data)(tcp_connection_t* conn, const unsigned char* data, size_t len, void* user); void (*on_data)(tcp_connection_t* conn, const unsigned char* data, size_t len, void* user);
void (*on_disconnect)(tcp_connection_t* conn, void* user); void (*on_disconnect)(tcp_connection_t* conn, void* user);
void* callbackUser; void* callbackUser;
// Maintenance thread for periodic tasks (orphan attach, pruning, metrics)
pthread_t maintenanceThread;
volatile int maintenanceRunning;
int maintenanceIntervalMs;
} net_node_t; } net_node_t;
net_node_t* Node_Create(); net_node_t* Node_Create();
@@ -40,6 +57,11 @@ int Node_ConnectPeer(net_node_t* node, const char* ip, unsigned short port);
int Node_ConnectStartupPeers(net_node_t* node, const char** ips, const unsigned short* ports, size_t peersCount); int Node_ConnectStartupPeers(net_node_t* node, const char** ips, const unsigned short* ports, size_t peersCount);
int Node_SendPacket(net_node_t* node, tcp_connection_t* conn, packet_type_t packetType, const void* payload, size_t payloadLen); int Node_SendPacket(net_node_t* node, tcp_connection_t* conn, packet_type_t packetType, const void* payload, size_t payloadLen);
int Node_BroadcastTransaction(net_node_t* node, signed_transaction_t* tx, tcp_connection_t* excludeNode);
// Helpers for outbound peer selection and block broadcast
int Node_GetBestOutboundPeer(net_node_t* node, tcp_connection_t** outConn, uint64_t* outHeight);
void Node_BroadcastChainRange(net_node_t* node, size_t startHeightInclusive, tcp_connection_t* sourceConn);
// Callback logic // Callback logic
void Node_Server_OnConnect(tcp_connection_t* client); void Node_Server_OnConnect(tcp_connection_t* client);

View File

@@ -0,0 +1,20 @@
#ifndef ORPHAN_POOL_H
#define ORPHAN_POOL_H
#include <stdint.h>
#include <block/block.h>
#include <block/chain.h>
// Initialize/destroy the global orphan pool
void OrphanPool_Init(void);
void OrphanPool_Destroy(void);
// Insert an orphan block into the pool. Ownership of `block` is transferred to the pool.
// `height` is the block number from the header.
void OrphanPool_Insert(block_t* block, uint64_t height);
// Attempt to attach any orphans whose parents now exist in `chain`.
// Returns the number of blocks successfully attached.
size_t OrphanPool_AttemptAttach(blockchain_t* chain);
#endif

25
include/runtime_state.h Normal file
View File

@@ -0,0 +1,25 @@
#ifndef RUNTIME_STATE_H
#define RUNTIME_STATE_H
#include <stdint.h>
#include <uint256.h>
#include <block/chain.h>
#include <pthread.h>
extern uint64_t currentBlockHeight;
extern blockchain_t* currentChain;
extern uint256_t currentSupply;
extern uint64_t currentReward;
extern uint32_t difficultyTarget;
extern const char* chainDataDir;
extern unsigned short listenPort;
extern bool echoPeersEnabled;
extern bool forceOrphanReorgEnabled;
// Global synchronization primitives for runtime state
extern pthread_rwlock_t chainLock; // protects chain structure and related mutations
extern pthread_mutex_t balanceSheetLock; // protects balance sheet map
#endif

View File

@@ -3,6 +3,7 @@
#include <arpa/inet.h> #include <arpa/inet.h>
#include <stddef.h> #include <stddef.h>
#include <stdint.h>
#include <constants.h> #include <constants.h>
#include <tcpd/tcpconnection.h> #include <tcpd/tcpconnection.h>
@@ -13,6 +14,7 @@ typedef struct {
void (*on_data)(tcp_connection_t* conn); void (*on_data)(tcp_connection_t* conn);
void (*on_disconnect)(tcp_connection_t* conn); void (*on_disconnect)(tcp_connection_t* conn);
void* owner; void* owner;
uint64_t peerBlockHeight;
} tcp_client_t; } tcp_client_t;
int TcpClient_Init(tcp_client_t* client); int TcpClient_Init(tcp_client_t* client);

View File

@@ -19,8 +19,17 @@ typedef enum {
typedef struct tcp_connection_t tcp_connection_t; typedef struct tcp_connection_t tcp_connection_t;
struct tcp_connection_t { struct tcp_connection_t {
// TODO: We should make it so only ONE of this needs to be available.
// Because of my temporary "I just need something that works" horseshit that I'm about to write, you'll need IPv4 and IPv6 is optional.
// Note to self: Don't pull an IETF and some "NAT exists, we're fine" bullshit, because if we end up with our eqvivalent of Teredo or CGNAT, I'm gonna be fucking pissed.
// And no, the solution isn't "eh, just bind to 0.0.0.0 and ignore it", because if we do that, we'll inevitably end up with a host that only has IPv6 and then we'll be fucked.
// Honestly, I'm proud of whoever runs IPv6-only. Brave soul.
int sockFd; int sockFd;
struct sockaddr_in peerAddr; struct sockaddr_in peerAddr;
#ifdef USE_IPV6
int sockFd6; // For IPv6 support
struct sockaddr_in6 peerAddr6; // For IPv6 support
#endif
uint32_t connectionId; uint32_t connectionId;
tcp_connection_role_t role; tcp_connection_role_t role;
@@ -54,6 +63,7 @@ int TcpConnection_SetDataBuffer(tcp_connection_t* conn, const unsigned char* dat
void TcpConnection_ResetFramingState(tcp_connection_t* conn); void TcpConnection_ResetFramingState(tcp_connection_t* conn);
int TcpConnection_FeedFramedData(tcp_connection_t* conn, const unsigned char* input, size_t inputLen); int TcpConnection_FeedFramedData(tcp_connection_t* conn, const unsigned char* input, size_t inputLen);
// This just takes a socket ID, so it's independent from the v4/v6 stuff. It works for both.
int TcpConnection_SendRaw(int sockFd, const void* data, size_t len); int TcpConnection_SendRaw(int sockFd, const void* data, size_t len);
int TcpConnection_SendFramed(tcp_connection_t* conn, const void* payload, size_t payloadLen); int TcpConnection_SendFramed(tcp_connection_t* conn, const void* payload, size_t payloadLen);

View File

@@ -11,7 +11,12 @@
typedef struct { typedef struct {
int sockFd; int sockFd;
struct sockaddr_in addr; struct sockaddr_in addr;
#ifdef USE_IPV6
int sockFd6; // IPv6 support
struct sockaddr_in6 addr6; // IPv6 support
#endif
int opt; int opt;
int opt6; // IPv6 support
int isRunning; int isRunning;
void* owner; void* owner;

View File

@@ -13,7 +13,10 @@ void TxMempool_Init();
// Assumed that the transation was confirmed to be valid // Assumed that the transation was confirmed to be valid
int TxMempool_Insert(signed_transaction_t tx); int TxMempool_Insert(signed_transaction_t tx);
bool TxMempool_Lookup(uint8_t* txHash, signed_transaction_t* out); bool TxMempool_Lookup(uint8_t* txHash, signed_transaction_t* out);
bool TxMempool_Snapshot(signed_transaction_t** outTxs, size_t* outCount);
void TxMempool_Print(); void TxMempool_Print();
// Remove a transaction from the mempool by its hash. Returns true if removed.
bool TxMempool_Remove(const uint8_t* txHash);
void TxMempool_Destroy(); void TxMempool_Destroy();
#endif #endif

View File

@@ -36,6 +36,19 @@ static inline uint64_t get_current_time_ms(void) {
return (spec.tv_sec * 1000) + (spec.tv_nsec / 1000000); return (spec.tv_sec * 1000) + (spec.tv_nsec / 1000000);
} }
static inline void sleep_for_microseconds(uint64_t microseconds) {
struct timespec req;
req.tv_sec = (time_t)(microseconds / 1000000ULL);
req.tv_nsec = (long)((microseconds % 1000000ULL) * 1000ULL);
while (nanosleep(&req, &req) == -1) {
continue;
}
}
static inline void sleep_for_milliseconds(uint64_t milliseconds) {
sleep_for_microseconds(milliseconds * 1000ULL);
}
static inline void AddressToHexString(const uint8_t address[32], char out[65]) { static inline void AddressToHexString(const uint8_t address[32], char out[65]) {
if (!address || !out) { if (!address || !out) {
return; return;
@@ -154,7 +167,7 @@ static inline bool GenerateTestMinerIdentity(uint8_t privateKey[32], uint8_t com
return false; return false;
} }
static inline bool GenerateRandomTestAddress(uint8_t outAddress[32]) { static inline bool GenerateRandomTestAddress(uint8_t outAddress[32], uint8_t outPrivateKey[32], uint8_t outCompressedPubkey[33]) {
if (!outAddress) { if (!outAddress) {
return false; return false;
} }
@@ -187,11 +200,18 @@ static inline bool GenerateRandomTestAddress(uint8_t outAddress[32]) {
} }
AddressFromCompressedPubkey(compressedPubkey, outAddress); AddressFromCompressedPubkey(compressedPubkey, outAddress);
if (outPrivateKey) {
memcpy(outPrivateKey, privateKey, 32);
}
if (outCompressedPubkey) {
memcpy(outCompressedPubkey, compressedPubkey, 33);
}
secp256k1_context_destroy(ctx); secp256k1_context_destroy(ctx);
return true; return true;
} }
secp256k1_context_destroy(ctx); secp256k1_context_destroy(ctx);
return false; return false;
} }

0
myeasylog.log Normal file
View File

View File

@@ -254,7 +254,7 @@ static bool Autolykos2_HashCore(
DWORD ms = (DWORD)((autolykos2_sleepBetweenHashOperationsMicroseconds + 999) / 1000); DWORD ms = (DWORD)((autolykos2_sleepBetweenHashOperationsMicroseconds + 999) / 1000);
Sleep(ms); Sleep(ms);
#else #else
usleep((useconds_t)autolykos2_sleepBetweenHashOperationsMicroseconds); sleep_for_microseconds(autolykos2_sleepBetweenHashOperationsMicroseconds);
#endif #endif
} }

View File

@@ -1,32 +1,179 @@
#include <balance_sheet.h> #include <balance_sheet.h>
#include <pthread.h>
khash_t(balance_sheet_map_m)* sheetMap = NULL; khash_t(balance_sheet_map_m)* sheetMap = NULL;
static pthread_mutex_t g_sheetLock;
void BalanceSheet_Init() { static bool BalanceSheet_GetSimEntry(
sheetMap = kh_init(balance_sheet_map_m); khash_t(balance_sheet_map_m)* simMap,
const uint8_t address[32],
balance_sheet_entry_t* out
) {
if (!simMap || !address || !out) {
return false;
}
key32_t key;
memcpy(key.bytes, address, 32);
khiter_t k = kh_get(balance_sheet_map_m, simMap, key);
if (k != kh_end(simMap)) {
*out = kh_value(simMap, k);
return true;
}
if (BalanceSheet_Lookup((uint8_t*)address, out)) {
int ret = 0;
k = kh_put(balance_sheet_map_m, simMap, key, &ret);
if (k == kh_end(simMap)) {
return false;
}
kh_value(simMap, k) = *out;
return true;
}
memset(out, 0, sizeof(*out));
memcpy(out->address, address, 32);
out->balance = uint256_from_u64(0);
int ret = 0;
k = kh_put(balance_sheet_map_m, simMap, key, &ret);
if (k == kh_end(simMap)) {
return false;
}
kh_value(simMap, k) = *out;
return true;
} }
int BalanceSheet_Insert(balance_sheet_entry_t entry) { static bool BalanceSheet_StoreSimEntry(
if (!sheetMap) { return -1; } khash_t(balance_sheet_map_m)* simMap,
const balance_sheet_entry_t* entry
) {
if (!simMap || !entry) {
return false;
}
key32_t key;
memcpy(key.bytes, entry->address, 32);
int ret = 0;
khiter_t k = kh_put(balance_sheet_map_m, simMap, key, &ret);
if (k == kh_end(simMap)) {
return false;
}
kh_value(simMap, k) = *entry;
return true;
}
static bool BalanceSheet_ApplyCandidateTransaction(
khash_t(balance_sheet_map_m)* simMap,
const signed_transaction_t* tx,
uint64_t* outFee
) {
if (!simMap || !tx) {
return false;
}
if (Address_IsCoinbase(tx->transaction.senderAddress)) {
return true;
}
if (!Transaction_Verify(tx)) {
return false;
}
balance_sheet_entry_t senderEntry;
if (!BalanceSheet_GetSimEntry(simMap, tx->transaction.senderAddress, &senderEntry)) {
return false;
}
uint256_t spend = uint256_from_u64(0);
if (uint256_add_u64(&spend, tx->transaction.amount1) ||
uint256_add_u64(&spend, tx->transaction.amount2) ||
uint256_add_u64(&spend, tx->transaction.fee)) {
return false;
}
if (uint256_cmp(&senderEntry.balance, &spend) < 0) {
return false;
}
if (!uint256_subtract(&senderEntry.balance, &spend)) {
return false;
}
if (!BalanceSheet_StoreSimEntry(simMap, &senderEntry)) {
return false;
}
balance_sheet_entry_t recipient1Entry;
if (!BalanceSheet_GetSimEntry(simMap, tx->transaction.recipientAddress1, &recipient1Entry)) {
return false;
}
if (uint256_add_u64(&recipient1Entry.balance, tx->transaction.amount1)) {
return false;
}
if (!BalanceSheet_StoreSimEntry(simMap, &recipient1Entry)) {
return false;
}
if (tx->transaction.amount2 > 0) {
balance_sheet_entry_t recipient2Entry;
if (!BalanceSheet_GetSimEntry(simMap, tx->transaction.recipientAddress2, &recipient2Entry)) {
return false;
}
if (uint256_add_u64(&recipient2Entry.balance, tx->transaction.amount2)) {
return false;
}
if (!BalanceSheet_StoreSimEntry(simMap, &recipient2Entry)) {
return false;
}
}
if (outFee) {
*outFee = tx->transaction.fee;
}
return true;
}
static int BalanceSheet_InsertLocked(balance_sheet_entry_t entry) {
if (!sheetMap) {
return -1;
}
// Encapsulate key
key32_t key; key32_t key;
memcpy(key.bytes, entry.address, 32); memcpy(key.bytes, entry.address, 32);
int ret; int ret = 0;
khiter_t k = kh_put(balance_sheet_map_m, sheetMap, key, &ret); khiter_t k = kh_put(balance_sheet_map_m, sheetMap, key, &ret);
if (k == kh_end(sheetMap)) { if (k == kh_end(sheetMap)) {
return -1; return -1;
} }
kh_value(sheetMap, k) = entry; kh_value(sheetMap, k) = entry;
return ret;
}
void BalanceSheet_Init() {
sheetMap = kh_init(balance_sheet_map_m);
pthread_mutex_init(&g_sheetLock, NULL);
}
int BalanceSheet_Insert(balance_sheet_entry_t entry) {
if (!sheetMap) { return -1; }
pthread_mutex_lock(&g_sheetLock);
int ret = BalanceSheet_InsertLocked(entry);
pthread_mutex_unlock(&g_sheetLock);
return ret; return ret;
} }
bool BalanceSheet_Lookup(uint8_t* address, balance_sheet_entry_t* out) { bool BalanceSheet_Lookup(uint8_t* address, balance_sheet_entry_t* out) {
if (!address || !out) { return false; } if (!address || !out) { return false; }
pthread_mutex_lock(&g_sheetLock);
key32_t key; key32_t key;
memcpy(key.bytes, address, 32); memcpy(key.bytes, address, 32);
@@ -34,20 +181,26 @@ bool BalanceSheet_Lookup(uint8_t* address, balance_sheet_entry_t* out) {
if (k != kh_end(sheetMap)) { if (k != kh_end(sheetMap)) {
balance_sheet_entry_t entry = kh_value(sheetMap, k); balance_sheet_entry_t entry = kh_value(sheetMap, k);
memcpy(out, &entry, sizeof(balance_sheet_entry_t)); memcpy(out, &entry, sizeof(balance_sheet_entry_t));
pthread_mutex_unlock(&g_sheetLock);
return true; return true;
} }
pthread_mutex_unlock(&g_sheetLock);
return false; return false;
} }
bool BalanceSheet_SaveToFile(const char* outPath) { bool BalanceSheet_SaveToFile(const char* outPath) {
if (!sheetMap) { return false; } if (!sheetMap) { return false; }
pthread_mutex_lock(&g_sheetLock);
char outFile[512]; char outFile[512];
strcpy(outFile, outPath); strcpy(outFile, outPath);
strcat(outFile, "/balance_sheet.data"); strcat(outFile, "/balance_sheet.data");
FILE* file = fopen(outFile, "wb"); FILE* file = fopen(outFile, "wb");
if (!file) { return false; } if (!file) {
pthread_mutex_unlock(&g_sheetLock);
return false;
}
khiter_t k; khiter_t k;
for (k = kh_begin(sheetMap); k != kh_end(sheetMap); ++k) { for (k = kh_begin(sheetMap); k != kh_end(sheetMap); ++k) {
@@ -55,39 +208,48 @@ bool BalanceSheet_SaveToFile(const char* outPath) {
balance_sheet_entry_t entry = kh_val(sheetMap, k); balance_sheet_entry_t entry = kh_val(sheetMap, k);
if (fwrite(&entry, sizeof(balance_sheet_entry_t), 1, file) != 1) { if (fwrite(&entry, sizeof(balance_sheet_entry_t), 1, file) != 1) {
fclose(file); fclose(file);
pthread_mutex_unlock(&g_sheetLock);
return false; return false;
} }
} }
} }
fclose(file); fclose(file);
pthread_mutex_unlock(&g_sheetLock);
return true; return true;
} }
bool BalanceSheet_LoadFromFile(const char* inPath) { bool BalanceSheet_LoadFromFile(const char* inPath) {
if (!sheetMap) { return false; } if (!sheetMap) { return false; }
pthread_mutex_lock(&g_sheetLock);
char inFile[512]; char inFile[512];
strcpy(inFile, inPath); strcpy(inFile, inPath);
strcat(inFile, "/balance_sheet.data"); strcat(inFile, "/balance_sheet.data");
FILE* file = fopen(inFile, "rb"); FILE* file = fopen(inFile, "rb");
if (!file) { return false; } if (!file) {
pthread_mutex_unlock(&g_sheetLock);
return false;
}
balance_sheet_entry_t entry; balance_sheet_entry_t entry;
while (fread(&entry, sizeof(balance_sheet_entry_t), 1, file) == 1) { while (fread(&entry, sizeof(balance_sheet_entry_t), 1, file) == 1) {
if (BalanceSheet_Insert(entry) < 0) { if (BalanceSheet_InsertLocked(entry) < 0) {
fclose(file); fclose(file);
pthread_mutex_unlock(&g_sheetLock);
return false; return false;
} }
} }
fclose(file); fclose(file);
pthread_mutex_unlock(&g_sheetLock);
return true; return true;
} }
void BalanceSheet_Print() { void BalanceSheet_Print() {
if (!sheetMap) { return; } if (!sheetMap) { return; }
pthread_mutex_lock(&g_sheetLock);
// Iterate through every entry // Iterate through every entry
khiter_t k; khiter_t k;
for (k = kh_begin(sheetMap); k != kh_end(sheetMap); ++k) { for (k = kh_begin(sheetMap); k != kh_end(sheetMap); ++k) {
@@ -107,9 +269,72 @@ void BalanceSheet_Print() {
balanceStr); balanceStr);
} }
} }
pthread_mutex_unlock(&g_sheetLock);
} }
void BalanceSheet_Destroy() { void BalanceSheet_Destroy() {
kh_destroy(balance_sheet_map_m, sheetMap); kh_destroy(balance_sheet_map_m, sheetMap);
sheetMap = NULL; sheetMap = NULL;
pthread_mutex_destroy(&g_sheetLock);
}
bool BalanceSheet_SelectSpendableTransactions(
const signed_transaction_t* candidates,
size_t candidateCount,
signed_transaction_t** outAccepted,
size_t* outAcceptedCount,
uint64_t* outTotalFees
) {
if (!outAccepted || !outAcceptedCount || !outTotalFees) {
return false;
}
*outAccepted = NULL;
*outAcceptedCount = 0;
*outTotalFees = 0;
if (!candidates || candidateCount == 0) {
return true;
}
signed_transaction_t* accepted = (signed_transaction_t*)calloc(candidateCount, sizeof(signed_transaction_t));
if (!accepted) {
return false;
}
khash_t(balance_sheet_map_m)* simMap = kh_init(balance_sheet_map_m);
if (!simMap) {
free(accepted);
return false;
}
size_t acceptedCount = 0;
uint64_t totalFees = 0;
for (size_t i = 0; i < candidateCount; ++i) {
const signed_transaction_t* tx = &candidates[i];
if (Address_IsCoinbase(tx->transaction.senderAddress)) {
continue;
}
uint64_t fee = 0;
if (!BalanceSheet_ApplyCandidateTransaction(simMap, tx, &fee)) {
continue;
}
accepted[acceptedCount++] = *tx;
totalFees += fee;
}
kh_destroy(balance_sheet_map_m, simMap);
if (acceptedCount == 0) {
free(accepted);
accepted = NULL;
}
*outAccepted = accepted;
*outAcceptedCount = acceptedCount;
*outTotalFees = totalFees;
return true;
} }

View File

@@ -214,23 +214,99 @@ bool Block_AllTransactionsValid(const block_t* block) {
for (size_t i = 0; i < DynArr_size(block->transactions); i++) { for (size_t i = 0; i < DynArr_size(block->transactions); i++) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(block->transactions, i); signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(block->transactions, i);
if (!Transaction_Verify(tx)) {
return false;
}
if (tx && Address_IsCoinbase(tx->transaction.senderAddress)) { if (tx && Address_IsCoinbase(tx->transaction.senderAddress)) {
if (hasCoinbase) { if (hasCoinbase) {
return false; // More than one coinbase transaction return false;
} }
hasCoinbase = true; hasCoinbase = true;
continue; // Coinbase transactions are valid since the miner has the right to create coins. Only rule is one per block.
}
if (!Transaction_Verify(tx)) {
return false;
} }
} }
return true && hasCoinbase && DynArr_size(block->transactions) > 0; // Every block must have at least one transaction (the coinbase) return true && hasCoinbase && DynArr_size(block->transactions) > 0; // Every block must have at least one transaction (the coinbase)
} }
bool Block_ValidateCoinbaseAndFees(const block_t* block, uint64_t expectedCoinbaseAmount, uint64_t* outTotalFees) {
if (!block || !block->transactions) {
return false;
}
bool hasCoinbase = false;
uint64_t totalFees = 0;
uint8_t zeroAddress[32] = {0};
for (size_t i = 0; i < DynArr_size(block->transactions); ++i) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(block->transactions, i);
if (!tx) {
return false;
}
if (Address_IsCoinbase(tx->transaction.senderAddress)) {
if (hasCoinbase) {
return false;
}
hasCoinbase = true;
if (!Transaction_Verify(tx)) {
return false;
}
if (tx->transaction.fee != 0 || tx->transaction.amount2 != 0) {
return false;
}
if (tx->transaction.amount1 != expectedCoinbaseAmount) {
return false;
}
if (Address_IsCoinbase(tx->transaction.recipientAddress1)) {
return false;
}
if (memcmp(tx->transaction.recipientAddress2, zeroAddress, sizeof(zeroAddress)) != 0) {
return false;
}
continue;
}
if (!Transaction_Verify(tx)) {
return false;
}
if (UINT64_MAX - totalFees < tx->transaction.fee) {
return false;
}
totalFees += tx->transaction.fee;
}
if (!hasCoinbase) {
return false;
}
if (outTotalFees) {
*outTotalFees = totalFees;
}
return true;
}
bool Block_IsFullyValid(const block_t* block) {
bool merkleValid = false;
uint8_t calculatedMerkleRoot[32];
if (block && block->transactions) {
Block_CalculateMerkleRoot(block, calculatedMerkleRoot);
merkleValid = (memcmp(calculatedMerkleRoot, block->header.merkleRoot, 32) == 0);
}
return Block_HasValidProofOfWork(block) && Block_AllTransactionsValid(block) && DynArr_size(block->transactions) > 0 && merkleValid;
}
void Block_Destroy(block_t* block) { void Block_Destroy(block_t* block) {
if (!block) return; if (!block) return;
DynArr_destroy(block->transactions); DynArr_destroy(block->transactions);
@@ -274,3 +350,48 @@ void Block_Print(const block_t* block) {
printf("No transactions (or none loaded)\n"); printf("No transactions (or none loaded)\n");
} }
} }
void Block_ShortPrint(const block_t* block) {
if (!block) return;
printf("Block #%llu: Timestamp %llu, Nonce %llu, DiffTarget 0x%08x, Version %u, PrevHash %02x%02x...%02x%02x, MerkleRoot %02x%02x...%02x%02x, TxCount %zu\n",
(unsigned long long)block->header.blockNumber,
(unsigned long long)block->header.timestamp,
(unsigned long long)block->header.nonce,
block->header.difficultyTarget,
block->header.version,
block->header.prevHash[0], block->header.prevHash[1], block->header.prevHash[30], block->header.prevHash[31],
block->header.merkleRoot[0], block->header.merkleRoot[1], block->header.merkleRoot[30], block->header.merkleRoot[31],
block->transactions ? DynArr_size(block->transactions) : 0);
}
block_t* Block_Copy(const block_t* src) {
if (!src) return NULL;
block_t* dst = (block_t*)malloc(sizeof(block_t));
if (!dst) return NULL;
dst->header = src->header;
if (src->transactions) {
size_t txCount = DynArr_size(src->transactions);
dst->transactions = DYNARR_CREATE(signed_transaction_t, txCount == 0 ? 1 : txCount);
if (!dst->transactions) {
free(dst);
return NULL;
}
for (size_t i = 0; i < txCount; ++i) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(src->transactions, i);
if (!tx) {
DynArr_destroy(dst->transactions);
free(dst);
return NULL;
}
if (!DynArr_push_back(dst->transactions, tx)) {
DynArr_destroy(dst->transactions);
free(dst);
return NULL;
}
}
} else {
dst->transactions = NULL;
}
return dst;
}

View File

@@ -1,8 +1,11 @@
#include <block/chain.h> #include <block/chain.h>
#include <constants.h> #include <constants.h>
#include <runtime_state.h>
#include <txmempool.h>
#include <errno.h> #include <errno.h>
#include <limits.h> #include <limits.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <pthread.h>
uint64_t currentBlockHeight = 0; uint64_t currentBlockHeight = 0;
@@ -95,6 +98,37 @@ static bool DebitAddress(const uint8_t address[32], const uint256_t* amount) {
return BalanceSheet_Insert(entry) >= 0; return BalanceSheet_Insert(entry) >= 0;
} }
bool Chain_RecomputeRuntimeState(blockchain_t* chain) {
if (!chain) {
return false;
}
uint256_t rebuiltSupply = uint256_from_u64(0);
for (size_t i = 0; i < chain->size; ++i) {
block_t* blk = (block_t*)DynArr_at(chain->blocks, i);
if (!blk || !blk->transactions) {
return false;
}
for (size_t j = 0; j < DynArr_size(blk->transactions); ++j) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(blk->transactions, j);
if (!tx) {
return false;
}
if (Address_IsCoinbase(tx->transaction.senderAddress)) {
if (uint256_add_u64(&rebuiltSupply, tx->transaction.amount1)) {
return false;
}
}
}
}
currentSupply = rebuiltSupply;
currentReward = CalculateBlockReward(currentSupply, chain);
return true;
}
static void Chain_ClearBlocks(blockchain_t* chain) { static void Chain_ClearBlocks(blockchain_t* chain) {
if (!chain || !chain->blocks) { if (!chain || !chain->blocks) {
return; return;
@@ -135,6 +169,8 @@ void Chain_Destroy(blockchain_t* chain) {
} }
bool Chain_AddBlock(blockchain_t* chain, block_t* block) { bool Chain_AddBlock(blockchain_t* chain, block_t* block) {
bool ok = true;
if (!chain || !block || !chain->blocks) { if (!chain || !block || !chain->blocks) {
return false; return false;
} }
@@ -143,96 +179,209 @@ bool Chain_AddBlock(blockchain_t* chain, block_t* block) {
return false; return false;
} }
// First pass: ensure all non-coinbase senders can cover the full spend // Acquire global write locks to protect chain and balance sheet mutations.
// (amount1 + amount2 + fee) before mutating the chain or balance sheet. pthread_rwlock_wrlock(&chainLock);
size_t txCount = DynArr_size(block->transactions); pthread_mutex_lock(&balanceSheetLock);
for (size_t i = 0; i < txCount; ++i) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(block->transactions, i);
if (!tx) {
return false;
}
if (Address_IsCoinbase(tx->transaction.senderAddress)) { // Ensure the incoming block's header.blockNumber matches the index it will be appended at.
continue; size_t expectedIndex = DynArr_size(chain->blocks);
} if (block->header.blockNumber != expectedIndex) {
// Mismatched block number; reject to avoid duplicate indices or inconsistent headers.
uint256_t spend; pthread_mutex_unlock(&balanceSheetLock);
if (!BuildSpendAmount(tx, &spend)) { pthread_rwlock_unlock(&chainLock);
return false;
}
balance_sheet_entry_t senderEntry;
if (!BalanceSheet_Lookup(tx->transaction.senderAddress, &senderEntry)) {
fprintf(stderr, "Error: Sender address not found in balance sheet during block addition. Bailing!\n");
return false;
}
if (uint256_cmp(&senderEntry.balance, &spend) < 0) {
fprintf(stderr, "Error: Sender balance insufficient for block transaction. Bailing!\n");
return false;
}
}
// Push the block only after validation succeeds.
block_t* blk = (block_t*)DynArr_push_back(chain->blocks, block);
if (!blk) {
return false; return false;
} }
chain->size++;
currentBlockHeight = (uint64_t)(chain->size - 1);
// Second pass: apply the ledger changes. do {
if (blk->transactions) { size_t txCount = DynArr_size(block->transactions);
txCount = DynArr_size(blk->transactions); signed_transaction_t* candidateTxs = (signed_transaction_t*)calloc(txCount, sizeof(signed_transaction_t));
if (!candidateTxs) {
ok = false;
break;
}
size_t nonCoinbaseCount = 0;
for (size_t i = 0; i < txCount; ++i) { for (size_t i = 0; i < txCount; ++i) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(blk->transactions, i); signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(block->transactions, i);
if (!tx) { if (!tx) {
continue; ok = false;
break;
} }
candidateTxs[i] = *tx;
if (!Address_IsCoinbase(tx->transaction.senderAddress)) { if (!Address_IsCoinbase(tx->transaction.senderAddress)) {
uint256_t spend; ++nonCoinbaseCount;
if (!BuildSpendAmount(tx, &spend) || !DebitAddress(tx->transaction.senderAddress, &spend)) { }
fprintf(stderr, "Error: Failed to debit sender balance during block addition. Bailing!\n"); }
return false;
if (!ok) {
free(candidateTxs);
break;
}
signed_transaction_t* spendableTxs = NULL;
size_t spendableCount = 0;
uint64_t totalFees = 0;
if (!BalanceSheet_SelectSpendableTransactions(candidateTxs, txCount, &spendableTxs, &spendableCount, &totalFees)) {
free(candidateTxs);
ok = false;
break;
}
free(candidateTxs);
if (spendableCount != nonCoinbaseCount) {
free(spendableTxs);
ok = false;
break;
}
uint64_t expectedCoinbaseAmount = currentReward;
if (UINT64_MAX - expectedCoinbaseAmount < totalFees) {
free(spendableTxs);
ok = false;
break;
}
expectedCoinbaseAmount += totalFees;
// Debug: log expected coinbase and fees to aid diagnosis when nodes disagree
{
uint64_t cbAmount = 0;
if (block->transactions && DynArr_size(block->transactions) > 0) {
signed_transaction_t* firstTx = (signed_transaction_t*)DynArr_at(block->transactions, 0);
if (firstTx && Address_IsCoinbase(firstTx->transaction.senderAddress)) {
cbAmount = firstTx->transaction.amount1;
}
} }
char supplyStr[80];
Uint256ToDecimal(&currentSupply, supplyStr, sizeof(supplyStr));
printf("Chain_AddBlock: blockIndex=%zu expectedCoinbase=%llu totalFees=%llu observedBlockCoinbase=%llu currentReward=%llu currentSupply=%s\n",
expectedIndex,
(unsigned long long)expectedCoinbaseAmount,
(unsigned long long)totalFees,
(unsigned long long)cbAmount,
(unsigned long long)currentReward,
supplyStr);
} }
if (!CreditAddress(tx->transaction.recipientAddress1, tx->transaction.amount1)) { uint64_t observedFees = 0;
fprintf(stderr, "Error: Failed to credit recipient1 balance during block addition. Bailing!\n"); if (!Block_ValidateCoinbaseAndFees(block, expectedCoinbaseAmount, &observedFees) || observedFees != totalFees) {
return false; // Log mismatch details for debugging
} printf("Chain_AddBlock: validation failed: expectedCoinbase=%llu totalFees=%llu observedFees=%llu\n",
(unsigned long long)expectedCoinbaseAmount,
(unsigned long long)totalFees,
(unsigned long long)observedFees);
free(spendableTxs);
ok = false;
break;
}
if (tx->transaction.amount2 > 0) { free(spendableTxs);
uint8_t zeroAddress[32] = {0};
if (memcmp(tx->transaction.recipientAddress2, zeroAddress, 32) == 0) { // Push the block only after validation succeeds.
fprintf(stderr, "Error: amount2 is non-zero but recipient2 is empty during block addition. Bailing!\n"); block_t* blk = (block_t*)DynArr_push_back(chain->blocks, block);
return false; if (!blk) { ok = false; break; }
chain->size++;
currentBlockHeight = (uint64_t)(chain->size - 1);
// Second pass: apply the ledger changes.
if (blk->transactions) {
txCount = DynArr_size(blk->transactions);
for (size_t i = 0; i < txCount; ++i) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(blk->transactions, i);
if (!tx) {
continue;
} }
if (!CreditAddress(tx->transaction.recipientAddress2, tx->transaction.amount2)) { if (!Address_IsCoinbase(tx->transaction.senderAddress)) {
fprintf(stderr, "Error: Failed to credit recipient2 balance during block addition. Bailing!\n"); uint256_t spend;
return false; if (!BuildSpendAmount(tx, &spend) || !DebitAddress(tx->transaction.senderAddress, &spend)) {
fprintf(stderr, "Error: Failed to debit sender balance during block addition. Bailing!\n");
ok = false; break;
}
}
if (!CreditAddress(tx->transaction.recipientAddress1, tx->transaction.amount1)) {
fprintf(stderr, "Error: Failed to credit recipient1 balance during block addition. Bailing!\n");
ok = false; break;
}
if (tx->transaction.amount2 > 0) {
uint8_t zeroAddress[32] = {0};
if (memcmp(tx->transaction.recipientAddress2, zeroAddress, 32) == 0) {
fprintf(stderr, "Error: amount2 is non-zero but recipient2 is empty during block addition. Bailing!\n");
ok = false; break;
}
if (!CreditAddress(tx->transaction.recipientAddress2, tx->transaction.amount2)) {
fprintf(stderr, "Error: Failed to credit recipient2 balance during block addition. Bailing!\n");
ok = false; break;
}
} }
} }
} }
}
return true; // Remove mined non-coinbase transactions from the mempool so they are not re-mined or re-broadcast.
if (blk->transactions) {
for (size_t i = 0; i < DynArr_size(blk->transactions); ++i) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(blk->transactions, i);
if (!tx) continue;
if (Address_IsCoinbase(tx->transaction.senderAddress)) continue;
uint8_t txHash[32];
Transaction_CalculateHash(tx, txHash);
if (TxMempool_Remove(txHash)) {
// optional: log removal
// printf("TxMempool_Remove: removed tx from mempool: "); PrintHexBytes(txHash, 32); printf("\n");
}
}
}
// ok remains true if no failures
} while (0);
// Release locks
pthread_mutex_unlock(&balanceSheetLock);
pthread_rwlock_unlock(&chainLock);
printf("Added new block to chain:\n");
Block_ShortPrint(block);
/* Debug proof removed: coinbase == baseReward + totalFees was printed here during debugging. */
return ok;
} }
block_t* Chain_GetBlock(blockchain_t* chain, size_t index) { block_t* Chain_GetBlock(blockchain_t* chain, size_t index) {
if (chain) { if (!chain) return NULL;
return DynArr_at(chain->blocks, index); block_t* blk = NULL;
pthread_rwlock_rdlock(&chainLock);
blk = (block_t*)DynArr_at(chain->blocks, index);
pthread_rwlock_unlock(&chainLock);
return blk;
}
bool Chain_GetBlockCopy(blockchain_t* chain, size_t index, block_t** outCopy) {
if (!chain || !outCopy) return false;
*outCopy = NULL;
pthread_rwlock_rdlock(&chainLock);
block_t* src = (block_t*)DynArr_at(chain->blocks, index);
if (!src) {
pthread_rwlock_unlock(&chainLock);
return false;
} }
return NULL; block_t* copy = Block_Copy(src);
pthread_rwlock_unlock(&chainLock);
if (!copy) return false;
*outCopy = copy;
return true;
} }
size_t Chain_Size(blockchain_t* chain) { size_t Chain_Size(blockchain_t* chain) {
if (chain) { if (!chain) return 0;
return DynArr_size(chain->blocks); size_t sz = 0;
} pthread_rwlock_rdlock(&chainLock);
return 0; sz = DynArr_size(chain->blocks);
pthread_rwlock_unlock(&chainLock);
return sz;
} }
bool Chain_IsValid(blockchain_t* chain) { bool Chain_IsValid(blockchain_t* chain) {
@@ -271,6 +420,137 @@ bool Chain_IsValid(blockchain_t* chain) {
return true; return true;
} }
bool Chain_RollbackToHeight(blockchain_t* chain, size_t height) {
if (!chain || !chain->blocks) return false;
pthread_rwlock_wrlock(&chainLock);
pthread_mutex_lock(&balanceSheetLock);
size_t cur = DynArr_size(chain->blocks);
if (height >= cur) {
pthread_mutex_unlock(&balanceSheetLock);
pthread_rwlock_unlock(&chainLock);
return true; // nothing to do
}
// Remove blocks above height
for (size_t i = cur; i > height; --i) {
size_t idx = i - 1;
block_t* blk = (block_t*)DynArr_at(chain->blocks, idx);
if (blk && blk->transactions) {
DynArr_destroy(blk->transactions);
blk->transactions = NULL;
}
DynArr_remove(chain->blocks, idx);
}
chain->size = DynArr_size(chain->blocks);
currentBlockHeight = chain->size ? (uint64_t)(chain->size - 1) : 0ULL;
// Rebuild balance sheet from scratch up to current chain size
BalanceSheet_Destroy();
BalanceSheet_Init();
for (size_t i = 0; i < chain->size; ++i) {
block_t* blk = (block_t*)DynArr_at(chain->blocks, i);
block_t* toProcess = blk;
bool loaded = false;
if (!blk || !blk->transactions) {
// Try to load from disk
block_t* loadedBlk = NULL;
size_t txCount = 0;
if (!Chain_LoadBlockFromFile(chainDataDir, (uint64_t)i, true, &loadedBlk, &txCount)) {
// Can't rebuild without transactions
pthread_mutex_unlock(&balanceSheetLock);
pthread_rwlock_unlock(&chainLock);
return false;
}
toProcess = loadedBlk;
loaded = true;
}
// Apply transactions
if (toProcess && toProcess->transactions) {
size_t txCount = DynArr_size(toProcess->transactions);
for (size_t ti = 0; ti < txCount; ++ti) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(toProcess->transactions, ti);
if (!tx) continue;
// Coinbase credit
if (Address_IsCoinbase(tx->transaction.senderAddress)) {
balance_sheet_entry_t entry;
if (!BalanceSheet_Lookup(tx->transaction.recipientAddress1, &entry)) {
memset(&entry, 0, sizeof(entry));
memcpy(entry.address, tx->transaction.recipientAddress1, 32);
entry.balance = uint256_from_u64(tx->transaction.amount1);
} else {
(void)uint256_add_u64(&entry.balance, tx->transaction.amount1);
}
(void)BalanceSheet_Insert(entry);
continue;
}
// Non-coinbase: debit sender
uint256_t spend = uint256_from_u64(0);
(void)uint256_add_u64(&spend, tx->transaction.amount1);
if (tx->transaction.amount2 > 0) (void)uint256_add_u64(&spend, tx->transaction.amount2);
if (tx->transaction.fee > 0) (void)uint256_add_u64(&spend, tx->transaction.fee);
balance_sheet_entry_t senderEntry;
if (!BalanceSheet_Lookup(tx->transaction.senderAddress, &senderEntry)) {
// Missing sender; create zero and then subtract (will underflow if invalid)
memset(&senderEntry, 0, sizeof(senderEntry));
memcpy(senderEntry.address, tx->transaction.senderAddress, 32);
senderEntry.balance = uint256_from_u64(0);
}
(void)uint256_subtract(&senderEntry.balance, &spend);
(void)BalanceSheet_Insert(senderEntry);
// Credit recipient1
balance_sheet_entry_t rec1;
if (!BalanceSheet_Lookup(tx->transaction.recipientAddress1, &rec1)) {
memset(&rec1, 0, sizeof(rec1));
memcpy(rec1.address, tx->transaction.recipientAddress1, 32);
rec1.balance = uint256_from_u64(tx->transaction.amount1);
} else {
(void)uint256_add_u64(&rec1.balance, tx->transaction.amount1);
}
(void)BalanceSheet_Insert(rec1);
// Credit recipient2 if any
if (tx->transaction.amount2 > 0) {
balance_sheet_entry_t rec2;
if (!BalanceSheet_Lookup(tx->transaction.recipientAddress2, &rec2)) {
memset(&rec2, 0, sizeof(rec2));
memcpy(rec2.address, tx->transaction.recipientAddress2, 32);
rec2.balance = uint256_from_u64(tx->transaction.amount2);
} else {
(void)uint256_add_u64(&rec2.balance, tx->transaction.amount2);
}
(void)BalanceSheet_Insert(rec2);
}
}
}
if (loaded && toProcess) {
if (toProcess->transactions) DynArr_destroy(toProcess->transactions);
free(toProcess);
}
}
if (!Chain_RecomputeRuntimeState(chain)) {
pthread_mutex_unlock(&balanceSheetLock);
pthread_rwlock_unlock(&chainLock);
return false;
}
pthread_mutex_unlock(&balanceSheetLock);
pthread_rwlock_unlock(&chainLock);
return true;
}
void Chain_Wipe(blockchain_t* chain) { void Chain_Wipe(blockchain_t* chain) {
Chain_ClearBlocks(chain); Chain_ClearBlocks(chain);
currentBlockHeight = 0; currentBlockHeight = 0;
@@ -298,112 +578,134 @@ bool Chain_SaveToFile(blockchain_t* chain, const char* dirpath, uint256_t curren
if (!BuildPath(tablePath, sizeof(tablePath), dirpath, "chain.table")) { if (!BuildPath(tablePath, sizeof(tablePath), dirpath, "chain.table")) {
return false; return false;
} }
// Find metadata file (create if not exists) to get the saved chain size (+ other things) char metaTmpPath[512];
FILE* metaFile = fopen(metaPath, "rb+"); char chainTmpPath[512];
FILE* chainFile = fopen(chainPath, "rb+"); char tableTmpPath[512];
FILE* tableFile = fopen(tablePath, "rb+"); if (!BuildPath(metaTmpPath, sizeof(metaTmpPath), dirpath, "chain.meta.tmp") ||
!BuildPath(chainTmpPath, sizeof(chainTmpPath), dirpath, "chain.data.tmp") ||
!BuildPath(tableTmpPath, sizeof(tableTmpPath), dirpath, "chain.table.tmp")) {
return false;
}
pthread_rwlock_wrlock(&chainLock);
FILE* metaFile = fopen(metaTmpPath, "wb+");
FILE* chainFile = fopen(chainTmpPath, "wb+");
FILE* tableFile = fopen(tableTmpPath, "wb+");
if (!metaFile || !chainFile || !tableFile) { if (!metaFile || !chainFile || !tableFile) {
// Just overwrite everything if (metaFile) fclose(metaFile);
metaFile = fopen(metaPath, "wb+"); if (chainFile) fclose(chainFile);
if (!metaFile) { return false; } if (tableFile) fclose(tableFile);
pthread_rwlock_unlock(&chainLock);
// Initialize metadata with size 0 remove(metaTmpPath);
size_t initialSize = 0; remove(chainTmpPath);
fwrite(&initialSize, sizeof(size_t), 1, metaFile); remove(tableTmpPath);
// Write last block hash (32 bytes of zeros for now)
uint8_t zeroHash[32] = {0};
fwrite(zeroHash, sizeof(uint8_t), 32, metaFile);
uint256_t zeroSupply = {0};
fwrite(&zeroSupply, sizeof(uint256_t), 1, metaFile);
uint32_t initialTarget = INITIAL_DIFFICULTY;
fwrite(&initialTarget, sizeof(uint32_t), 1, metaFile);
uint64_t initialReward = 0;
fwrite(&initialReward, sizeof(uint64_t), 1, metaFile);
chainFile = fopen(chainPath, "wb+");
if (!chainFile) { return false; }
tableFile = fopen(tablePath, "wb+");
if (!tableFile) { return false; }
// TODO: Potentially some other things here, we'll see
}
// Read
size_t savedSize = 0;
fread(&savedSize, sizeof(size_t), 1, metaFile);
uint8_t lastSavedHash[32];
fread(lastSavedHash, sizeof(uint8_t), 32, metaFile);
// Assume chain saved is valid, and that the chain in memory is valid (as LoadFromFile will verify the saved one)
if (savedSize > DynArr_size(chain->blocks)) {
// Saved chain is longer than current chain, this should not happen if we are always saving the current chain, but just in case, fail to save to avoid overwriting a potentially valid longer chain with a shorter one.
fclose(metaFile);
fclose(chainFile);
fclose(tableFile);
return false; return false;
} }
// Filename format: dirpath/chain.data const size_t chainSize = DynArr_size(chain->blocks);
// File format: ([block_header][num_transactions][transactions...])[*length] - since block_header is fixed size, LoadFromFile will only read those by default uint64_t byteCount = 0;
for (size_t i = 0; i < chainSize; ++i) {
fseek(chainFile, 0, SEEK_END); // Seek to the end of those files
fseek(tableFile, 0, SEEK_END);
long pos = ftell(chainFile);
if (pos < 0) {
fclose(metaFile);
fclose(chainFile);
fclose(tableFile);
return false;
}
uint64_t byteCount = (uint64_t)pos; // Get the size
// Save blocks that are not yet saved
for (size_t i = savedSize; i < DynArr_size(chain->blocks); i++) {
block_t* blk = (block_t*)DynArr_at(chain->blocks, i); block_t* blk = (block_t*)DynArr_at(chain->blocks, i);
if (!blk) { if (!blk) {
fclose(metaFile); fclose(metaFile);
fclose(chainFile); fclose(chainFile);
fclose(tableFile); fclose(tableFile);
pthread_rwlock_unlock(&chainLock);
remove(metaTmpPath);
remove(chainTmpPath);
remove(tableTmpPath);
return false; return false;
} }
uint64_t preIncrementByteSize = byteCount; block_t* diskCopy = blk;
bool loadedTemp = false;
// Construct file path if (!diskCopy->transactions) {
// Write block header if (!Chain_LoadBlockFromFile(dirpath, (uint64_t)i, true, &diskCopy, NULL) || !diskCopy || !diskCopy->transactions) {
fwrite(&blk->header, sizeof(block_header_t), 1, chainFile); if (loadedTemp && diskCopy) {
size_t txSize = DynArr_size(blk->transactions); Block_Destroy(diskCopy);
fwrite(&txSize, sizeof(size_t), 1, chainFile); // Write number of transactions }
byteCount += sizeof(block_header_t) + sizeof(size_t);
// Write transactions
for (size_t j = 0; j < txSize; j++) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(blk->transactions, j);
if (fwrite(tx, sizeof(signed_transaction_t), 1, chainFile) != 1) {
fclose(chainFile);
fclose(metaFile); fclose(metaFile);
fclose(chainFile);
fclose(tableFile); fclose(tableFile);
pthread_rwlock_unlock(&chainLock);
remove(metaTmpPath);
remove(chainTmpPath);
remove(tableTmpPath);
return false; return false;
} }
loadedTemp = true;
}
const uint64_t blockStart = byteCount;
if (fwrite(&diskCopy->header, sizeof(block_header_t), 1, chainFile) != 1) {
if (loadedTemp) Block_Destroy(diskCopy);
fclose(metaFile);
fclose(chainFile);
fclose(tableFile);
pthread_rwlock_unlock(&chainLock);
remove(metaTmpPath);
remove(chainTmpPath);
remove(tableTmpPath);
return false;
}
const size_t txSize = DynArr_size(diskCopy->transactions);
if (fwrite(&txSize, sizeof(size_t), 1, chainFile) != 1) {
if (loadedTemp) Block_Destroy(diskCopy);
fclose(metaFile);
fclose(chainFile);
fclose(tableFile);
pthread_rwlock_unlock(&chainLock);
remove(metaTmpPath);
remove(chainTmpPath);
remove(tableTmpPath);
return false;
}
byteCount += sizeof(block_header_t) + sizeof(size_t);
for (size_t j = 0; j < txSize; ++j) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(diskCopy->transactions, j);
if (!tx || fwrite(tx, sizeof(signed_transaction_t), 1, chainFile) != 1) {
if (loadedTemp) Block_Destroy(diskCopy);
fclose(metaFile);
fclose(chainFile);
fclose(tableFile);
pthread_rwlock_unlock(&chainLock);
remove(metaTmpPath);
remove(chainTmpPath);
remove(tableTmpPath);
return false;
}
byteCount += sizeof(signed_transaction_t); byteCount += sizeof(signed_transaction_t);
} }
// Create an entry in the block table
block_table_entry_t entry; block_table_entry_t entry;
entry.blockNumber = i; entry.blockNumber = i;
entry.byteNumber = preIncrementByteSize; entry.byteNumber = blockStart;
entry.blockSize = byteCount - preIncrementByteSize; entry.blockSize = byteCount - blockStart;
fwrite(&entry, sizeof(block_table_entry_t), 1, tableFile); if (fwrite(&entry, sizeof(block_table_entry_t), 1, tableFile) != 1) {
if (loadedTemp) Block_Destroy(diskCopy);
fclose(metaFile);
fclose(chainFile);
fclose(tableFile);
pthread_rwlock_unlock(&chainLock);
remove(metaTmpPath);
remove(chainTmpPath);
remove(tableTmpPath);
return false;
}
DynArr_destroy(blk->transactions); if (loadedTemp) {
blk->transactions = NULL; // Clear transactions to save memory since they're now saved on disk Block_Destroy(diskCopy);
} else if (blk->transactions) {
DynArr_destroy(blk->transactions);
blk->transactions = NULL;
}
} }
// Update metadata with new size and last block hash size_t newSize = chainSize;
size_t newSize = DynArr_size(chain->blocks);
fseek(metaFile, 0, SEEK_SET); fseek(metaFile, 0, SEEK_SET);
fwrite(&newSize, sizeof(size_t), 1, metaFile); fwrite(&newSize, sizeof(size_t), 1, metaFile);
uint32_t difficultyTarget = INITIAL_DIFFICULTY; uint32_t difficultyTarget = INITIAL_DIFFICULTY;
@@ -421,16 +723,23 @@ bool Chain_SaveToFile(blockchain_t* chain, const char* dirpath, uint256_t curren
fwrite(&difficultyTarget, sizeof(uint32_t), 1, metaFile); fwrite(&difficultyTarget, sizeof(uint32_t), 1, metaFile);
fwrite(&currentReward, sizeof(uint64_t), 1, metaFile); fwrite(&currentReward, sizeof(uint64_t), 1, metaFile);
// Safety
fflush(metaFile); fflush(metaFile);
fflush(chainFile); fflush(chainFile);
fflush(tableFile); fflush(tableFile);
// Close all pointers
fclose(metaFile); fclose(metaFile);
fclose(chainFile); fclose(chainFile);
fclose(tableFile); fclose(tableFile);
if (rename(metaTmpPath, metaPath) != 0 || rename(chainTmpPath, chainPath) != 0 || rename(tableTmpPath, tablePath) != 0) {
pthread_rwlock_unlock(&chainLock);
remove(metaTmpPath);
remove(chainTmpPath);
remove(tableTmpPath);
return false;
}
pthread_rwlock_unlock(&chainLock);
return true; return true;
} }

View File

@@ -42,7 +42,23 @@ bool Transaction_Verify(const signed_transaction_t* tx) {
} }
if (Address_IsCoinbase(tx->transaction.senderAddress)) { if (Address_IsCoinbase(tx->transaction.senderAddress)) {
// Coinbase transactions are valid if the signature is correct for the block (handled in Block_Verify) if (tx->transaction.amount1 == 0) {
return false;
}
if (tx->transaction.amount2 != 0) {
return false;
}
if (Address_IsCoinbase(tx->transaction.recipientAddress1) || Address_IsCoinbase(tx->transaction.recipientAddress2)) {
return false;
}
uint8_t zeroAddress[32] = {0};
if (memcmp(tx->transaction.recipientAddress2, zeroAddress, 32) != 0) {
return false;
}
return true; return true;
} }

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,24 @@
#include <nets/fetch_scheduler.h>
#include <constants.h>
#include <math.h>
// Note: floating point is used intentionally here for readability and
// because the final penalty is rounded to whole blocks. This keeps the
// implementation straightforward while avoiding subtle integer overflow
// for large exponents. If desired, replace with fixed-point arithmetic.
uint64_t FetchScheduler_ComputeReorgPenaltyBlocks(uint64_t delayBlocks) {
if (delayBlocks <= REORG_PENALTY_GRACE_BLOCKS) {
return 0ULL;
}
double B = (double)delayBlocks;
double factor = REORG_PENALTY_FACTOR;
double exp = REORG_PENALTY_EXPONENT;
double timeScale = ((double)TARGET_BLOCK_TIME) / REORG_PENALTY_REF_BLOCK_TIME;
double raw = factor * pow(B, exp) * timeScale;
if (raw < 0.0) raw = 0.0;
uint64_t penalty = (uint64_t)ceil(raw);
return penalty;
}

View File

@@ -4,6 +4,15 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <runtime_state.h>
#include <balance_sheet.h>
#include <inttypes.h>
#include <nets/orphan_pool.h>
#include <inttypes.h>
#include <pthread.h>
#include <unistd.h>
#include <txmempool.h>
static net_node_t* Node_FromConnection(tcp_connection_t* conn) { static net_node_t* Node_FromConnection(tcp_connection_t* conn) {
if (!conn) { if (!conn) {
return NULL; return NULL;
@@ -12,6 +21,37 @@ static net_node_t* Node_FromConnection(tcp_connection_t* conn) {
return (net_node_t*)conn->owner; return (net_node_t*)conn->owner;
} }
static uint64_t Node_GetCurrentBlockHeight(void) {
if (currentChain) {
return (uint64_t)Chain_Size(currentChain);
}
return currentBlockHeight;
}
typedef enum {
NODE_BLOCK_REJECTED = 0,
NODE_BLOCK_ORPHAN_QUEUED = 1,
NODE_BLOCK_ACCEPTED = 2
} node_block_accept_result_t;
static void* Node_MaintenanceThread(void* arg) {
net_node_t* n = (net_node_t*)arg;
if (!n) return NULL;
while (n->maintenanceRunning) {
if (currentChain) {
size_t attached = OrphanPool_AttemptAttach(currentChain);
if (attached > 0) {
printf("Maintenance: attached %zu orphan(s)\n", attached);
Chain_SaveToFile(currentChain, chainDataDir, currentSupply, currentReward);
BalanceSheet_SaveToFile(chainDataDir);
}
}
sleep_for_milliseconds((uint64_t)n->maintenanceIntervalMs);
}
return NULL;
}
static int Node_DecodePacket(const tcp_connection_t* conn, packet_type_t* outType, const unsigned char** outPayload, size_t* outPayloadLen) { static int Node_DecodePacket(const tcp_connection_t* conn, packet_type_t* outType, const unsigned char** outPayload, size_t* outPayloadLen) {
if (!conn || !outType || !outPayload || !outPayloadLen || conn->dataBufLen < 1 || !conn->dataBuf) { if (!conn || !outType || !outPayload || !outPayloadLen || conn->dataBufLen < 1 || !conn->dataBuf) {
return -1; return -1;
@@ -28,6 +68,148 @@ static int Node_DecodePacket(const tcp_connection_t* conn, packet_type_t* outTyp
return 0; return 0;
} }
static node_block_accept_result_t Node_ParseAndAcceptBlock(const unsigned char* payload, size_t payloadLen, bool persist) {
if (!payload) { return NODE_BLOCK_REJECTED; }
size_t offset = 0;
if (payloadLen < sizeof(uint64_t) + sizeof(block_header_t) + sizeof(uint64_t)) { return NODE_BLOCK_REJECTED; }
uint64_t blockHeight = 0;
memcpy(&blockHeight, payload + offset, sizeof(blockHeight));
offset += sizeof(blockHeight);
block_t* blk = (block_t*)calloc(1, sizeof(block_t));
if (!blk) { return NODE_BLOCK_REJECTED; }
memcpy(&blk->header, payload + offset, sizeof(blk->header));
blk->header.blockNumber = blockHeight;
offset += sizeof(blk->header);
uint64_t txCount = 0;
memcpy(&txCount, payload + offset, sizeof(txCount));
offset += sizeof(txCount);
blk->transactions = DYNARR_CREATE(signed_transaction_t, txCount == 0 ? 1 : (size_t)txCount);
if (!blk->transactions) { free(blk); return NODE_BLOCK_REJECTED; }
for (uint64_t i = 0; i < txCount; ++i) {
if (offset + sizeof(signed_transaction_t) > payloadLen) {
DynArr_destroy(blk->transactions);
free(blk);
return NODE_BLOCK_REJECTED;
}
signed_transaction_t tx;
memcpy(&tx, payload + offset, sizeof(tx));
offset += sizeof(tx);
if (!DynArr_push_back(blk->transactions, &tx)) {
DynArr_destroy(blk->transactions);
free(blk);
return NODE_BLOCK_REJECTED;
}
}
// Validate block
if (!Block_IsFullyValid(blk)) {
printf("Rejected BLOCK_DATA at height %" PRIu64 " during validation\n", blockHeight);
DynArr_destroy(blk->transactions);
free(blk);
return NODE_BLOCK_REJECTED;
}
if (!currentChain) {
printf("Rejected BLOCK_DATA at height %" PRIu64 ": no active chain\n", blockHeight);
DynArr_destroy(blk->transactions);
free(blk);
return NODE_BLOCK_REJECTED;
}
// Temporary debug mode: force network-received blocks through the orphan pool to exercise reorg handling.
if (forceOrphanReorgEnabled && blk->header.blockNumber > 0) {
OrphanPool_Insert(blk, blockHeight);
printf("Forced orphan BLOCK_DATA at height %" PRIu64 "\n", blockHeight);
return NODE_BLOCK_ORPHAN_QUEUED;
}
// If parent is missing, insert into orphan pool instead of rejecting immediately.
uint64_t chainSize = Chain_Size(currentChain);
if (blk->header.blockNumber > chainSize) {
// Parent(s) missing; queue as orphan
OrphanPool_Insert(blk, blockHeight);
printf("Queued orphan BLOCK_DATA at height %" PRIu64 "\n", blockHeight);
return NODE_BLOCK_ORPHAN_QUEUED;
} else if (blk->header.blockNumber < chainSize) {
// Older block than current chain tip: reject
printf("Rejected BLOCK_DATA at height %" PRIu64 ": older than current chain\n", blockHeight);
DynArr_destroy(blk->transactions);
free(blk);
return NODE_BLOCK_REJECTED;
} else {
// blk->header.blockNumber == chainSize -> candidate to append. Ensure prevHash matches current tip.
if (chainSize > 0) {
block_t* last = NULL;
if (!Chain_GetBlockCopy(currentChain, (size_t)(chainSize - 1), &last) || !last) {
// Can't verify parent; queue as orphan conservatively
OrphanPool_Insert(blk, blockHeight);
printf("Queued orphan BLOCK_DATA at height %" PRIu64 " (unable to verify parent)\n", blockHeight);
if (last) Block_Destroy(last);
return NODE_BLOCK_ORPHAN_QUEUED;
}
uint8_t lastHash[32];
Block_CalculateHash(last, lastHash);
if (memcmp(lastHash, blk->header.prevHash, 32) != 0) {
// Conflicting block at same height; queue as orphan until resolved by a subsequent extension.
OrphanPool_Insert(blk, blockHeight);
Block_Destroy(last);
printf("Queued conflicting BLOCK_DATA at same height %" PRIu64 " as orphan\n", blockHeight);
return NODE_BLOCK_ORPHAN_QUEUED;
}
Block_Destroy(last);
}
}
if (!Chain_AddBlock(currentChain, blk)) {
// Chain_AddBlock failed; cleanup
printf("Rejected BLOCK_DATA at height %" PRIu64 " during chain add\n", blockHeight);
if (blk->transactions) {
DynArr_destroy(blk->transactions);
}
free(blk);
return NODE_BLOCK_REJECTED;
}
uint64_t coinbaseAmount = 0;
if (blk->transactions) {
for (size_t i = 0; i < DynArr_size(blk->transactions); ++i) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(blk->transactions, i);
if (tx && Address_IsCoinbase(tx->transaction.senderAddress)) {
coinbaseAmount = tx->transaction.amount1;
break;
}
}
}
(void)uint256_add_u64(&currentSupply, coinbaseAmount);
currentReward = CalculateBlockReward(currentSupply, currentChain);
// Persist on accept if requested
if (persist) {
Chain_SaveToFile(currentChain, chainDataDir, currentSupply, currentReward);
BalanceSheet_SaveToFile(chainDataDir);
}
// Chain_AddBlock copied the block into the chain; free our temporary wrapper but do NOT destroy transactions (they are freed by Chain_SaveToFile when persisted)
free(blk);
// Attempt to attach any orphans that may now have their parents present.
size_t attached = OrphanPool_AttemptAttach(currentChain);
if (attached > 0) {
printf("Attached %zu orphan(s) after accepting block\n", attached);
// Persist after attaching orphans
Chain_SaveToFile(currentChain, chainDataDir, currentSupply, currentReward);
BalanceSheet_SaveToFile(chainDataDir);
}
return NODE_BLOCK_ACCEPTED;
}
static void Node_ForwardConnect(net_node_t* node, tcp_connection_t* conn) { static void Node_ForwardConnect(net_node_t* node, tcp_connection_t* conn) {
if (node && node->on_connect) { if (node && node->on_connect) {
node->on_connect(conn, node->callbackUser); node->on_connect(conn, node->callbackUser);
@@ -67,7 +249,13 @@ net_node_t* Node_Create() {
} }
} }
TcpServer_Init(node->server, LISTEN_PORT, "0.0.0.0"); // Initialize outbound lock and seen-block cache
pthread_mutex_init(&node->seenLock, NULL);
pthread_mutex_init(&node->outboundLock, NULL);
node->seenBlocks = DynSet_Create(32); // 32-byte canonical hashes
TxMempool_Init();
TcpServer_Init(node->server, listenPort, "0.0.0.0");
node->server->owner = node; node->server->owner = node;
node->server->on_connect = Node_Server_OnConnect; node->server->on_connect = Node_Server_OnConnect;
@@ -76,6 +264,16 @@ net_node_t* Node_Create() {
TcpServer_Start(node->server, MAX_CONS); TcpServer_Start(node->server, MAX_CONS);
OrphanPool_Init();
// Start maintenance thread
node->maintenanceRunning = 1;
node->maintenanceIntervalMs = 1000; // 1s
if (pthread_create(&node->maintenanceThread, NULL, Node_MaintenanceThread, node) != 0) {
// Failed to start maintenance thread; continue without it
node->maintenanceRunning = 0;
}
return node; return node;
} }
@@ -94,6 +292,22 @@ void Node_Destroy(net_node_t* node) {
TcpServer_Destroy(node->server); TcpServer_Destroy(node->server);
} }
// Stop maintenance thread
if (node->maintenanceRunning) {
node->maintenanceRunning = 0;
pthread_join(node->maintenanceThread, NULL);
}
OrphanPool_Destroy();
TxMempool_Destroy();
if (node->seenBlocks) {
DynSet_Destroy(node->seenBlocks);
node->seenBlocks = NULL;
}
pthread_mutex_destroy(&node->seenLock);
pthread_mutex_destroy(&node->outboundLock);
free(node); free(node);
} }
@@ -187,10 +401,65 @@ int Node_SendPacket(net_node_t* node, tcp_connection_t* conn, packet_type_t pack
return rc; return rc;
} }
int Node_BroadcastTransaction(net_node_t* node, signed_transaction_t* tx, tcp_connection_t* excludeNode) {
if (!node || !tx) {
return -1;
}
// Serialize transaction into payload
size_t payloadLen = sizeof(signed_transaction_t);
unsigned char* payload = (unsigned char*)malloc(payloadLen);
if (!payload) {
return -1;
}
memcpy(payload, tx, sizeof(signed_transaction_t));
// Broadcast to all outbound peers
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) {
tcp_connection_t* connection = node->outboundClients[i].connection;
if (connection && connection != excludeNode) {
(void)Node_SendPacket(node, connection, PACKET_TYPE_BROADCAST_TX, payload, payloadLen);
}
}
pthread_mutex_unlock(&node->outboundLock);
free(payload);
return 0;
}
void Node_Server_OnConnect(tcp_connection_t* client) { void Node_Server_OnConnect(tcp_connection_t* client) {
net_node_t* node = Node_FromConnection(client); net_node_t* node = Node_FromConnection(client);
Node_ForwardConnect(node, client); Node_ForwardConnect(node, client);
printf("Inbound node connected: %u\n", client ? client->connectionId : 0U); printf("Inbound node connected: %u\n", client ? client->connectionId : 0U);
if (echoPeersEnabled && node && client) {
// Attempt to create an outbound connection back to the peer's IP on our configured port.
// We avoid connecting if we already have an outbound to the same IP.
char ipbuf[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, &client->peerAddr.sin_addr, ipbuf, sizeof(ipbuf))) {
// Use the configured port as the target port for the peer's listening service.
unsigned short targetPort = listenPort;
int shouldConnect = 1;
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) {
if (node->outboundClients[i].connection) {
struct in_addr otherAddr = node->outboundClients[i].connection->peerAddr.sin_addr;
if (otherAddr.s_addr == client->peerAddr.sin_addr.s_addr) {
shouldConnect = 0;
break;
}
}
}
pthread_mutex_unlock(&node->outboundLock);
if (shouldConnect) {
// Try to connect; ignore failure silently
(void)Node_ConnectPeer(node, ipbuf, targetPort);
}
}
}
} }
void Node_Server_OnData(tcp_connection_t* client) { void Node_Server_OnData(tcp_connection_t* client) {
@@ -224,8 +493,9 @@ void Node_Server_OnData(tcp_connection_t* client) {
size_t ackOffset = 0; size_t ackOffset = 0;
memcpy(ackData + ackOffset, &protoVersion, sizeof(protoVersion)); memcpy(ackData + ackOffset, &protoVersion, sizeof(protoVersion));
ackOffset += sizeof(protoVersion); ackOffset += sizeof(protoVersion);
memcpy(ackData + ackOffset, &currentBlockHeight, sizeof(currentBlockHeight)); uint64_t currentHeight = Node_GetCurrentBlockHeight();
ackOffset += sizeof(currentBlockHeight); memcpy(ackData + ackOffset, &currentHeight, sizeof(currentHeight));
ackOffset += sizeof(currentHeight);
Node_SendPacket(Node_FromConnection(client), client, PACKET_TYPE_ACK_HELLO, ackData, ackOffset); Node_SendPacket(Node_FromConnection(client), client, PACKET_TYPE_ACK_HELLO, ackData, ackOffset);
@@ -241,11 +511,174 @@ void Node_Server_OnData(tcp_connection_t* client) {
TcpConnection_RequestClose(client); TcpConnection_RequestClose(client);
return; return;
} }
case PACKET_TYPE_FETCH_BLOCK: case PACKET_TYPE_FETCH_BLOCK: {
case PACKET_TYPE_BLOCK_DATA: // Decode FETCH_BLOCK - payload is the block height as uint64_t
case PACKET_TYPE_BROADCAST_BLOCK: if (payloadLen != sizeof(uint64_t)) {
return;
}
uint64_t requestedHeight;
memcpy(&requestedHeight, payload, sizeof(requestedHeight));
printf("Received FETCH_BLOCK for height %" PRIu64 " from node %u\n", requestedHeight, client ? client->connectionId : 0U);
if (requestedHeight > Node_GetCurrentBlockHeight()) {
printf("Requested block height %" PRIu64 " is higher than current height, ignoring\n", requestedHeight);
// Error the client, but don't kill
const char* msg = "Requested block height is higher than my current height!";
Node_SendPacket(Node_FromConnection(client), client, PACKET_TYPE_ERROR, msg, strlen(msg));
return;
}
// Find the block (deep-copy it for safe access)
block_t* block = NULL;
bool loadedFromDisk = false;
if (!Chain_GetBlockCopy(currentChain, (size_t)requestedHeight, &block) || !block) {
// Try loading from disk directly
if (!Chain_LoadBlockFromFile(chainDataDir, requestedHeight, true, &block, NULL) || !block) {
printf("Requested block height %" PRIu64 " not found, ignoring\n", requestedHeight);
const char* msg = "Requested block not found!";
Node_SendPacket(Node_FromConnection(client), client, PACKET_TYPE_ERROR, msg, strlen(msg));
return;
}
loadedFromDisk = true;
} else if (!block->transactions) {
// In-memory chain may be compacted to headers only after persistence.
block_t* fullBlock = NULL;
if (Chain_LoadBlockFromFile(chainDataDir, requestedHeight, true, &fullBlock, NULL) && fullBlock) {
Block_Destroy(block);
block = fullBlock;
loadedFromDisk = true;
}
}
if (!block || !block->transactions) {
printf("Requested block height %" PRIu64 " has no transaction data available\n", requestedHeight);
const char* msg = "Requested block missing transactions!";
Node_SendPacket(Node_FromConnection(client), client, PACKET_TYPE_ERROR, msg, strlen(msg));
if (block) {
Block_Destroy(block);
}
return;
}
if (loadedFromDisk) {
printf("Serving block %" PRIu64 " from disk with %zu transaction(s)\n",
requestedHeight,
DynArr_size(block->transactions));
}
// Serialize into a BLOCK_DATA packet [block header][tx count - 8 bytes][transactions...]
size_t txCount = block->transactions ? DynArr_size(block->transactions) : 0;
size_t blockDataSize = sizeof(uint64_t) + sizeof(block_header_t) + sizeof(uint64_t) + (txCount * sizeof(signed_transaction_t));
unsigned char* blockData = (unsigned char*)malloc(blockDataSize);
if (!blockData) {
// Generic error response
printf("Failed to allocate memory for block data response to node %u\n", client ? client->connectionId : 0U);
const char* msg = "Generic error for block data!";
Node_SendPacket(Node_FromConnection(client), client, PACKET_TYPE_ERROR, msg, strlen(msg));
Block_Destroy(block);
return;
}
size_t offset = 0;
// Write height first
uint64_t heightLE = requestedHeight;
memcpy(blockData + offset, &heightLE, sizeof(heightLE));
offset += sizeof(heightLE);
memcpy(blockData + offset, &block->header, sizeof(block_header_t));
offset += sizeof(block_header_t);
uint64_t txCount64 = (uint64_t)txCount;
memcpy(blockData + offset, &txCount64, sizeof(txCount64));
offset += sizeof(txCount64);
if (block->transactions && txCount > 0) {
for (size_t ti = 0; ti < txCount; ++ti) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(block->transactions, ti);
memcpy(blockData + offset, tx, sizeof(signed_transaction_t));
offset += sizeof(signed_transaction_t);
}
}
// Send the block data
Node_SendPacket(Node_FromConnection(client), client, PACKET_TYPE_BLOCK_DATA, blockData, offset);
free(blockData);
Block_Destroy(block);
break;
}
case PACKET_TYPE_BLOCK_DATA: {
// Server can't receive these!
printf("Received unexpected packet type %u from node %u\n", (unsigned int)packetType, client ? client->connectionId : 0U);
// Send the error and kill the connection
const char* msg = "You can't send me BLOCK_DATA! I'm a server!";
Node_SendPacket(Node_FromConnection(client), client, PACKET_TYPE_ERROR, msg, strlen(msg));
TcpConnection_RequestClose(client);
return;
}
case PACKET_TYPE_BROADCAST_BLOCK: {
// Accept broadcast blocks from peers and try to append
if (payloadLen >= sizeof(uint64_t)) {
uint64_t blockHeight = 0;
memcpy(&blockHeight, payload, sizeof(blockHeight));
node_block_accept_result_t result = Node_ParseAndAcceptBlock(payload, payloadLen, true);
if (result == NODE_BLOCK_ACCEPTED) {
printf("Accepted BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
net_node_t* node = Node_FromConnection(client);
if (node) {
Node_BroadcastChainRange(node, (size_t)blockHeight, client);
}
} else if (result == NODE_BLOCK_ORPHAN_QUEUED) {
printf("Queued orphan BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
} else {
printf("Rejected BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
}
}
break;
}
case PACKET_TYPE_ACK_BLOCK: case PACKET_TYPE_ACK_BLOCK:
case PACKET_TYPE_BROADCAST_TX: case PACKET_TYPE_BROADCAST_TX: {
// Decode the block or transaction data inside
if (payloadLen == sizeof(signed_transaction_t)) {
signed_transaction_t tx;
memcpy(&tx, payload, sizeof(tx));
uint8_t txHash[32];
char txHashHex[65];
Transaction_CalculateHash(&tx, txHash);
to_hex(txHash, txHashHex);
printf("Received packet type %u from node %u with transaction sending %llu pebble(s)\n",
(unsigned int)packetType, client ? client->connectionId : 0U, (unsigned long long)tx.transaction.amount1);
if (!Transaction_Verify(&tx)) {
printf("Received invalid transaction from node %u\n", client ? client->connectionId : 0U);
return;
}
// Push to mempool if it's not already present
if (!TxMempool_Lookup(txHash, &tx)) {
if (TxMempool_Insert(tx) >= 0) {
printf("Added transaction %s from node %u to mempool\n", txHashHex, client ? client->connectionId : 0U);
// Broadcast to other peers
net_node_t* node = Node_FromConnection(client);
if (node) {
Node_BroadcastTransaction(node, &tx, client);
}
} else {
printf("Failed to add transaction %s from node %u to mempool\n", txHashHex, client ? client->connectionId : 0U);
}
} else {
printf("Transaction %s from node %u already seen!\n", txHashHex, client ? client->connectionId : 0U);
}
} else {
printf("Received packet type %u from node %u with invalid payload length %zu\n",
(unsigned int)packetType, client ? client->connectionId : 0U, payloadLen);
// TODO: Ignoring for now, might error node later if we want to be strict about malformed messages
}
break;
}
case PACKET_TYPE_ACK_TX: case PACKET_TYPE_ACK_TX:
case PACKET_TYPE_ERROR: { case PACKET_TYPE_ERROR: {
// Decode the message inside as text // Decode the message inside as text
@@ -287,7 +720,7 @@ void Node_Client_OnConnect(tcp_connection_t* client) {
size_t offset = 0; size_t offset = 0;
uint32_t protoVersion = 1; // little-endian uint32_t protoVersion = 1; // little-endian
uint64_t blockHeight = currentBlockHeight; uint64_t blockHeight = Node_GetCurrentBlockHeight();
memcpy((unsigned char*)data + offset, &protoVersion, sizeof(protoVersion)); // This is technically "unsafe", but I honestly just don't give a shit at this point memcpy((unsigned char*)data + offset, &protoVersion, sizeof(protoVersion)); // This is technically "unsafe", but I honestly just don't give a shit at this point
offset += sizeof(protoVersion); offset += sizeof(protoVersion);
memcpy((unsigned char*)data + offset, &blockHeight, sizeof(blockHeight)); memcpy((unsigned char*)data + offset, &blockHeight, sizeof(blockHeight));
@@ -328,14 +761,101 @@ void Node_Client_OnData(tcp_connection_t* client) {
memcpy(&protoVersion, payload, sizeof(protoVersion)); memcpy(&protoVersion, payload, sizeof(protoVersion));
memcpy(&blockHeight, payload + sizeof(protoVersion), sizeof(blockHeight)); memcpy(&blockHeight, payload + sizeof(protoVersion), sizeof(blockHeight));
printf("Received ACK_HELLO from node %u with protoVersion %u and blockHeight %lu\n", client ? client->connectionId : 0U, protoVersion, (unsigned long)blockHeight); printf("Received ACK_HELLO from node %u with protoVersion %u and blockHeight %" PRIu64 "\n", client ? client->connectionId : 0U, protoVersion, blockHeight);
// Store peer-advertised height on matching outbound client
net_node_t* node = Node_FromConnection(client);
if (node) {
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) {
if (node->outboundClients[i].connection == client) {
node->outboundClients[i].peerBlockHeight = blockHeight;
break;
}
}
pthread_mutex_unlock(&node->outboundLock);
}
break;
}
case PACKET_TYPE_FETCH_BLOCK: {
// A client can't serve a block!
printf("Received unexpected FETCH_BLOCK packet from node %u\n", client ? client->connectionId : 0U);
// Send the error and kill the connection (this might be too aggressive)
const char* msg = "You can't FETCH_BLOCK from me! I'm a client!";
Node_SendPacket(Node_FromConnection(client), client, PACKET_TYPE_ERROR, msg, strlen(msg));
TcpConnection_RequestClose(client);
return;
}
case PACKET_TYPE_BLOCK_DATA: {
if (payloadLen >= sizeof(uint64_t)) {
uint64_t blockHeight = 0;
memcpy(&blockHeight, payload, sizeof(blockHeight));
node_block_accept_result_t result = Node_ParseAndAcceptBlock(payload, payloadLen, true);
if (result == NODE_BLOCK_ACCEPTED) {
printf("Accepted BLOCK_DATA from node %u\n", client ? client->connectionId : 0U);
net_node_t* node = Node_FromConnection(client);
if (node) {
// Update peer advertised height
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) {
if (node->outboundClients[i].connection == client) {
if (node->outboundClients[i].peerBlockHeight < blockHeight) {
node->outboundClients[i].peerBlockHeight = blockHeight;
}
break;
}
}
pthread_mutex_unlock(&node->outboundLock);
Node_BroadcastChainRange(node, (size_t)blockHeight, client);
}
} else if (result == NODE_BLOCK_ORPHAN_QUEUED) {
printf("Queued orphan BLOCK_DATA from node %u\n", client ? client->connectionId : 0U);
} else {
printf("Rejected BLOCK_DATA from node %u\n", client ? client->connectionId : 0U);
}
}
break;
}
case PACKET_TYPE_BROADCAST_BLOCK: {
if (payloadLen >= sizeof(uint64_t)) {
uint64_t blockHeight = 0;
memcpy(&blockHeight, payload, sizeof(blockHeight));
node_block_accept_result_t result = Node_ParseAndAcceptBlock(payload, payloadLen, true);
if (result == NODE_BLOCK_ACCEPTED) {
printf("Accepted BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
net_node_t* node = Node_FromConnection(client);
if (node) {
// Update peer advertised height
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) {
if (node->outboundClients[i].connection == client) {
if (node->outboundClients[i].peerBlockHeight < blockHeight) {
node->outboundClients[i].peerBlockHeight = blockHeight;
}
break;
}
}
pthread_mutex_unlock(&node->outboundLock);
Node_BroadcastChainRange(node, (size_t)blockHeight, client);
}
} else if (result == NODE_BLOCK_ORPHAN_QUEUED) {
printf("Queued orphan BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
} else {
printf("Rejected BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
}
}
break; break;
} }
case PACKET_TYPE_FETCH_BLOCK:
case PACKET_TYPE_BLOCK_DATA:
case PACKET_TYPE_BROADCAST_BLOCK:
case PACKET_TYPE_ACK_BLOCK: case PACKET_TYPE_ACK_BLOCK:
case PACKET_TYPE_BROADCAST_TX: case PACKET_TYPE_BROADCAST_TX: {
// Client can't receive these!
printf("Received unexpected packet type %u from node %u\n", (unsigned int)packetType, client ? client->connectionId : 0U);
break;
}
case PACKET_TYPE_ACK_TX: case PACKET_TYPE_ACK_TX:
case PACKET_TYPE_ERROR: { case PACKET_TYPE_ERROR: {
// Decode the message inside as text // Decode the message inside as text
@@ -361,10 +881,128 @@ void Node_Client_OnData(tcp_connection_t* client) {
void Node_Client_OnDisconnect(tcp_connection_t* client) { void Node_Client_OnDisconnect(tcp_connection_t* client) {
net_node_t* node = Node_FromConnection(client); net_node_t* node = Node_FromConnection(client);
if (node && node->outboundCount > 0) { if (node) {
node->outboundCount--; // Clear peer advertised height for this outbound slot
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) {
if (node->outboundClients[i].connection == client) {
node->outboundClients[i].peerBlockHeight = 0;
break;
}
}
pthread_mutex_unlock(&node->outboundLock);
if (node->outboundCount > 0) {
node->outboundCount--;
}
} }
Node_ForwardDisconnect(node, client); Node_ForwardDisconnect(node, client);
printf("Outbound node disconnected: %u\n", client ? client->connectionId : 0U); printf("Outbound node disconnected: %u\n", client ? client->connectionId : 0U);
} }
int Node_GetBestOutboundPeer(net_node_t* node, tcp_connection_t** outConn, uint64_t* outHeight) {
if (!node || !outConn || !outHeight) return -1;
tcp_connection_t* best = NULL;
uint64_t bestH = 0;
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) {
if (node->outboundClients[i].connection) {
if (node->outboundClients[i].peerBlockHeight > bestH || best == NULL) {
best = node->outboundClients[i].connection;
bestH = node->outboundClients[i].peerBlockHeight;
}
}
}
pthread_mutex_unlock(&node->outboundLock);
if (!best) return -1;
*outConn = best;
*outHeight = bestH;
return 0;
}
void Node_BroadcastChainRange(net_node_t* node, size_t startHeightInclusive, tcp_connection_t* sourceConn) {
if (!node || !currentChain) return;
size_t chainSize = Chain_Size(currentChain);
if (startHeightInclusive >= chainSize) return;
uint32_t sourceIp = 0;
if (sourceConn) {
sourceIp = sourceConn->peerAddr.sin_addr.s_addr;
}
for (size_t h = startHeightInclusive; h < chainSize; ++h) {
block_t* blk = NULL;
if (!Chain_GetBlockCopy(currentChain, h, &blk) || !blk) {
if (!Chain_LoadBlockFromFile(chainDataDir, h, true, &blk, NULL) || !blk) {
continue;
}
} else if (!blk->transactions) {
block_t* full = NULL;
if (Chain_LoadBlockFromFile(chainDataDir, h, true, &full, NULL) && full) {
Block_Destroy(blk);
blk = full;
}
}
if (!blk || !blk->transactions) {
if (blk) Block_Destroy(blk);
continue;
}
unsigned char hash[32];
Block_CalculateHash(blk, hash);
// Dedupe using seenBlocks
int seen = 0;
pthread_mutex_lock(&node->seenLock);
if (DynSet_Contains(node->seenBlocks, hash)) {
seen = 1;
} else {
DynSet_Insert(node->seenBlocks, hash);
}
pthread_mutex_unlock(&node->seenLock);
if (seen) {
Block_Destroy(blk);
continue;
}
// Serialize payload: [uint64_t height][block_header_t][uint64_t txCount][transactions...]
size_t txCount = DynArr_size(blk->transactions);
size_t payloadLen = sizeof(uint64_t) + sizeof(block_header_t) + sizeof(uint64_t) + (txCount * sizeof(signed_transaction_t));
unsigned char* payload = (unsigned char*)malloc(payloadLen);
if (!payload) {
Block_Destroy(blk);
continue;
}
size_t off = 0;
uint64_t h64 = (uint64_t)h;
memcpy(payload + off, &h64, sizeof(h64)); off += sizeof(h64);
memcpy(payload + off, &blk->header, sizeof(block_header_t)); off += sizeof(block_header_t);
uint64_t txCount64 = (uint64_t)txCount;
memcpy(payload + off, &txCount64, sizeof(txCount64)); off += sizeof(txCount64);
for (size_t ti = 0; ti < txCount; ++ti) {
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(blk->transactions, ti);
memcpy(payload + off, tx, sizeof(signed_transaction_t)); off += sizeof(signed_transaction_t);
}
// Snapshot outbound clients and send
pthread_mutex_lock(&node->outboundLock);
for (size_t i = 0; i < MAX_CONS; ++i) {
tcp_connection_t* conn = node->outboundClients[i].connection;
if (!conn) continue;
if (conn == sourceConn) continue;
if (sourceIp != 0 && conn->peerAddr.sin_addr.s_addr == sourceIp) continue;
Node_SendPacket(node, conn, PACKET_TYPE_BROADCAST_BLOCK, payload, off);
}
pthread_mutex_unlock(&node->outboundLock);
free(payload);
Block_Destroy(blk);
}
}

208
src/nets/orphan_pool.c Normal file
View File

@@ -0,0 +1,208 @@
#include <nets/orphan_pool.h>
#include <dynarr.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
typedef struct {
block_t* block;
uint64_t height;
} orphan_entry_t;
static DynArr* g_orphans = NULL;
void OrphanPool_Init(void) {
if (g_orphans) return;
g_orphans = DYNARR_CREATE(orphan_entry_t, 16);
}
void OrphanPool_Destroy(void) {
if (!g_orphans) return;
size_t n = DynArr_size(g_orphans);
for (size_t i = 0; i < n; ++i) {
orphan_entry_t* e = (orphan_entry_t*)DynArr_at(g_orphans, i);
if (e && e->block) {
Block_Destroy(e->block);
}
}
DynArr_destroy(g_orphans);
g_orphans = NULL;
}
void OrphanPool_Insert(block_t* block, uint64_t height) {
if (!block) return;
if (!g_orphans) OrphanPool_Init();
orphan_entry_t e;
e.block = block;
e.height = height;
(void)DynArr_push_back(g_orphans, &e);
}
static size_t OrphanPool_TryAdoptBranch(blockchain_t* chain, uint64_t forkHeight) {
if (!g_orphans || !chain) return 0;
DynArr* seq = DYNARR_CREATE(block_t*, 8);
if (!seq) return 0;
size_t cursor = forkHeight;
while (1) {
bool found = false;
size_t count = DynArr_size(g_orphans);
for (size_t i = 0; i < count; ++i) {
orphan_entry_t* entry = (orphan_entry_t*)DynArr_at(g_orphans, i);
if (!entry || !entry->block) continue;
if (entry->height == cursor) {
(void)DynArr_push_back(seq, &entry->block);
found = true;
break;
}
}
if (!found) break;
cursor++;
}
size_t seqCount = DynArr_size(seq);
if (seqCount == 0) {
DynArr_destroy(seq);
return 0;
}
size_t currentTipHeight = Chain_Size(chain) == 0 ? 0 : Chain_Size(chain) - 1;
size_t seqTopHeight = forkHeight + seqCount - 1;
if (seqTopHeight <= currentTipHeight) {
DynArr_destroy(seq);
return 0;
}
size_t rollbackHeight = (forkHeight == 0) ? 0 : (forkHeight - 1);
if (!Chain_RollbackToHeight(chain, rollbackHeight)) {
DynArr_destroy(seq);
return 0;
}
size_t attached = 0;
for (size_t i = 0; i < seqCount; ++i) {
block_t* bptr = *(block_t**)DynArr_at(seq, i);
if (!bptr || !Chain_AddBlock(chain, bptr)) {
break;
}
size_t count = DynArr_size(g_orphans);
for (size_t j = 0; j < count; ++j) {
orphan_entry_t* entry = (orphan_entry_t*)DynArr_at(g_orphans, j);
if (entry && entry->block == bptr) {
DynArr_remove(g_orphans, j);
break;
}
}
attached++;
}
DynArr_destroy(seq);
return attached;
}
size_t OrphanPool_AttemptAttach(blockchain_t* chain) {
if (!g_orphans || !chain) return 0;
size_t attached = 0;
bool madeProgress = true;
// Attempt repeatedly while progress is made (to handle chained orphans)
while (madeProgress) {
madeProgress = false;
size_t n = DynArr_size(g_orphans);
for (size_t i = 0; i < n; ++i) {
orphan_entry_t* e = (orphan_entry_t*)DynArr_at(g_orphans, i);
if (!e || !e->block) continue;
uint64_t parentIndex = (e->height == 0) ? (uint64_t)-1 : (e->height - 1);
bool parentExists = false;
if (e->height == 0) {
// genesis-style block: parent is zero-hash; accept if chain empty
parentExists = (Chain_Size(chain) == 0);
} else if (parentIndex < Chain_Size(chain)) {
block_t* parent = NULL;
if (Chain_GetBlockCopy(chain, (size_t)parentIndex, &parent) && parent) {
parentExists = true;
Block_Destroy(parent);
} else {
parentExists = false;
}
}
if (parentExists) {
if (e->height < Chain_Size(chain)) {
block_t* local = NULL;
if (Chain_GetBlockCopy(chain, (size_t)e->height, &local) && local) {
uint8_t localHash[32];
uint8_t orphanHash[32];
Block_CalculateHash(local, localHash);
Block_CalculateHash(e->block, orphanHash);
Block_Destroy(local);
if (memcmp(localHash, orphanHash, 32) != 0) {
size_t adopted = OrphanPool_TryAdoptBranch(chain, e->height);
if (adopted > 0) {
attached += adopted;
madeProgress = true;
n = DynArr_size(g_orphans);
i = (size_t)-1;
break;
}
}
} else if (local) {
Block_Destroy(local);
}
}
// Verify that the parent's hash matches the orphan's prevHash before attaching.
bool parentMatches = false;
if (e->height == 0) {
parentMatches = (Chain_Size(chain) == 0);
} else {
block_t* parent = NULL;
if (Chain_GetBlockCopy(chain, (size_t)parentIndex, &parent) && parent) {
uint8_t parentHash[32];
Block_CalculateHash(parent, parentHash);
parentMatches = (memcmp(parentHash, e->block->header.prevHash, 32) == 0);
Block_Destroy(parent);
} else {
parentMatches = false;
}
}
if (!parentMatches) {
// Parent exists but does not match this orphan's prevHash.
size_t adopted = OrphanPool_TryAdoptBranch(chain, e->height);
if (adopted > 0) {
attached += adopted;
madeProgress = true;
n = DynArr_size(g_orphans);
i = (size_t)-1;
break;
}
continue;
}
// Try to add to chain
if (Chain_AddBlock(chain, e->block)) {
attached++;
madeProgress = true;
// remove this entry
DynArr_remove(g_orphans, i);
// adjust indices
n = DynArr_size(g_orphans);
i = (size_t)-1; // reset outer loop
break;
} else {
// Keep the orphan around; rejection may be temporary while the local tip is being reorged.
continue;
}
}
}
}
return attached;
}

View File

@@ -9,6 +9,8 @@
#include <string.h> #include <string.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <unistd.h> #include <unistd.h>
#include <fcntl.h>
#include <sys/select.h>
static void* TcpClient_ThreadProc(void* arg) { static void* TcpClient_ThreadProc(void* arg) {
tcp_client_t* client = (tcp_client_t*)arg; tcp_client_t* client = (tcp_client_t*)arg;
@@ -51,6 +53,7 @@ int TcpClient_Init(tcp_client_t* client) {
memset(client, 0, sizeof(*client)); memset(client, 0, sizeof(*client));
client->connection = NULL; client->connection = NULL;
client->peerBlockHeight = 0;
return 0; return 0;
} }
@@ -94,11 +97,52 @@ int TcpClient_Connect(
return -1; return -1;
} }
if (connect(sockFd, (struct sockaddr*)&peerAddr, sizeof(peerAddr)) < 0) { // Use non-blocking connect with a timeout to avoid long blocking in the CLI.
close(sockFd); int flags = fcntl(sockFd, F_GETFL, 0);
return -1; if (flags == -1) flags = 0;
fcntl(sockFd, F_SETFL, flags | O_NONBLOCK);
int rc = connect(sockFd, (struct sockaddr*)&peerAddr, sizeof(peerAddr));
if (rc < 0) {
if (errno != EINPROGRESS) {
close(sockFd);
return -1;
}
// Wait up to 5 seconds for the socket to become writable (connected)
struct timeval tv;
tv.tv_sec = 5;
tv.tv_usec = 0;
fd_set wfds;
FD_ZERO(&wfds);
FD_SET(sockFd, &wfds);
int sel = select(sockFd + 1, NULL, &wfds, NULL, &tv);
if (sel <= 0) {
// timeout or error
if (sel == 0) {
errno = ETIMEDOUT;
}
close(sockFd);
return -1;
}
// Check for socket error
int so_error = 0;
socklen_t len = sizeof(so_error);
if (getsockopt(sockFd, SOL_SOCKET, SO_ERROR, &so_error, &len) < 0) {
close(sockFd);
return -1;
}
if (so_error != 0) {
errno = so_error;
close(sockFd);
return -1;
}
} }
// Restore blocking mode
fcntl(sockFd, F_SETFL, flags & ~O_NONBLOCK);
tcp_connection_t* conn = (tcp_connection_t*)malloc(sizeof(*conn)); tcp_connection_t* conn = (tcp_connection_t*)malloc(sizeof(*conn));
if (!conn) { if (!conn) {
close(sockFd); close(sockFd);

View File

@@ -214,10 +214,26 @@ int TcpConnection_SendFramed(tcp_connection_t* conn, const void* payload, size_t
pthread_mutex_lock(&conn->sendLock); pthread_mutex_lock(&conn->sendLock);
#ifdef USE_IPV6
int sock;
if (conn->sockFd6 >= 0) {
// IPv6 is available, attempt to send on it. If it fails, we'll fall back to IPv4 if available.
sock = conn->sockFd6;
} else {
// IPv4 fallback
sock = conn->sockFd;
}
int rc = TcpConnection_SendRaw(sock, &beLen, sizeof(beLen));
if (rc == 0 && payloadLen > 0) {
rc = TcpConnection_SendRaw(sock, payload, payloadLen);
}
#else
int rc = TcpConnection_SendRaw(conn->sockFd, &beLen, sizeof(beLen)); int rc = TcpConnection_SendRaw(conn->sockFd, &beLen, sizeof(beLen));
if (rc == 0 && payloadLen > 0) { if (rc == 0 && payloadLen > 0) {
rc = TcpConnection_SendRaw(conn->sockFd, payload, payloadLen); rc = TcpConnection_SendRaw(conn->sockFd, payload, payloadLen);
} }
#endif
pthread_mutex_unlock(&conn->sendLock); pthread_mutex_unlock(&conn->sendLock);
@@ -235,6 +251,11 @@ void TcpConnection_RequestClose(tcp_connection_t* conn) {
if (conn->sockFd >= 0) { if (conn->sockFd >= 0) {
shutdown(conn->sockFd, SHUT_RDWR); shutdown(conn->sockFd, SHUT_RDWR);
} }
#ifdef USE_IPV6
if (conn->sockFd6 >= 0) {
shutdown(conn->sockFd6, SHUT_RDWR);
}
#endif
} }
pthread_mutex_unlock(&conn->stateLock); pthread_mutex_unlock(&conn->stateLock);
} }

View File

@@ -172,6 +172,9 @@ tcp_server_t* TcpServer_Create() {
svr->isRunning = 0; svr->isRunning = 0;
svr->maxClients = 0; svr->maxClients = 0;
svr->clientsArrPtr = NULL; svr->clientsArrPtr = NULL;
#ifdef USE_IPV6
svr->sockFd6 = -1;
#endif
if (pthread_mutex_init(&svr->clientsMutex, NULL) != 0) { if (pthread_mutex_init(&svr->clientsMutex, NULL) != 0) {
free(svr); free(svr);
@@ -217,6 +220,28 @@ void TcpServer_Init(tcp_server_t* ptr, unsigned short port, const char* addr) {
close(ptr->sockFd); close(ptr->sockFd);
ptr->sockFd = -1; ptr->sockFd = -1;
} }
#ifdef USE_IPV6
// IPv6 support
ptr->sockFd6 = socket(AF_INET6, SOCK_STREAM, 0);
if (ptr->sockFd6 >= 0) {
ptr->opt6 = 1;
setsockopt(ptr->sockFd6, SOL_SOCKET, SO_REUSEADDR, &ptr->opt6, sizeof(int));
memset(&ptr->addr6, 0, sizeof(ptr->addr6));
ptr->addr6.sin6_family = AF_INET6;
ptr->addr6.sin6_port = htons(port);
inet_pton(AF_INET6, addr, &ptr->addr6.sin6_addr);
if (bind(ptr->sockFd6, (struct sockaddr*)&ptr->addr6, sizeof(ptr->addr6)) < 0) {
close(ptr->sockFd6);
ptr->sockFd6 = -1;
}
} else {
ptr->sockFd6 = -1; // IPv6 is optional, so if it isn't available, we just set it to -1
}
#else
// Safety for my future "I forgot the ifdef guard" self
ptr->sockFd6 = -1; // IPv6 not supported in this build
#endif
} }
void TcpServer_Start(tcp_server_t* ptr, int maxcons) { void TcpServer_Start(tcp_server_t* ptr, int maxcons) {
@@ -228,6 +253,15 @@ void TcpServer_Start(tcp_server_t* ptr, int maxcons) {
return; return;
} }
#ifdef USE_IPV6
if (ptr->sockFd6 >= 0) {
if (listen(ptr->sockFd6, maxcons) < 0) {
close(ptr->sockFd6);
ptr->sockFd6 = -1;
}
}
#endif
pthread_mutex_lock(&ptr->clientsMutex); pthread_mutex_lock(&ptr->clientsMutex);
ptr->maxClients = (size_t)maxcons; ptr->maxClients = (size_t)maxcons;
@@ -268,6 +302,14 @@ void TcpServer_Stop(tcp_server_t* ptr) {
ptr->sockFd = -1; ptr->sockFd = -1;
} }
#ifdef USE_IPV6
if (ptr->sockFd6 >= 0) {
shutdown(ptr->sockFd6, SHUT_RDWR);
close(ptr->sockFd6);
ptr->sockFd6 = -1;
}
#endif
if (ptr->svrThread != 0 && !pthread_equal(ptr->svrThread, pthread_self())) { if (ptr->svrThread != 0 && !pthread_equal(ptr->svrThread, pthread_self())) {
pthread_join(ptr->svrThread, NULL); pthread_join(ptr->svrThread, NULL);
} }
@@ -339,6 +381,12 @@ void TcpServer_KillClient(tcp_server_t* ptr, tcp_connection_t* cli) {
so_linger.l_linger = 0; so_linger.l_linger = 0;
setsockopt(cli->sockFd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger)); setsockopt(cli->sockFd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger));
#ifdef USE_IPV6
if (cli->sockFd6 >= 0) {
setsockopt(cli->sockFd6, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger));
}
#endif
TcpServer_Disconnect(ptr, cli); TcpServer_Disconnect(ptr, cli);
} }

View File

@@ -1,14 +1,21 @@
#include <txmempool.h> #include <txmempool.h>
#include <pthread.h>
static pthread_mutex_t g_txMempoolLock;
static bool g_txMempoolLockInitialized = false;
khash_t(tx_mempool_map_m)* txMempool = NULL; khash_t(tx_mempool_map_m)* txMempool = NULL;
void TxMempool_Init() { void TxMempool_Init() {
txMempool = kh_init(tx_mempool_map_m); txMempool = kh_init(tx_mempool_map_m);
pthread_mutex_init(&g_txMempoolLock, NULL);
g_txMempoolLockInitialized = true;
} }
int TxMempool_Insert(signed_transaction_t tx) { int TxMempool_Insert(signed_transaction_t tx) {
if (!txMempool) { return -1; } if (!txMempool) { return -1; }
pthread_mutex_lock(&g_txMempoolLock);
uint8_t txHash[32]; uint8_t txHash[32];
Transaction_CalculateHash(&tx, txHash); Transaction_CalculateHash(&tx, txHash);
@@ -18,17 +25,21 @@ int TxMempool_Insert(signed_transaction_t tx) {
int ret; int ret;
khiter_t k = kh_put(tx_mempool_map_m, txMempool, key, &ret); khiter_t k = kh_put(tx_mempool_map_m, txMempool, key, &ret);
if (k == kh_end(txMempool)) { if (k == kh_end(txMempool)) {
pthread_mutex_unlock(&g_txMempoolLock);
return -1; return -1;
} }
kh_value(txMempool, k) = tx; kh_value(txMempool, k) = tx;
pthread_mutex_unlock(&g_txMempoolLock);
return ret; return ret;
} }
bool TxMempool_Lookup(uint8_t* txHash, signed_transaction_t* out) { bool TxMempool_Lookup(uint8_t* txHash, signed_transaction_t* out) {
if (!txMempool || !txHash || !out) { return false; } if (!txMempool || !txHash || !out) { return false; }
pthread_mutex_lock(&g_txMempoolLock);
key32_t key; key32_t key;
memcpy(key.bytes, txHash, 32); memcpy(key.bytes, txHash, 32);
@@ -36,15 +47,65 @@ bool TxMempool_Lookup(uint8_t* txHash, signed_transaction_t* out) {
if (k != kh_end(txMempool)) { if (k != kh_end(txMempool)) {
signed_transaction_t tx = kh_value(txMempool, k); signed_transaction_t tx = kh_value(txMempool, k);
memcpy(out, &tx, sizeof(signed_transaction_t)); memcpy(out, &tx, sizeof(signed_transaction_t));
pthread_mutex_unlock(&g_txMempoolLock);
return true; return true;
} }
pthread_mutex_unlock(&g_txMempoolLock);
return false; return false;
} }
bool TxMempool_Snapshot(signed_transaction_t** outTxs, size_t* outCount) {
if (!outTxs || !outCount) {
return false;
}
*outTxs = NULL;
*outCount = 0;
if (!txMempool) {
return true;
}
pthread_mutex_lock(&g_txMempoolLock);
size_t count = 0;
khiter_t k;
for (k = kh_begin(txMempool); k != kh_end(txMempool); ++k) {
if (kh_exist(txMempool, k)) {
++count;
}
}
if (count == 0) {
pthread_mutex_unlock(&g_txMempoolLock);
return true;
}
signed_transaction_t* snapshot = (signed_transaction_t*)malloc(count * sizeof(signed_transaction_t));
if (!snapshot) {
pthread_mutex_unlock(&g_txMempoolLock);
return false;
}
size_t index = 0;
for (k = kh_begin(txMempool); k != kh_end(txMempool); ++k) {
if (kh_exist(txMempool, k)) {
snapshot[index++] = kh_value(txMempool, k);
}
}
pthread_mutex_unlock(&g_txMempoolLock);
*outTxs = snapshot;
*outCount = count;
return true;
}
void TxMempool_Print() { void TxMempool_Print() {
if (!txMempool) { return; } if (!txMempool) { return; }
pthread_mutex_lock(&g_txMempoolLock);
khiter_t k; khiter_t k;
for (k = kh_begin(txMempool); k != kh_end(txMempool); ++k) { for (k = kh_begin(txMempool); k != kh_end(txMempool); ++k) {
if (kh_exist(txMempool, k)) { if (kh_exist(txMempool, k)) {
@@ -62,10 +123,37 @@ void TxMempool_Print() {
(unsigned long long)tx.transaction.fee); (unsigned long long)tx.transaction.fee);
} }
} }
pthread_mutex_unlock(&g_txMempoolLock);
} }
void TxMempool_Destroy() { void TxMempool_Destroy() {
if (txMempool) { if (txMempool) {
pthread_mutex_lock(&g_txMempoolLock);
kh_destroy(tx_mempool_map_m, txMempool); kh_destroy(tx_mempool_map_m, txMempool);
txMempool = NULL;
pthread_mutex_unlock(&g_txMempoolLock);
}
if (g_txMempoolLockInitialized) {
pthread_mutex_destroy(&g_txMempoolLock);
g_txMempoolLockInitialized = false;
} }
} }
bool TxMempool_Remove(const uint8_t* txHash) {
if (!txMempool || !txHash) { return false; }
pthread_mutex_lock(&g_txMempoolLock);
key32_t key;
memcpy(key.bytes, txHash, 32);
khiter_t k = kh_get(tx_mempool_map_m, txMempool, key);
if (k == kh_end(txMempool)) {
pthread_mutex_unlock(&g_txMempoolLock);
return false;
}
kh_del(tx_mempool_map_m, txMempool, k);
pthread_mutex_unlock(&g_txMempoolLock);
return true;
}