From f5f57e370bd2d944e1dff78f4af7dc0d62d5ae5e Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sat, 20 May 2023 21:14:50 +0200 Subject: [PATCH 1/3] Improvements in the redis-push stress test. --- tests/common.cpp | 5 +-- tests/common.hpp | 3 +- tests/test_conn_echo_stress.cpp | 54 ++++++++++++++++++++++++++------- 3 files changed, 48 insertions(+), 14 deletions(-) diff --git a/tests/common.cpp b/tests/common.cpp index 366cd1d8..f4972ba2 100644 --- a/tests/common.cpp +++ b/tests/common.cpp @@ -25,9 +25,10 @@ run( std::shared_ptr conn, boost::redis::config cfg, boost::system::error_code ec, - boost::redis::operation op) + boost::redis::operation op, + boost::redis::logger::level l) { - conn->async_run(cfg, {}, run_callback{conn, op, ec}); + conn->async_run(cfg, {l}, run_callback{conn, op, ec}); } #ifdef BOOST_ASIO_HAS_CO_AWAIT diff --git a/tests/common.hpp b/tests/common.hpp index 4b917b54..b5bc7dab 100644 --- a/tests/common.hpp +++ b/tests/common.hpp @@ -20,5 +20,6 @@ run( std::shared_ptr conn, boost::redis::config cfg = {}, boost::system::error_code ec = boost::asio::error::operation_aborted, - boost::redis::operation op = boost::redis::operation::receive); + boost::redis::operation op = boost::redis::operation::receive, + boost::redis::logger::level l = boost::redis::logger::level::info); diff --git a/tests/test_conn_echo_stress.cpp b/tests/test_conn_echo_stress.cpp index 902e0cbd..a967a32f 100644 --- a/tests/test_conn_echo_stress.cpp +++ b/tests/test_conn_echo_stress.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #define BOOST_TEST_MODULE echo-stress #include @@ -38,25 +39,33 @@ auto push_consumer(std::shared_ptr conn, int expected) -> net::await conn->cancel(); } -auto echo_session(std::shared_ptr conn, std::string id, int n) -> net::awaitable +auto +echo_session( + std::shared_ptr conn, + std::shared_ptr pubs, + std::string id, + int n) -> net::awaitable { auto ex = co_await net::this_coro::executor; request req; - response resp; + response resp; for (auto i = 0; i < n; ++i) { auto const msg = id + "/" + std::to_string(i); //std::cout << msg << std::endl; - req.push("HELLO", 3); + req.push("HELLO", 3); // Just to mess around. req.push("PING", msg); - req.push("SUBSCRIBE", "channel"); + req.push("PING", "lsls"); // TODO: Change to HELLO after fixing issue 105. boost::system::error_code ec; co_await conn->async_exec(req, resp, redir(ec)); - BOOST_CHECK_EQUAL(ec, boost::system::error_code{}); - BOOST_CHECK_EQUAL(msg, std::get<1>(resp).value()); + + BOOST_REQUIRE_EQUAL(ec, boost::system::error_code{}); + BOOST_REQUIRE_EQUAL(msg, std::get<1>(resp).value()); req.clear(); std::get<1>(resp).value().clear(); + + co_await conn->async_exec(*pubs, ignore, net::deferred); } } @@ -64,18 +73,41 @@ auto async_echo_stress() -> net::awaitable { auto ex = co_await net::this_coro::executor; auto conn = std::make_shared(ex); + config cfg; + cfg.health_check_interval = std::chrono::seconds::zero(); + run(conn, cfg, + boost::asio::error::operation_aborted, + boost::redis::operation::receive, + boost::redis::logger::level::crit); + + request req; + req.push("SUBSCRIBE", "channel"); + co_await conn->async_exec(req, ignore, net::deferred); + // Number of coroutines that will send pings sharing the same + // connection to redis. int const sessions = 500; + + // The number of pings that will be sent by each session. int const msgs = 1000; - int total = sessions * msgs; - net::co_spawn(ex, push_consumer(conn, total), net::detached); + // The number of publishes that will be sent by each session with + // each message. + int const n_pubs = 10; - for (int i = 0; i < sessions; ++i) - net::co_spawn(ex, echo_session(conn, std::to_string(i), msgs), net::detached); + // This is the total number of pushes we will receive. + int total_pushes = sessions * msgs * n_pubs + 1; + auto pubs = std::make_shared(); + for (int i = 0; i < n_pubs; ++i) + pubs->push("PUBLISH", "channel", "payload"); - run(conn); + // Op that will consume the pushes counting down until all expected + // pushes have been received. + net::co_spawn(ex, push_consumer(conn, total_pushes), net::detached); + + for (int i = 0; i < sessions; ++i) + net::co_spawn(ex, echo_session(conn, pubs, std::to_string(i), msgs), net::detached); } BOOST_AUTO_TEST_CASE(echo_stress) From 538ab8f35fd4e8288da120b8fe683205c8c63264 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sun, 21 May 2023 14:29:41 +0200 Subject: [PATCH 2/3] Reduces the number of rescheduling needed to process a server sent push. Performance improved by close to 10%. --- CMakeLists.txt | 19 ++++----- CMakePresets.json | 2 +- include/boost/redis/connection_base.hpp | 52 ++++++++++++++++--------- tests/test_conn_check_health.cpp | 5 +++ tests/test_conn_exec.cpp | 6 +-- 5 files changed, 52 insertions(+), 32 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 19e92366..43938a63 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -79,15 +79,16 @@ add_executable(echo_server_client benchmarks/cpp/asio/echo_server_client.cpp) add_executable(echo_server_direct benchmarks/cpp/asio/echo_server_direct.cpp) set(tests_cpp17 - test_conn_quit - test_conn_tls - test_low_level - test_conn_exec_retry - test_conn_exec_error - test_request - test_run - test_low_level_sync - test_conn_check_health) + test_conn_quit + test_conn_tls + test_low_level + test_conn_exec_retry + test_conn_exec_error + test_request + test_run + test_low_level_sync + test_conn_check_health +) set(tests_cpp20 test_conn_exec diff --git a/CMakePresets.json b/CMakePresets.json index 3fba81f0..b6a11938 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -66,7 +66,7 @@ "CMAKE_BUILD_TYPE": "Debug", "CMAKE_CXX_EXTENSIONS": "OFF", "CMAKE_CXX_FLAGS": "-Wall -Wextra -fsanitize=address", - "CMAKE_CXX_COMPILER": "g++-11", + "CMAKE_CXX_COMPILER": "clang++-13", "CMAKE_SHARED_LINKER_FLAGS": "-fsanitize=address", "CMAKE_CXX_STANDARD_REQUIRED": "ON", "PROJECT_BINARY_DIR": "${sourceDir}/build/clang++-13", diff --git a/include/boost/redis/connection_base.hpp b/include/boost/redis/connection_base.hpp index a67f8507..5e3af4d0 100644 --- a/include/boost/redis/connection_base.hpp +++ b/include/boost/redis/connection_base.hpp @@ -43,7 +43,7 @@ namespace detail { template struct wait_receive_op { - Conn* conn; + Conn* conn_; asio::coroutine coro{}; template @@ -52,14 +52,14 @@ struct wait_receive_op { { BOOST_ASIO_CORO_REENTER (coro) { - BOOST_ASIO_CORO_YIELD - conn->channel_.async_send(system::error_code{}, 0, std::move(self)); - BOOST_REDIS_CHECK_OP0(;); + conn_->channel_.cancel(); BOOST_ASIO_CORO_YIELD - conn->channel_.async_send(system::error_code{}, 0, std::move(self)); - BOOST_REDIS_CHECK_OP0(;); - + conn_->channel_.async_send(system::error_code{}, 0, std::move(self)); + if (!conn_->is_open() || is_cancelled(self)) { + self.complete(!!ec ? ec : asio::error::operation_aborted); + return; + } self.complete({}); } } @@ -158,7 +158,7 @@ class read_next_op { template struct receive_op { - Conn* conn; + Conn* conn_; Adapter adapter; std::size_t read_size = 0; asio::coroutine coro{}; @@ -171,27 +171,32 @@ struct receive_op { { BOOST_ASIO_CORO_REENTER (coro) { - BOOST_ASIO_CORO_YIELD - conn->channel_.async_receive(std::move(self)); - BOOST_REDIS_CHECK_OP1(;); + if (conn_->wait_read_op_notification_) { + BOOST_ASIO_CORO_YIELD + conn_->channel_.async_receive(std::move(self)); + if (!conn_->is_open() || is_cancelled(self)) { + self.complete(!!ec ? ec : asio::error::operation_aborted, 0); + return; + } + } - if (conn->use_ssl()) - BOOST_ASIO_CORO_YIELD redis::detail::async_read(conn->next_layer(), conn->make_dynamic_buffer(), adapter, std::move(self)); + if (conn_->use_ssl()) + BOOST_ASIO_CORO_YIELD redis::detail::async_read(conn_->next_layer(), conn_->make_dynamic_buffer(), adapter, std::move(self)); else - BOOST_ASIO_CORO_YIELD redis::detail::async_read(conn->next_layer().next_layer(), conn->make_dynamic_buffer(), adapter, std::move(self)); + BOOST_ASIO_CORO_YIELD redis::detail::async_read(conn_->next_layer().next_layer(), conn_->make_dynamic_buffer(), adapter, std::move(self)); if (ec || is_cancelled(self)) { - conn->cancel(operation::run); - conn->cancel(operation::receive); + conn_->cancel(operation::run); + conn_->cancel(operation::receive); self.complete(!!ec ? ec : asio::error::operation_aborted, {}); return; } read_size = n; - BOOST_ASIO_CORO_YIELD - conn->channel_.async_receive(std::move(self)); - BOOST_REDIS_CHECK_OP1(;); + conn_->wait_read_op_notification_ = !conn_->is_next_maybe_push(); + if (conn_->wait_read_op_notification_) + conn_->channel_.cancel(); self.complete({}, read_size); return; @@ -315,6 +320,7 @@ struct run_op { { conn->write_buffer_.clear(); conn->read_buffer_.clear(); + conn->wait_read_op_notification_ = true; BOOST_ASIO_CORO_YIELD asio::experimental::make_parallel_group( @@ -991,6 +997,11 @@ class connection_base { stream_->next_layer().close(); } + bool is_next_maybe_push() const noexcept + { + return !read_buffer_.empty() && (resp3::to_type(read_buffer_.front()) == resp3::type::push); + } + auto is_open() const noexcept { return stream_->next_layer().is_open(); } auto& lowest_layer() noexcept { return stream_->lowest_layer(); } @@ -1009,6 +1020,9 @@ class connection_base { std::string write_buffer_; reqs_type reqs_; std::size_t max_read_size_ = (std::numeric_limits::max)(); + + // Flag that optimizes reading pushes. + bool wait_read_op_notification_ = true; }; } // boost::redis diff --git a/tests/test_conn_check_health.cpp b/tests/test_conn_check_health.cpp index 494f22c7..dc8fa078 100644 --- a/tests/test_conn_check_health.cpp +++ b/tests/test_conn_check_health.cpp @@ -9,6 +9,7 @@ #define BOOST_TEST_MODULE check-health #include #include +#include #include "common.hpp" namespace net = boost::asio; @@ -119,5 +120,9 @@ BOOST_AUTO_TEST_CASE(check_health) BOOST_TEST(!!res1); BOOST_TEST(!!res2); + + // Waits before exiting otherwise it might cause subsequent tests + // to fail. + std::this_thread::sleep_for(std::chrono::seconds{10}); } diff --git a/tests/test_conn_exec.cpp b/tests/test_conn_exec.cpp index b5fe3a1e..73be44cb 100644 --- a/tests/test_conn_exec.cpp +++ b/tests/test_conn_exec.cpp @@ -51,7 +51,7 @@ BOOST_AUTO_TEST_CASE(hello_priority) conn->async_exec(req1, ignore, [&](auto ec, auto){ // Second callback to the called. std::cout << "req1" << std::endl; - BOOST_TEST(!ec); + BOOST_CHECK_EQUAL(ec, boost::system::error_code{}); BOOST_TEST(!seen2); BOOST_TEST(seen3); seen1 = true; @@ -60,7 +60,7 @@ BOOST_AUTO_TEST_CASE(hello_priority) conn->async_exec(req2, ignore, [&](auto ec, auto){ // Last callback to the called. std::cout << "req2" << std::endl; - BOOST_TEST(!ec); + BOOST_CHECK_EQUAL(ec, boost::system::error_code{}); BOOST_TEST(seen1); BOOST_TEST(seen3); seen2 = true; @@ -71,7 +71,7 @@ BOOST_AUTO_TEST_CASE(hello_priority) conn->async_exec(req3, ignore, [&](auto ec, auto){ // Callback that will be called first. std::cout << "req3" << std::endl; - BOOST_TEST(!ec); + BOOST_CHECK_EQUAL(ec, boost::system::error_code{}); BOOST_TEST(!seen1); BOOST_TEST(!seen2); seen3 = true; From 3c02a7662bee311c97cd2300b8acbd76f424c9e0 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sun, 21 May 2023 21:13:58 +0200 Subject: [PATCH 3/3] Replaces connection channel with a timer. --- include/boost/redis/connection_base.hpp | 39 ++++++++++++------------- tests/test_conn_exec_error.cpp | 21 +++++++++---- tests/test_conn_push.cpp | 4 +-- 3 files changed, 37 insertions(+), 27 deletions(-) diff --git a/include/boost/redis/connection_base.hpp b/include/boost/redis/connection_base.hpp index 5e3af4d0..554efe47 100644 --- a/include/boost/redis/connection_base.hpp +++ b/include/boost/redis/connection_base.hpp @@ -20,7 +20,6 @@ #include #include #include -#include #include #include #include @@ -52,10 +51,10 @@ struct wait_receive_op { { BOOST_ASIO_CORO_REENTER (coro) { - conn_->channel_.cancel(); + conn_->read_op_timer_.cancel(); BOOST_ASIO_CORO_YIELD - conn_->channel_.async_send(system::error_code{}, 0, std::move(self)); + conn_->read_op_timer_.async_wait(std::move(self)); if (!conn_->is_open() || is_cancelled(self)) { self.complete(!!ec ? ec : asio::error::operation_aborted); return; @@ -143,7 +142,11 @@ class read_next_op { ++index_; - BOOST_REDIS_CHECK_OP1(conn_->cancel(operation::run);); + if (ec || redis::detail::is_cancelled(self)) { + conn_->cancel(operation::run); + self.complete(!!ec ? ec : asio::error::operation_aborted, {}); + return; + } read_size_ += n; @@ -171,9 +174,9 @@ struct receive_op { { BOOST_ASIO_CORO_REENTER (coro) { - if (conn_->wait_read_op_notification_) { + if (!conn_->is_next_push()) { BOOST_ASIO_CORO_YIELD - conn_->channel_.async_receive(std::move(self)); + conn_->read_op_timer_.async_wait(std::move(self)); if (!conn_->is_open() || is_cancelled(self)) { self.complete(!!ec ? ec : asio::error::operation_aborted, 0); return; @@ -194,9 +197,9 @@ struct receive_op { read_size = n; - conn_->wait_read_op_notification_ = !conn_->is_next_maybe_push(); - if (conn_->wait_read_op_notification_) - conn_->channel_.cancel(); + if (!conn_->is_next_push()) { + conn_->read_op_timer_.cancel(); + } self.complete({}, read_size); return; @@ -320,7 +323,6 @@ struct run_op { { conn->write_buffer_.clear(); conn->read_buffer_.clear(); - conn->wait_read_op_notification_ = true; BOOST_ASIO_CORO_YIELD asio::experimental::make_parallel_group( @@ -496,11 +498,12 @@ class connection_base { , stream_{std::make_unique(ex, ctx_)} , writer_timer_{ex} , read_timer_{ex} - , channel_{ex} + , read_op_timer_{ex} , runner_{ex, {}} { writer_timer_.expires_at(std::chrono::steady_clock::time_point::max()); read_timer_.expires_at(std::chrono::steady_clock::time_point::max()); + read_op_timer_.expires_at(std::chrono::steady_clock::time_point::max()); } /// Contructs from an execution context. @@ -628,7 +631,7 @@ class connection_base { return asio::async_compose < CompletionToken , void(system::error_code, std::size_t) - >(redis::detail::receive_op{this, f}, token, channel_); + >(redis::detail::receive_op{this, f}, token, read_op_timer_); } /** @brief Starts underlying connection operations. @@ -695,7 +698,6 @@ class connection_base { using clock_type = std::chrono::steady_clock; using clock_traits_type = asio::wait_traits; using timer_type = asio::basic_waitable_timer; - using channel_type = asio::experimental::channel; using runner_type = redis::detail::runner; auto use_ssl() const noexcept @@ -767,7 +769,7 @@ class connection_base { } break; case operation::receive: { - channel_.cancel(); + read_op_timer_.cancel(); } break; default: /* ignore */; } @@ -891,7 +893,7 @@ class connection_base { return asio::async_compose < CompletionToken , void(system::error_code) - >(redis::detail::wait_receive_op{this}, token, channel_); + >(redis::detail::wait_receive_op{this}, token, read_op_timer_); } void cancel_push_requests() @@ -997,7 +999,7 @@ class connection_base { stream_->next_layer().close(); } - bool is_next_maybe_push() const noexcept + bool is_next_push() const noexcept { return !read_buffer_.empty() && (resp3::to_type(read_buffer_.front()) == resp3::type::push); } @@ -1013,16 +1015,13 @@ class connection_base { // not suspend. timer_type writer_timer_; timer_type read_timer_; - channel_type channel_; + timer_type read_op_timer_; runner_type runner_; std::string read_buffer_; std::string write_buffer_; reqs_type reqs_; std::size_t max_read_size_ = (std::numeric_limits::max)(); - - // Flag that optimizes reading pushes. - bool wait_read_op_notification_ = true; }; } // boost::redis diff --git a/tests/test_conn_exec_error.cpp b/tests/test_conn_exec_error.cpp index 16f53631..fe465325 100644 --- a/tests/test_conn_exec_error.cpp +++ b/tests/test_conn_exec_error.cpp @@ -200,14 +200,25 @@ BOOST_AUTO_TEST_CASE(error_in_transaction) ioc.run(); } -// This test is important because a subscriber has no response on -// success, but on error, for example when using a wrong syntax, the -// server will send a simple error response the client is not -// expecting. +// This test is important because a SUBSCRIBE command has no response +// on success, but does on error. for example when using a wrong +// syntax, the server will send a simple error response the client is +// not expecting. +// +// Sending the subscribe after the ping command below is just a +// convenience to avoid have it merged in a pipeline making things +// even more complex. For example, without a ping, we might get the +// sequence HELLO + SUBSCRIBE + PING where the hello and ping are +// automatically sent by the implementation. In this case, if the +// subscribe synthax is wrong, redis will send a response, which does +// not exist on success. That response will be interprested as the +// response to the PING command that comes thereafter and won't be +// forwarded to the receive_op, resulting in a difficult to handle +// error. BOOST_AUTO_TEST_CASE(subscriber_wrong_syntax) { request req1; - req1.push("HELLO", 3); + req1.push("PING"); request req2; req2.push("SUBSCRIBE"); // Wrong command synthax. diff --git a/tests/test_conn_push.cpp b/tests/test_conn_push.cpp index c5770026..91d1f274 100644 --- a/tests/test_conn_push.cpp +++ b/tests/test_conn_push.cpp @@ -152,7 +152,7 @@ push_consumer1(std::shared_ptr conn, bool& push_received) { auto [ec, ev] = co_await conn->async_receive(ignore, as_tuple(net::use_awaitable)); - BOOST_CHECK_EQUAL(ec, net::experimental::channel_errc::channel_cancelled); + BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); } push_received = true; @@ -196,7 +196,7 @@ BOOST_AUTO_TEST_CASE(test_push_adapter) }); conn->async_exec(req, ignore, [](auto ec, auto){ - BOOST_CHECK_EQUAL(ec, net::experimental::error::channel_errors::channel_cancelled); + BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); }); run(conn);