Addressing some bugs regarding lifetimes and callbacks that could trigger random-ish crashed - wip

This commit is contained in:
2026-03-17 17:00:20 +01:00
parent 1136892c5d
commit 604e4ace0f
9 changed files with 92 additions and 51 deletions

View File

@@ -6,7 +6,7 @@ cmake_minimum_required(VERSION 3.16)
# If MAJOR is 0, and MINOR > 0, Version is BETA
project(ColumnLynx
VERSION 1.1.0
VERSION 1.1.1
LANGUAGES CXX
)

View File

@@ -11,6 +11,7 @@
#include <columnlynx/common/utils.hpp>
#include <columnlynx/common/libsodium_wrapper.hpp>
#include <array>
#include <atomic>
#include <algorithm>
#include <vector>
#include <unordered_map>
@@ -89,8 +90,8 @@ namespace ColumnLynx::Net::TCP {
// TODO: Move ptrs to smart ptrs
bool mConnected = false;
bool mHandshakeComplete = false;
std::atomic<bool> mConnected{false};
std::atomic<bool> mHandshakeComplete{false};
tcp::resolver mResolver;
tcp::socket mSocket;
std::shared_ptr<MessageHandler> mHandler;

View File

@@ -39,6 +39,6 @@ namespace ColumnLynx::Net::TCP {
std::array<uint8_t, 3> mHeader{}; // [type][lenHigh][lenLow]
std::vector<uint8_t> mBody;
std::function<void(AnyMessageType, std::string)> mOnMessage;
std::function<void(asio::error_code&)> mOnDisconnect;
std::function<void(const asio::error_code&)> mOnDisconnect;
};
}

View File

@@ -14,19 +14,25 @@ namespace ColumnLynx::Net::TCP {
asio::async_connect(mSocket, endpoints,
[this, self](asio::error_code ec, const tcp::endpoint&) {
if (!ec) {
mConnected = true;
mConnected.store(true, std::memory_order_relaxed);
Utils::log("Client connected.");
mHandler = std::make_shared<MessageHandler>(std::move(mSocket));
mHandler->onMessage([this](AnyMessageType type, const std::string& data) {
mHandleMessage(static_cast<ServerMessageType>(MessageHandler::toUint8(type)), data);
mHandler->onMessage([weakSelf = weak_from_this()](AnyMessageType type, const std::string& data) {
if (auto self = weakSelf.lock()) {
self->mHandleMessage(static_cast<ServerMessageType>(MessageHandler::toUint8(type)), data);
}
});
// Close only after peer FIN to avoid RSTs
mHandler->onDisconnect([this](const asio::error_code& ec) {
asio::error_code ec2;
if (mHandler) {
mHandler->socket().close(ec2);
mHandler->onDisconnect([weakSelf = weak_from_this()](const asio::error_code& ec) {
auto self = weakSelf.lock();
if (!self) {
return;
}
mConnected = false;
asio::error_code ec2;
if (self->mHandler) {
self->mHandler->socket().close(ec2);
}
self->mConnected.store(false, std::memory_order_relaxed);
Utils::log(std::string("Server disconnected: ") + ec.message());
});
mHandler->start();
@@ -74,7 +80,7 @@ namespace ColumnLynx::Net::TCP {
}
void TCPClient::sendMessage(ClientMessageType type, const std::string& data) {
if (!mConnected) {
if (!mConnected.load(std::memory_order_relaxed)) {
Utils::error("Cannot send message, client not connected.");
return;
}
@@ -87,7 +93,7 @@ namespace ColumnLynx::Net::TCP {
}
void TCPClient::disconnect(bool echo) {
if (mConnected && mHandler) {
if (mConnected.load(std::memory_order_relaxed) && mHandler) {
if (echo) {
mHandler->sendMessage(ClientMessageType::GRACEFUL_DISCONNECT, "Goodbye");
}
@@ -107,17 +113,17 @@ namespace ColumnLynx::Net::TCP {
}
bool TCPClient::isHandshakeComplete() const {
return mHandshakeComplete;
return mHandshakeComplete.load(std::memory_order_relaxed);
}
bool TCPClient::isConnected() const {
return mConnected;
return mConnected.load(std::memory_order_relaxed);
}
void TCPClient::mStartHeartbeat() {
auto self = shared_from_this();
mHeartbeatTimer.expires_after(std::chrono::seconds(5));
mHeartbeatTimer.async_wait([this, self](const asio::error_code& ec) {
mHeartbeatTimer.async_wait([self](const asio::error_code& ec) {
if (ec == asio::error::operation_aborted) {
return; // Timer was cancelled
}
@@ -130,9 +136,11 @@ namespace ColumnLynx::Net::TCP {
// Close sockets forcefully, server is dead
asio::error_code ec;
mHandler->socket().shutdown(tcp::socket::shutdown_both, ec);
mHandler->socket().close(ec);
mConnected = false;
if (self->mHandler) {
self->mHandler->socket().shutdown(tcp::socket::shutdown_both, ec);
self->mHandler->socket().close(ec);
}
self->mConnected.store(false, std::memory_order_relaxed);
ClientSession::getInstance().setAESKey({}); // Clear AES key with all zeros
ClientSession::getInstance().setSessionID(0);
@@ -261,7 +269,7 @@ namespace ColumnLynx::Net::TCP {
mTun->configureIP(clientIP, serverIP, prefixLen, mtu);
}
mHandshakeComplete = true;
mHandshakeComplete.store(true, std::memory_order_relaxed);
}
break;
@@ -276,13 +284,13 @@ namespace ColumnLynx::Net::TCP {
break;
case ServerMessageType::GRACEFUL_DISCONNECT:
Utils::log("Server is disconnecting: " + data);
if (mConnected) { // Prevent Recursion
if (mConnected.load(std::memory_order_relaxed)) { // Prevent Recursion
disconnect(false);
}
break;
case ServerMessageType::KILL_CONNECTION:
Utils::warn("Server is killing the connection: " + data);
if (mConnected) {
if (mConnected.load(std::memory_order_relaxed)) {
disconnect(false);
}
break;

View File

@@ -86,10 +86,14 @@ namespace ColumnLynx::Net {
std::unique_lock lock(mMutex);
mSessionIPs[sessionID] = ip;
/*if (mIPSessions.find(sessionID) == mIPSessions.end()) {
Utils::debug("yikes");
}*/
mIPSessions[ip] = mSessions.find(sessionID)->second;
auto it = mSessions.find(sessionID);
if (it == mSessions.end() || !it->second) {
Utils::warn("SessionRegistry::lockIP called for unknown session " + std::to_string(sessionID));
mSessionIPs.erase(sessionID);
return;
}
mIPSessions[ip] = it->second;
}
void SessionRegistry::deallocIP(uint32_t sessionID) {

View File

@@ -5,6 +5,7 @@
#include <columnlynx/common/net/tcp/tcp_message_handler.hpp>
#include <columnlynx/common/net/tcp/net_helper.hpp>
#include <columnlynx/common/utils.hpp>
#include <memory>
namespace ColumnLynx::Net::TCP {
void MessageHandler::start() {
@@ -17,17 +18,17 @@ namespace ColumnLynx::Net::TCP {
return static_cast<uint8_t>(type);
}, type);
std::vector<uint8_t> data;
data.push_back(typeByte);
auto data = std::make_shared<std::vector<uint8_t>>();
data->push_back(typeByte);
uint16_t length = payload.size();
data.push_back(length >> 8);
data.push_back(length & 0xFF);
data->push_back(length >> 8);
data->push_back(length & 0xFF);
data.insert(data.end(), payload.begin(), payload.end());
data->insert(data->end(), payload.begin(), payload.end());
auto self = shared_from_this();
asio::async_write(mSocket, asio::buffer(data),
[self](asio::error_code ec, std::size_t) {
asio::async_write(mSocket, asio::buffer(*data),
[self, data](asio::error_code ec, std::size_t) {
if (ec) {
Utils::error("Send failed: " + ec.message());
}

View File

@@ -85,7 +85,7 @@ namespace ColumnLynx::Utils {
}
std::string getVersion() {
return "1.1.0";
return "1.1.1";
}
unsigned short serverPort() {

View File

@@ -6,6 +6,7 @@
#include <iostream>
#include <thread>
#include <chrono>
#include <cstring>
#include <columnlynx/common/utils.hpp>
#include <columnlynx/common/panic_handler.hpp>
#include <columnlynx/server/net/tcp/tcp_server.hpp>
@@ -172,9 +173,24 @@ int main(int argc, char** argv) {
continue;
}
if (packet.size() < 20) {
Utils::warn("TUN: Dropping packet smaller than IPv4 header (" + std::to_string(packet.size()) + " bytes)");
continue;
}
const uint8_t* ip = packet.data();
uint32_t srcIP = ntohl(*(uint32_t*)(ip + 12)); // IPv4 source address offset
uint32_t dstIP = ntohl(*(uint32_t*)(ip + 16)); // IPv4 destination address offset
uint8_t ipVersion = (ip[0] >> 4);
if (ipVersion != 4) {
Utils::debug("TUN: Non-IPv4 packet received (version=" + std::to_string(ipVersion) + "), skipping server IPv4 routing path.");
continue;
}
uint32_t srcIPNet = 0;
uint32_t dstIPNet = 0;
std::memcpy(&srcIPNet, ip + 12, sizeof(srcIPNet)); // IPv4 source address offset
std::memcpy(&dstIPNet, ip + 16, sizeof(dstIPNet)); // IPv4 destination address offset
uint32_t srcIP = ntohl(srcIPNet);
uint32_t dstIP = ntohl(dstIPNet);
// First, check if destination IP is a registered client (e.g., server responding to client or client-to-client)
auto dstSession = SessionRegistry::getInstance().getByIP(dstIP);

View File

@@ -14,23 +14,31 @@ namespace ColumnLynx::Net::TCP {
Utils::warn("Failed to get remote endpoint: " + std::string(e.what()));
}
mHandler->onMessage([this](AnyMessageType type, const std::string& data) {
mHandleMessage(static_cast<ClientMessageType>(MessageHandler::toUint8(type)), data);
mHandler->onMessage([weakSelf = weak_from_this()](AnyMessageType type, const std::string& data) {
if (auto self = weakSelf.lock()) {
self->mHandleMessage(static_cast<ClientMessageType>(MessageHandler::toUint8(type)), data);
}
});
mHandler->onDisconnect([this](const asio::error_code& ec) {
mHandler->onDisconnect([weakSelf = weak_from_this()](const asio::error_code& ec) {
auto self = weakSelf.lock();
if (!self) {
return;
}
// Peer has closed; finalize locally without sending RST
Utils::log("Client disconnected: " + mRemoteIP + " - " + ec.message());
Utils::log("Client disconnected: " + self->mRemoteIP + " - " + ec.message());
asio::error_code ec2;
mHandler->socket().close(ec2);
if (self->mHandler) {
self->mHandler->socket().close(ec2);
}
SessionRegistry::getInstance().erase(mConnectionSessionID);
SessionRegistry::getInstance().deallocIP(mConnectionSessionID);
SessionRegistry::getInstance().erase(self->mConnectionSessionID);
SessionRegistry::getInstance().deallocIP(self->mConnectionSessionID);
Utils::log("Closed connection to " + mRemoteIP);
Utils::log("Closed connection to " + self->mRemoteIP);
if (mOnDisconnect) {
mOnDisconnect(shared_from_this());
if (self->mOnDisconnect) {
self->mOnDisconnect(self);
}
});
@@ -77,7 +85,7 @@ namespace ColumnLynx::Net::TCP {
void TCPConnection::mStartHeartbeat() {
auto self = shared_from_this();
mHeartbeatTimer.expires_after(std::chrono::seconds(5));
mHeartbeatTimer.async_wait([this, self](const asio::error_code& ec) {
mHeartbeatTimer.async_wait([self](const asio::error_code& ec) {
if (ec == asio::error::operation_aborted) {
return; // Timer was cancelled
}
@@ -90,10 +98,13 @@ namespace ColumnLynx::Net::TCP {
// Remove socket forcefully, client is dead
asio::error_code ec;
mHandler->socket().shutdown(asio::ip::tcp::socket::shutdown_both, ec);
mHandler->socket().close(ec);
if (self->mHandler) {
self->mHandler->socket().shutdown(asio::ip::tcp::socket::shutdown_both, ec);
self->mHandler->socket().close(ec);
}
SessionRegistry::getInstance().erase(self->mConnectionSessionID);
SessionRegistry::getInstance().deallocIP(self->mConnectionSessionID);
return;
}