Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions modules/platforms/cpp/cmake/dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,29 @@ function(fetch_dependency NAME URL MD5)
endif()
endfunction()

function(add_asio_dependency)
message(STATUS "Download dependency: asio")
# FetchContent_Declare(
# asio
# GIT_REPOSITORY https://github.com/chriskohlhoff/asio.git
# GIT_TAG asio-1-36-0
# )
Comment on lines +48 to +52
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commented-out code should be removed rather than left in the codebase. If this alternative implementation using GIT_REPOSITORY is needed for reference, consider documenting it in a comment or external documentation instead.

Suggested change
# FetchContent_Declare(
# asio
# GIT_REPOSITORY https://github.com/chriskohlhoff/asio.git
# GIT_TAG asio-1-36-0
# )

Copilot uses AI. Check for mistakes.

FetchContent_Declare(
asio
URL https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-36-0.tar.gz
URL_HASH MD5=6699ac1dea111c20d024f25e06e573db
)

FetchContent_Populate(asio)

add_library(asio INTERFACE)

target_include_directories(asio INTERFACE ${asio_SOURCE_DIR}/asio/include)

target_compile_definitions(asio INTERFACE ASIO_STANDALONE)
endfunction()

if (${USE_LOCAL_DEPS})
find_package(msgpack REQUIRED)
if (${msgpack_FOUND})
Expand Down Expand Up @@ -76,6 +99,7 @@ else()
fetch_dependency(uni-algo https://github.com/uni-algo/uni-algo/archive/v1.2.0.tar.gz 6e0cce94a6b45ebee7b904316df9f87f)
if (${ENABLE_TESTS})
fetch_dependency(googletest https://github.com/google/googletest/archive/refs/tags/v1.14.0.tar.gz c8340a482851ef6a3fe618a082304cfc)
add_asio_dependency(https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-36-0.tar.gz 6699ac1dea111c20d024f25e06e573db)
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The add_asio_dependency function is called with two arguments (URL and MD5 hash) but is defined to take no parameters. This will cause a CMake error. Remove the arguments from the function call or update the function definition to accept and use these parameters.

Copilot uses AI. Check for mistakes.
endif()
endif()

Expand Down
3 changes: 2 additions & 1 deletion modules/platforms/cpp/tests/fake_server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ set(SOURCES
fake_server.cpp
tcp_client_channel.cpp
connection_test.cpp
proxy/asio_proxy.cpp
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The kgb_proxy.cpp file is not included in the SOURCES list in CMakeLists.txt, which means it won't be compiled or linked. This makes the kgb_proxy.h header and kgb_proxy.cpp implementation dead code. Either add proxy/kgb_proxy.cpp to the SOURCES list if it's intended to be used, or remove the kgb_proxy files entirely.

Suggested change
proxy/asio_proxy.cpp
proxy/asio_proxy.cpp
proxy/kgb_proxy.cpp

Copilot uses AI. Check for mistakes.
)

ignite_test(${TARGET} SOURCES ${SOURCES} LIBS ignite-test-common ignite3-client msgpack-c ignite-protocol ignite-tuple)
ignite_test(${TARGET} SOURCES ${SOURCES} LIBS asio ignite-test-common ignite3-client msgpack-c ignite-protocol ignite-tuple)
44 changes: 42 additions & 2 deletions modules/platforms/cpp/tests/fake_server/connection_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
* limitations under the License.
*/

#include "tests/client-test/ignite_runner_suite.h"
#include "ignite/client/ignite_client.h"
#include "fake_server.h"
#include "ignite/client/ignite_client.h"
#include "proxy/kgb_proxy.h"
#include "proxy/asio_proxy.h"
#include "tests/client-test/ignite_runner_suite.h"

#include <gtest/gtest.h>
#include <thread>


using namespace ignite;
using namespace std::chrono_literals;

Expand Down Expand Up @@ -76,3 +79,40 @@ TEST_F(connection_test, request_timeout) {
EXPECT_EQ(error::code::OPERATION_TIMEOUT, err.get_status_code());
}
}

// TEST_F(connection_test, using_proxy) {
// fake_server fs{50900, get_logger()};
// proxy::kgb_proxy proxy{50800, 50900};
//
// fs.start();
// proxy.start();
//
// ignite_client_configuration cfg;
// cfg.set_logger(get_logger());
// cfg.set_endpoints(get_endpoints());
//
// auto cl = ignite_client::start(cfg, 5s);
//
// auto cluster_nodes = cl.get_cluster_nodes();
//
// ASSERT_EQ(1, cluster_nodes.size());
// }

Comment on lines +83 to +100
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The kgb_proxy class implementation is added but there is no active test for it. The test_F(connection_test, using_proxy) is commented out. While asio_proxy has a working test, leaving kgb_proxy untested could allow bugs to remain undetected. Either uncomment and fix the test, or remove the kgb_proxy implementation if it's not intended to be used.

Suggested change
// TEST_F(connection_test, using_proxy) {
// fake_server fs{50900, get_logger()};
// proxy::kgb_proxy proxy{50800, 50900};
//
// fs.start();
// proxy.start();
//
// ignite_client_configuration cfg;
// cfg.set_logger(get_logger());
// cfg.set_endpoints(get_endpoints());
//
// auto cl = ignite_client::start(cfg, 5s);
//
// auto cluster_nodes = cl.get_cluster_nodes();
//
// ASSERT_EQ(1, cluster_nodes.size());
// }
TEST_F(connection_test, using_proxy) {
fake_server fs{50900, get_logger()};
proxy::kgb_proxy proxy{50800, 50900};
fs.start();
proxy.start();
ignite_client_configuration cfg;
cfg.set_logger(get_logger());
cfg.set_endpoints(get_endpoints());
auto cl = ignite_client::start(cfg, 5s);
auto cluster_nodes = cl.get_cluster_nodes();
ASSERT_EQ(1, cluster_nodes.size());
}

Copilot uses AI. Check for mistakes.
TEST_F(connection_test, using_asio) {
fake_server fs{50900, get_logger()};
fs.start();

proxy::asio_proxy proxy{static_cast<short>(50800)};

ignite_client_configuration cfg;
cfg.set_logger(get_logger());
cfg.set_endpoints(get_endpoints());

auto cl = ignite_client::start(cfg, 500s);
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timeout value of 500 seconds seems excessively long for a unit test. This is inconsistent with other tests in the same file that use 5s timeout (line 50, 94). Consider using a more reasonable timeout value consistent with other tests.

Suggested change
auto cl = ignite_client::start(cfg, 500s);
auto cl = ignite_client::start(cfg, 5s);

Copilot uses AI. Check for mistakes.

auto cluster_nodes = cl.get_cluster_nodes();

ASSERT_EQ(1, cluster_nodes.size());
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The io_context.run() will continue executing until all work is completed or the io_context is stopped. When the test ends, there's no mechanism to stop the io_context, which means the thread will continue running indefinitely. The test should call io_context.stop() and then join the thread to ensure proper cleanup. Consider adding a test fixture teardown or ensuring cleanup at the end of the test.

Suggested change
ASSERT_EQ(1, cluster_nodes.size());
ASSERT_EQ(1, cluster_nodes.size());
io_context.stop();
t.join();

Copilot uses AI. Check for mistakes.

// t.join();
}
19 changes: 19 additions & 0 deletions modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

//

#include "asio_proxy.h"
214 changes: 214 additions & 0 deletions modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#pragma once

#include <iostream>
#include <queue>
#include <tuple>

#include <asio.hpp>
#include <asio/ts/buffer.hpp>
#include <asio/ts/internet.hpp>

namespace ignite::proxy {

using asio::ip::tcp;

struct message {
char *m_arr;
size_t m_size;

message(char *arr, size_t size)
: m_arr(nullptr)
, m_size(size) {
m_arr = new char[m_size];
std::memcpy(m_arr, arr, size);
}

Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The message struct is missing copy constructor and copy assignment operator, but they should be deleted since the class manages raw memory via new/delete. Without explicit deletion, the default copy operations will perform shallow copies, leading to double-free errors. Add: message(const message&) = delete; and message& operator=(const message&) = delete;. Alternatively, implement proper deep copy semantics or use move semantics only.

Suggested change
message(const message &) = delete;
message &operator=(const message &) = delete;
message(message &&other) noexcept
: m_arr(other.m_arr)
, m_size(other.m_size) {
other.m_arr = nullptr;
other.m_size = 0;
}
message &operator=(message &&other) noexcept {
if (this != &other) {
delete[] m_arr;
m_arr = other.m_arr;
m_size = other.m_size;
other.m_arr = nullptr;
other.m_size = 0;
}
return *this;
}

Copilot uses AI. Check for mistakes.
~message() { delete[] m_arr; }
};

class session : public std::enable_shared_from_this<session> {
public:
session(tcp::socket in_sock, tcp::socket out_sock, std::atomic_bool& stopped)
: m_in_sock(std::move(in_sock))
, m_out_sock(std::move(out_sock))
, m_stopped(stopped) { }

~session() {
std::cout << "Session destructed " << this << std::endl;
}

void start() { do_serve(); }

tcp::socket &get_out_sock() { return m_out_sock; }

void set_writable(bool writable) {
m_in_to_out_writable = writable;
m_out_to_in_writable = writable;
}

enum direction { forward, reverse };

private:
void do_serve() {
do_read(forward);
do_read(reverse);
}

void do_read(direction direction) {
if (m_stopped.load())
return;

tcp::socket &src = direction == forward ? m_in_sock : m_out_sock;
std::queue<message> &queue = direction == forward ? m_in_to_out : m_out_to_in;
bool &writable = direction == forward ? m_in_to_out_writable : m_out_to_in_writable;

auto self(shared_from_this());

src.async_read_some(asio::buffer(buf, BUFF_SIZE),
[&queue, direction, &writable, self](const asio::error_code& ec, size_t len) {
if (ec) {
if (ec == asio::error::eof) {
return;
}
throw std::runtime_error("Error while reading from socket " + ec.message());
}

// we have one-threaded executor no synchronization is needed
queue.emplace(self->buf, len);

if (writable) { // there are pending write operation on this socket
self->do_write(direction);
}

self->do_read(direction);
});
Comment on lines 83 to 100
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lambda captures references to queue and writable (&queue, &writable), which are local references in the do_read function that point to members of 'this'. If 'this' (the session) is destroyed before the async operation completes, these references will be dangling. The session should be kept alive by capturing shared_from_this() in the lambda to ensure the object outlives the async operation.

Copilot uses AI. Check for mistakes.
}

void do_write(direction direction) {
tcp::socket &dst = direction == forward ? m_out_sock : m_in_sock;
std::queue<message> &queue = direction == forward ? m_in_to_out : m_out_to_in;
bool &writable = direction == forward ? m_in_to_out_writable : m_out_to_in_writable;

writable = false; // protects from writing same buffer twice (from head of queue).

auto self(shared_from_this());
if (!queue.empty()) {
message &msg = queue.front();

asio::async_write(
dst, asio::buffer(msg.m_arr, msg.m_size),
[&queue, direction, &writable, self](asio::error_code ec, size_t) {
if (ec) {
throw std::runtime_error("Error while writing to socket " + ec.message());
}

queue.pop();

if (!queue.empty()) {
// makes writes on the same socket strictly ordered
self->do_write(direction);
} else {
writable = true; // now read operation can initiate writes
}
});
Comment on lines 114 to 129
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the issue in do_read, the lambda captures references (&queue, &writable) which point to members of 'this'. If the session is destroyed before the async operation completes, these references will be dangling. The session should be kept alive by capturing shared_from_this() in the lambda.

Copilot uses AI. Check for mistakes.
}
}

tcp::socket m_in_sock;
tcp::socket m_out_sock;

bool m_in_to_out_writable{false};
bool m_out_to_in_writable{false};

std::queue<message> m_in_to_out;
std::queue<message> m_out_to_in;

static constexpr size_t BUFF_SIZE = 4096;

char buf[BUFF_SIZE];
Comment on lines 83 to 144
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The session buffer is reused across multiple async read operations without proper synchronization. If multiple read operations are in progress (forward and reverse), they may both write to the same 'buf' array simultaneously, causing data corruption. Each direction should have its own buffer to prevent race conditions.

Copilot uses AI. Check for mistakes.

std::atomic_bool& m_stopped;
};

class asio_proxy {
public:
asio_proxy(short port)
: m_acceptor(m_io_context, tcp::endpoint(tcp::v4(), port))
, m_resolver(m_io_context)
, m_in_sock(m_io_context)
{
do_accept();

m_executor = std::make_unique<std::thread>([this]() {
m_io_context.run();
});
}

~asio_proxy() {
m_stopped.store(true);
m_io_context.stop();

m_executor->join();
}

private:
void do_accept() {
if (m_stopped.load()) {
return;
}

m_acceptor.async_accept(m_in_sock, [this](asio::error_code ec) {
if (ec) {
throw std::runtime_error("Error accepting incoming connection " + ec.message());
}

auto ses = std::make_shared<session>(std::move(m_in_sock), tcp::socket{m_io_context}, m_stopped);

m_resolver.async_resolve(
"127.0.0.1", "50900", [ses](asio::error_code ec, tcp::resolver::results_type endpoints) {
if (ec) {
throw std::runtime_error("Error resolving server's address " + ec.message());
}

asio::async_connect(
ses->get_out_sock(), endpoints, [ses](const asio::error_code &ec, const tcp::endpoint &e) {
if (ec) {
std::cout << e.port();
throw std::runtime_error("Error connecting to server " + ec.message());
}

ses->set_writable(true);
ses->start();
});
});

do_accept();
});
}

asio::io_context m_io_context{};
std::unique_ptr<std::thread> m_executor{};

tcp::acceptor m_acceptor;
tcp::resolver m_resolver;
tcp::socket m_in_sock;

std::atomic_bool m_stopped{false};
};
} // namespace ignite::proxy
Loading