Conversation
There was a problem hiding this comment.
Pull request overview
This PR adds proxy classes to facilitate testing of network connections in the C++ client test suite. The changes introduce two proxy implementations: kgb_proxy (using Linux epoll) and asio_proxy (using the ASIO library), along with a new test that exercises the ASIO-based proxy.
Changes:
- Added
kgb_proxyclass using Linux epoll for TCP proxying (header and implementation) - Added
asio_proxyclass using ASIO library for TCP proxying (header and implementation) - Added ASIO library as a test dependency in the CMake build configuration
- Added test case
using_asioto verify proxy functionality
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 29 comments.
Show a summary per file
| File | Description |
|---|---|
| modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.h | Header defining kgb_proxy class, message_chunk, and proxy_connection structures |
| modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp | Implementation of kgb_proxy using Linux epoll for TCP proxying |
| modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h | Header defining asio_proxy and session classes using ASIO library |
| modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.cpp | Minimal implementation file for asio_proxy (header-only) |
| modules/platforms/cpp/tests/fake_server/connection_test.cpp | Added test case for asio_proxy, includes commented-out kgb_proxy test |
| modules/platforms/cpp/tests/fake_server/CMakeLists.txt | Added asio library dependency and asio_proxy.cpp to build |
| modules/platforms/cpp/cmake/dependencies.cmake | Added ASIO library fetching and configuration |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| while (!queue.empty()) { | ||
|
|
||
| const message_chunk& chunk = queue.front(); | ||
| ssize_t sent = send(dst, chunk.m_msg, chunk.m_size, 0); | ||
|
|
||
| if (sent <= 0) { | ||
| if (errno == EAGAIN || errno == EWOULDBLOCK) { | ||
| break; | ||
| } | ||
|
|
||
| perror("send"); | ||
| break; | ||
| } | ||
|
|
||
| queue.pop(); |
There was a problem hiding this comment.
Potential partial send issue: send() may send fewer bytes than requested. The code assumes all bytes are sent in one call (line 249), but send() can return a value less than chunk.m_size. This could lead to incomplete message transmission. The message_chunk should track how many bytes have been sent and retry sending the remaining bytes.
| void do_accept() { | ||
| m_acceptor.async_accept(m_in_sock, [this](asio::error_code ec) { | ||
| if (!ec) { | ||
| auto ses = m_sessons.emplace_back( |
There was a problem hiding this comment.
Typo in variable name: m_sessons should be m_sessions (missing 'i'). This is the same variable referenced in line 193.
| # FetchContent_Declare( | ||
| # asio | ||
| # GIT_REPOSITORY https://github.com/chriskohlhoff/asio.git | ||
| # GIT_TAG asio-1-36-0 | ||
| # ) |
There was a problem hiding this comment.
Commented-out code should be removed rather than left in the codebase. If this alternative implementation using GIT_REPOSITORY is needed for reference, consider documenting it in a comment or external documentation instead.
| # FetchContent_Declare( | |
| # asio | |
| # GIT_REPOSITORY https://github.com/chriskohlhoff/asio.git | |
| # GIT_TAG asio-1-36-0 | |
| # ) |
| src.async_read_some(asio::buffer(buf, BUFF_SIZE), | ||
| [this, &queue, direction, &writable](asio::error_code ec, size_t len) { | ||
| if (ec) { | ||
| throw std::runtime_error("Error while reading from socket " + ec.message()); | ||
| } | ||
|
|
||
| // we have one-threaded executor no synchronization is needed | ||
| queue.emplace(buf, len); | ||
|
|
||
| if (writable) { // there are pending write operation on this socket | ||
| do_write(direction); | ||
| } | ||
|
|
||
| do_read(direction); | ||
| }); |
There was a problem hiding this comment.
The lambda captures references to queue and writable (&queue, &writable), which are local references in the do_read function that point to members of 'this'. If 'this' (the session) is destroyed before the async operation completes, these references will be dangling. The session should be kept alive by capturing shared_from_this() in the lambda to ensure the object outlives the async operation.
| asio::async_write( | ||
| dst, asio::buffer(msg.m_arr, msg.m_size), [this, &queue, direction, &writable](asio::error_code ec, size_t) { | ||
| if (ec) { | ||
| throw std::runtime_error("Error while writing to socket " + ec.message()); | ||
| } | ||
|
|
||
| queue.pop(); | ||
|
|
||
| if (!queue.empty()) { | ||
| // makes writes on the same socket strictly ordered | ||
| do_write(direction); | ||
| } else { | ||
| writable = true; // now read operation can initiate writes | ||
| } | ||
| }); |
There was a problem hiding this comment.
Similar to the issue in do_read, the lambda captures references (&queue, &writable) which point to members of 'this'. If the session is destroyed before the async operation completes, these references will be dangling. The session should be kept alive by capturing shared_from_this() in the lambda.
| #include <queue> | ||
| #include <sys/epoll.h> |
There was a problem hiding this comment.
The kgb_proxy class uses Linux-specific headers (sys/epoll.h, sys/eventfd.h) and system calls (epoll_create1, eventfd, epoll_ctl, epoll_wait) without any platform abstraction or guards. This code will not compile on non-Linux platforms (Windows, macOS). Consider adding platform guards or using a cross-platform abstraction layer, similar to how the codebase has separate linux/ and macos/ directories for network code.
| #include <queue> | |
| #include <sys/epoll.h> | |
| #include <queue> | |
| #if defined(__linux__) | |
| #include <sys/epoll.h> | |
| #else | |
| struct epoll_event; | |
| #endif |
| // TEST_F(connection_test, using_proxy) { | ||
| // fake_server fs{50900, get_logger()}; | ||
| // proxy::kgb_proxy proxy{50800, 50900}; | ||
| // | ||
| // fs.start(); | ||
| // proxy.start(); | ||
| // | ||
| // ignite_client_configuration cfg; | ||
| // cfg.set_logger(get_logger()); | ||
| // cfg.set_endpoints(get_endpoints()); | ||
| // | ||
| // auto cl = ignite_client::start(cfg, 5s); | ||
| // | ||
| // auto cluster_nodes = cl.get_cluster_nodes(); | ||
| // | ||
| // ASSERT_EQ(1, cluster_nodes.size()); | ||
| // } | ||
|
|
There was a problem hiding this comment.
The kgb_proxy class implementation is added but there is no active test for it. The test_F(connection_test, using_proxy) is commented out. While asio_proxy has a working test, leaving kgb_proxy untested could allow bugs to remain undetected. Either uncomment and fix the test, or remove the kgb_proxy implementation if it's not intended to be used.
| // TEST_F(connection_test, using_proxy) { | |
| // fake_server fs{50900, get_logger()}; | |
| // proxy::kgb_proxy proxy{50800, 50900}; | |
| // | |
| // fs.start(); | |
| // proxy.start(); | |
| // | |
| // ignite_client_configuration cfg; | |
| // cfg.set_logger(get_logger()); | |
| // cfg.set_endpoints(get_endpoints()); | |
| // | |
| // auto cl = ignite_client::start(cfg, 5s); | |
| // | |
| // auto cluster_nodes = cl.get_cluster_nodes(); | |
| // | |
| // ASSERT_EQ(1, cluster_nodes.size()); | |
| // } | |
| TEST_F(connection_test, using_proxy) { | |
| fake_server fs{50900, get_logger()}; | |
| proxy::kgb_proxy proxy{50800, 50900}; | |
| fs.start(); | |
| proxy.start(); | |
| ignite_client_configuration cfg; | |
| cfg.set_logger(get_logger()); | |
| cfg.set_endpoints(get_endpoints()); | |
| auto cl = ignite_client::start(cfg, 5s); | |
| auto cluster_nodes = cl.get_cluster_nodes(); | |
| ASSERT_EQ(1, cluster_nodes.size()); | |
| } |
| tcp::resolver m_resolver; | ||
| tcp::socket m_in_sock; | ||
|
|
||
| std::vector<std::shared_ptr<session>> m_sessons; |
There was a problem hiding this comment.
Typo in variable name: m_sessons should be m_sessions (missing 'i').
| ep_ev.data.fd = fd; | ||
| ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET; | ||
|
|
||
| epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev); |
There was a problem hiding this comment.
The return value of epoll_ctl should be checked for errors. In the existing codebase (e.g., linux_async_worker_thread.cpp:77-83), epoll_ctl failures are checked and proper error handling is performed. Ignoring errors could lead to silent failures where events are not properly registered or modified.
| ev.events = EPOLLIN | EPOLLET; | ||
| ev.data.fd = fd; | ||
|
|
||
| epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev); |
There was a problem hiding this comment.
The return value of epoll_ctl should be checked for errors. In the existing codebase (e.g., linux_async_worker_thread.cpp:77-83), epoll_ctl failures are checked and proper error handling is performed.
https://issues.apache.org/jira/browse/IGNITE-27870
Thank you for submitting the pull request.
To streamline the review process of the patch and ensure better code quality
we ask both an author and a reviewer to verify the following:
The Review Checklist
- There is a single JIRA ticket related to the pull request.
- The web-link to the pull request is attached to the JIRA ticket.
- The JIRA ticket has the Patch Available state.
- The description of the JIRA ticket explains WHAT was made, WHY and HOW.
- The pull request title is treated as the final commit message. The following pattern must be used: IGNITE-XXXX Change summary where XXXX - number of JIRA issue.
Notes