todo update, forward block broadcasts, optional echo connect
This commit is contained in:
30
src/main.c
30
src/main.c
@@ -651,6 +651,11 @@ int main(int argc, char* argv[]) {
|
||||
|
||||
free(block); // Chain stores block by value and owns copied transaction array.
|
||||
|
||||
// Broadcast newly mined block to outbound peers
|
||||
if (node) {
|
||||
Node_BroadcastChainRange(node, Chain_Size(chain) - 1, NULL);
|
||||
}
|
||||
|
||||
if (i % 50 == 0) {
|
||||
// Mid-mine flush
|
||||
(void)FlushChainAndSheet(chain, chainDataDir, currentSupply, currentReward);
|
||||
@@ -733,6 +738,9 @@ int main(int argc, char* argv[]) {
|
||||
FlushChainAndSheet(chain, chainDataDir, currentSupply, currentReward);
|
||||
|
||||
free(block);
|
||||
if (node) {
|
||||
Node_BroadcastChainRange(node, Chain_Size(chain) - 1, NULL);
|
||||
}
|
||||
printf("send committed in mined block\n");
|
||||
continue;
|
||||
}
|
||||
@@ -744,24 +752,14 @@ int main(int argc, char* argv[]) {
|
||||
}
|
||||
|
||||
// Choose the best outbound peer by advertised height
|
||||
int bestIdx = -1;
|
||||
uint64_t bestHeight = 0;
|
||||
for (size_t i = 0; i < MAX_CONS; ++i) {
|
||||
if (node->outboundClients[i].connection) {
|
||||
if (node->outboundClients[i].peerBlockHeight > bestHeight) {
|
||||
bestHeight = node->outboundClients[i].peerBlockHeight;
|
||||
bestIdx = (int)i;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (bestIdx < 0) {
|
||||
tcp_connection_t* peerConn = NULL;
|
||||
uint64_t peerHeight = 0;
|
||||
if (Node_GetBestOutboundPeer(node, &peerConn, &peerHeight) != 0 || !peerConn) {
|
||||
printf("no outbound peers to sync from\n");
|
||||
continue;
|
||||
}
|
||||
|
||||
uint64_t localHeight = (uint64_t)Chain_Size(chain);
|
||||
uint64_t peerHeight = node->outboundClients[bestIdx].peerBlockHeight;
|
||||
|
||||
// Determine if this is an initial sync. If so, do not apply penalty.
|
||||
bool isInitialSync = (localHeight == 0);
|
||||
@@ -776,10 +774,8 @@ int main(int argc, char* argv[]) {
|
||||
continue;
|
||||
}
|
||||
|
||||
printf("syncing from peer %d: peerHeight=%" PRIu64 " adjusted=%" PRIu64 " local=%" PRIu64 " penalty=%" PRIu64 "\n",
|
||||
bestIdx, peerHeight, adjustedPeerHeight, localHeight, penalty);
|
||||
|
||||
tcp_connection_t* peerConn = node->outboundClients[bestIdx].connection;
|
||||
printf("syncing: peerHeight=%" PRIu64 " adjusted=%" PRIu64 " local=%" PRIu64 " penalty=%" PRIu64 "\n",
|
||||
peerHeight, adjustedPeerHeight, localHeight, penalty);
|
||||
|
||||
// Windowed parallel fetch
|
||||
uint64_t start = localHeight;
|
||||
|
||||
@@ -198,6 +198,11 @@ net_node_t* Node_Create() {
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize outbound lock and seen-block cache
|
||||
pthread_mutex_init(&node->seenLock, NULL);
|
||||
pthread_mutex_init(&node->outboundLock, NULL);
|
||||
node->seenBlocks = DynSet_Create(32); // 32-byte canonical hashes
|
||||
|
||||
TcpServer_Init(node->server, LISTEN_PORT, "0.0.0.0");
|
||||
|
||||
node->server->owner = node;
|
||||
@@ -243,6 +248,13 @@ void Node_Destroy(net_node_t* node) {
|
||||
|
||||
OrphanPool_Destroy();
|
||||
|
||||
if (node->seenBlocks) {
|
||||
DynSet_Destroy(node->seenBlocks);
|
||||
node->seenBlocks = NULL;
|
||||
}
|
||||
pthread_mutex_destroy(&node->seenLock);
|
||||
pthread_mutex_destroy(&node->outboundLock);
|
||||
|
||||
free(node);
|
||||
}
|
||||
|
||||
@@ -340,6 +352,37 @@ void Node_Server_OnConnect(tcp_connection_t* client) {
|
||||
net_node_t* node = Node_FromConnection(client);
|
||||
Node_ForwardConnect(node, client);
|
||||
printf("Inbound node connected: %u\n", client ? client->connectionId : 0U);
|
||||
|
||||
#if ECHO_PEERS
|
||||
if (node && client) {
|
||||
// Attempt to create an outbound connection back to the peer's IP on our LISTEN_PORT.
|
||||
// We avoid connecting if we already have an outbound to the same IP.
|
||||
char ipbuf[INET_ADDRSTRLEN];
|
||||
if (inet_ntop(AF_INET, &client->peerAddr.sin_addr, ipbuf, sizeof(ipbuf))) {
|
||||
unsigned short peerPort = (unsigned short)ntohs(client->peerAddr.sin_port);
|
||||
// Use LISTEN_PORT as target port for peer's listening service, not the ephemeral source port.
|
||||
unsigned short targetPort = LISTEN_PORT;
|
||||
|
||||
int shouldConnect = 1;
|
||||
pthread_mutex_lock(&node->outboundLock);
|
||||
for (size_t i = 0; i < MAX_CONS; ++i) {
|
||||
if (node->outboundClients[i].connection) {
|
||||
struct in_addr otherAddr = node->outboundClients[i].connection->peerAddr.sin_addr;
|
||||
if (otherAddr.s_addr == client->peerAddr.sin_addr.s_addr) {
|
||||
shouldConnect = 0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&node->outboundLock);
|
||||
|
||||
if (shouldConnect) {
|
||||
// Try to connect; ignore failure silently
|
||||
(void)Node_ConnectPeer(node, ipbuf, targetPort);
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
void Node_Server_OnData(tcp_connection_t* client) {
|
||||
@@ -498,10 +541,18 @@ void Node_Server_OnData(tcp_connection_t* client) {
|
||||
}
|
||||
case PACKET_TYPE_BROADCAST_BLOCK: {
|
||||
// Accept broadcast blocks from peers and try to append
|
||||
if (Node_ParseAndAcceptBlock(payload, payloadLen, true)) {
|
||||
printf("Accepted BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
|
||||
} else {
|
||||
printf("Rejected BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
|
||||
if (payloadLen >= sizeof(uint64_t)) {
|
||||
uint64_t blockHeight = 0;
|
||||
memcpy(&blockHeight, payload, sizeof(blockHeight));
|
||||
if (Node_ParseAndAcceptBlock(payload, payloadLen, true)) {
|
||||
printf("Accepted BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
|
||||
net_node_t* node = Node_FromConnection(client);
|
||||
if (node) {
|
||||
Node_BroadcastChainRange(node, (size_t)blockHeight, client);
|
||||
}
|
||||
} else {
|
||||
printf("Rejected BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
@@ -594,12 +645,14 @@ void Node_Client_OnData(tcp_connection_t* client) {
|
||||
// Store peer-advertised height on matching outbound client
|
||||
net_node_t* node = Node_FromConnection(client);
|
||||
if (node) {
|
||||
pthread_mutex_lock(&node->outboundLock);
|
||||
for (size_t i = 0; i < MAX_CONS; ++i) {
|
||||
if (node->outboundClients[i].connection == client) {
|
||||
node->outboundClients[i].peerBlockHeight = blockHeight;
|
||||
break;
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&node->outboundLock);
|
||||
}
|
||||
break;
|
||||
}
|
||||
@@ -614,18 +667,58 @@ void Node_Client_OnData(tcp_connection_t* client) {
|
||||
return;
|
||||
}
|
||||
case PACKET_TYPE_BLOCK_DATA: {
|
||||
if (Node_ParseAndAcceptBlock(payload, payloadLen, true)) {
|
||||
printf("Accepted BLOCK_DATA from node %u\n", client ? client->connectionId : 0U);
|
||||
} else {
|
||||
printf("Rejected BLOCK_DATA from node %u\n", client ? client->connectionId : 0U);
|
||||
if (payloadLen >= sizeof(uint64_t)) {
|
||||
uint64_t blockHeight = 0;
|
||||
memcpy(&blockHeight, payload, sizeof(blockHeight));
|
||||
if (Node_ParseAndAcceptBlock(payload, payloadLen, true)) {
|
||||
printf("Accepted BLOCK_DATA from node %u\n", client ? client->connectionId : 0U);
|
||||
net_node_t* node = Node_FromConnection(client);
|
||||
if (node) {
|
||||
// Update peer advertised height
|
||||
pthread_mutex_lock(&node->outboundLock);
|
||||
for (size_t i = 0; i < MAX_CONS; ++i) {
|
||||
if (node->outboundClients[i].connection == client) {
|
||||
if (node->outboundClients[i].peerBlockHeight < blockHeight) {
|
||||
node->outboundClients[i].peerBlockHeight = blockHeight;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&node->outboundLock);
|
||||
|
||||
Node_BroadcastChainRange(node, (size_t)blockHeight, client);
|
||||
}
|
||||
} else {
|
||||
printf("Rejected BLOCK_DATA from node %u\n", client ? client->connectionId : 0U);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case PACKET_TYPE_BROADCAST_BLOCK: {
|
||||
if (Node_ParseAndAcceptBlock(payload, payloadLen, true)) {
|
||||
printf("Accepted BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
|
||||
} else {
|
||||
printf("Rejected BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
|
||||
if (payloadLen >= sizeof(uint64_t)) {
|
||||
uint64_t blockHeight = 0;
|
||||
memcpy(&blockHeight, payload, sizeof(blockHeight));
|
||||
if (Node_ParseAndAcceptBlock(payload, payloadLen, true)) {
|
||||
printf("Accepted BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
|
||||
net_node_t* node = Node_FromConnection(client);
|
||||
if (node) {
|
||||
// Update peer advertised height
|
||||
pthread_mutex_lock(&node->outboundLock);
|
||||
for (size_t i = 0; i < MAX_CONS; ++i) {
|
||||
if (node->outboundClients[i].connection == client) {
|
||||
if (node->outboundClients[i].peerBlockHeight < blockHeight) {
|
||||
node->outboundClients[i].peerBlockHeight = blockHeight;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&node->outboundLock);
|
||||
|
||||
Node_BroadcastChainRange(node, (size_t)blockHeight, client);
|
||||
}
|
||||
} else {
|
||||
printf("Rejected BROADCAST_BLOCK from node %u\n", client ? client->connectionId : 0U);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
@@ -656,10 +749,125 @@ void Node_Client_OnData(tcp_connection_t* client) {
|
||||
|
||||
void Node_Client_OnDisconnect(tcp_connection_t* client) {
|
||||
net_node_t* node = Node_FromConnection(client);
|
||||
if (node && node->outboundCount > 0) {
|
||||
node->outboundCount--;
|
||||
if (node) {
|
||||
// Clear peer advertised height for this outbound slot
|
||||
pthread_mutex_lock(&node->outboundLock);
|
||||
for (size_t i = 0; i < MAX_CONS; ++i) {
|
||||
if (node->outboundClients[i].connection == client) {
|
||||
node->outboundClients[i].peerBlockHeight = 0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&node->outboundLock);
|
||||
|
||||
if (node->outboundCount > 0) {
|
||||
node->outboundCount--;
|
||||
}
|
||||
}
|
||||
|
||||
Node_ForwardDisconnect(node, client);
|
||||
printf("Outbound node disconnected: %u\n", client ? client->connectionId : 0U);
|
||||
}
|
||||
|
||||
int Node_GetBestOutboundPeer(net_node_t* node, tcp_connection_t** outConn, uint64_t* outHeight) {
|
||||
if (!node || !outConn || !outHeight) return -1;
|
||||
|
||||
tcp_connection_t* best = NULL;
|
||||
uint64_t bestH = 0;
|
||||
|
||||
pthread_mutex_lock(&node->outboundLock);
|
||||
for (size_t i = 0; i < MAX_CONS; ++i) {
|
||||
if (node->outboundClients[i].connection) {
|
||||
if (node->outboundClients[i].peerBlockHeight > bestH || best == NULL) {
|
||||
best = node->outboundClients[i].connection;
|
||||
bestH = node->outboundClients[i].peerBlockHeight;
|
||||
}
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&node->outboundLock);
|
||||
|
||||
if (!best) return -1;
|
||||
*outConn = best;
|
||||
*outHeight = bestH;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void Node_BroadcastChainRange(net_node_t* node, size_t startHeightInclusive, tcp_connection_t* sourceConn) {
|
||||
if (!node || !currentChain) return;
|
||||
|
||||
size_t chainSize = Chain_Size(currentChain);
|
||||
if (startHeightInclusive >= chainSize) return;
|
||||
|
||||
for (size_t h = startHeightInclusive; h < chainSize; ++h) {
|
||||
block_t* blk = NULL;
|
||||
bool loadedFromDisk = false;
|
||||
if (!Chain_GetBlockCopy(currentChain, h, &blk) || !blk) {
|
||||
if (!Chain_LoadBlockFromFile(chainDataDir, h, true, &blk, NULL) || !blk) {
|
||||
continue;
|
||||
}
|
||||
loadedFromDisk = true;
|
||||
} else if (!blk->transactions) {
|
||||
block_t* full = NULL;
|
||||
if (Chain_LoadBlockFromFile(chainDataDir, h, true, &full, NULL) && full) {
|
||||
Block_Destroy(blk);
|
||||
blk = full;
|
||||
loadedFromDisk = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!blk || !blk->transactions) {
|
||||
if (blk) Block_Destroy(blk);
|
||||
continue;
|
||||
}
|
||||
|
||||
unsigned char hash[32];
|
||||
Block_CalculateHash(blk, hash);
|
||||
|
||||
// Dedupe using seenBlocks
|
||||
int seen = 0;
|
||||
pthread_mutex_lock(&node->seenLock);
|
||||
if (DynSet_Contains(node->seenBlocks, hash)) {
|
||||
seen = 1;
|
||||
} else {
|
||||
DynSet_Insert(node->seenBlocks, hash);
|
||||
}
|
||||
pthread_mutex_unlock(&node->seenLock);
|
||||
|
||||
if (seen) {
|
||||
Block_Destroy(blk);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Serialize payload: [uint64_t height][block_header_t][uint64_t txCount][transactions...]
|
||||
size_t txCount = DynArr_size(blk->transactions);
|
||||
size_t payloadLen = sizeof(uint64_t) + sizeof(block_header_t) + sizeof(uint64_t) + (txCount * sizeof(signed_transaction_t));
|
||||
unsigned char* payload = (unsigned char*)malloc(payloadLen);
|
||||
if (!payload) {
|
||||
Block_Destroy(blk);
|
||||
continue;
|
||||
}
|
||||
size_t off = 0;
|
||||
uint64_t h64 = (uint64_t)h;
|
||||
memcpy(payload + off, &h64, sizeof(h64)); off += sizeof(h64);
|
||||
memcpy(payload + off, &blk->header, sizeof(block_header_t)); off += sizeof(block_header_t);
|
||||
uint64_t txCount64 = (uint64_t)txCount;
|
||||
memcpy(payload + off, &txCount64, sizeof(txCount64)); off += sizeof(txCount64);
|
||||
for (size_t ti = 0; ti < txCount; ++ti) {
|
||||
signed_transaction_t* tx = (signed_transaction_t*)DynArr_at(blk->transactions, ti);
|
||||
memcpy(payload + off, tx, sizeof(signed_transaction_t)); off += sizeof(signed_transaction_t);
|
||||
}
|
||||
|
||||
// Snapshot outbound clients and send
|
||||
pthread_mutex_lock(&node->outboundLock);
|
||||
for (size_t i = 0; i < MAX_CONS; ++i) {
|
||||
tcp_connection_t* conn = node->outboundClients[i].connection;
|
||||
if (!conn) continue;
|
||||
if (conn == sourceConn) continue;
|
||||
Node_SendPacket(node, conn, PACKET_TYPE_BROADCAST_BLOCK, payload, off);
|
||||
}
|
||||
pthread_mutex_unlock(&node->outboundLock);
|
||||
|
||||
free(payload);
|
||||
Block_Destroy(blk);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user