Skip to content

Commit

Permalink
Merge pull request #105 from boostorg/95-improve-the-performance-of-c…
Browse files Browse the repository at this point in the history
…onnectionasync_receive

95 improve the performance of connectionasync receive
  • Loading branch information
mzimbres authored May 28, 2023
2 parents 7abfc5f + 3c02a76 commit ec8a1c7
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 61 deletions.
19 changes: 10 additions & 9 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,16 @@ add_executable(echo_server_client benchmarks/cpp/asio/echo_server_client.cpp)
add_executable(echo_server_direct benchmarks/cpp/asio/echo_server_direct.cpp)

set(tests_cpp17
test_conn_quit
test_conn_tls
test_low_level
test_conn_exec_retry
test_conn_exec_error
test_request
test_run
test_low_level_sync
test_conn_check_health)
test_conn_quit
test_conn_tls
test_low_level
test_conn_exec_retry
test_conn_exec_error
test_request
test_run
test_low_level_sync
test_conn_check_health
)

set(tests_cpp20
test_conn_exec
Expand Down
2 changes: 1 addition & 1 deletion CMakePresets.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
"CMAKE_BUILD_TYPE": "Debug",
"CMAKE_CXX_EXTENSIONS": "OFF",
"CMAKE_CXX_FLAGS": "-Wall -Wextra -fsanitize=address",
"CMAKE_CXX_COMPILER": "g++-11",
"CMAKE_CXX_COMPILER": "clang++-13",
"CMAKE_SHARED_LINKER_FLAGS": "-fsanitize=address",
"CMAKE_CXX_STANDARD_REQUIRED": "ON",
"PROJECT_BINARY_DIR": "${sourceDir}/build/clang++-13",
Expand Down
67 changes: 40 additions & 27 deletions include/boost/redis/connection_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <boost/system.hpp>
#include <boost/asio/basic_stream_socket.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/steady_timer.hpp>
Expand All @@ -43,7 +42,7 @@ namespace detail {

template <class Conn>
struct wait_receive_op {
Conn* conn;
Conn* conn_;
asio::coroutine coro{};

template <class Self>
Expand All @@ -52,14 +51,14 @@ struct wait_receive_op {
{
BOOST_ASIO_CORO_REENTER (coro)
{
BOOST_ASIO_CORO_YIELD
conn->channel_.async_send(system::error_code{}, 0, std::move(self));
BOOST_REDIS_CHECK_OP0(;);
conn_->read_op_timer_.cancel();

BOOST_ASIO_CORO_YIELD
conn->channel_.async_send(system::error_code{}, 0, std::move(self));
BOOST_REDIS_CHECK_OP0(;);

conn_->read_op_timer_.async_wait(std::move(self));
if (!conn_->is_open() || is_cancelled(self)) {
self.complete(!!ec ? ec : asio::error::operation_aborted);
return;
}
self.complete({});
}
}
Expand Down Expand Up @@ -143,7 +142,11 @@ class read_next_op {

++index_;

BOOST_REDIS_CHECK_OP1(conn_->cancel(operation::run););
if (ec || redis::detail::is_cancelled(self)) {
conn_->cancel(operation::run);
self.complete(!!ec ? ec : asio::error::operation_aborted, {});
return;
}

read_size_ += n;

Expand All @@ -158,7 +161,7 @@ class read_next_op {

template <class Conn, class Adapter>
struct receive_op {
Conn* conn;
Conn* conn_;
Adapter adapter;
std::size_t read_size = 0;
asio::coroutine coro{};
Expand All @@ -171,27 +174,32 @@ struct receive_op {
{
BOOST_ASIO_CORO_REENTER (coro)
{
BOOST_ASIO_CORO_YIELD
conn->channel_.async_receive(std::move(self));
BOOST_REDIS_CHECK_OP1(;);
if (!conn_->is_next_push()) {
BOOST_ASIO_CORO_YIELD
conn_->read_op_timer_.async_wait(std::move(self));
if (!conn_->is_open() || is_cancelled(self)) {
self.complete(!!ec ? ec : asio::error::operation_aborted, 0);
return;
}
}

if (conn->use_ssl())
BOOST_ASIO_CORO_YIELD redis::detail::async_read(conn->next_layer(), conn->make_dynamic_buffer(), adapter, std::move(self));
if (conn_->use_ssl())
BOOST_ASIO_CORO_YIELD redis::detail::async_read(conn_->next_layer(), conn_->make_dynamic_buffer(), adapter, std::move(self));
else
BOOST_ASIO_CORO_YIELD redis::detail::async_read(conn->next_layer().next_layer(), conn->make_dynamic_buffer(), adapter, std::move(self));
BOOST_ASIO_CORO_YIELD redis::detail::async_read(conn_->next_layer().next_layer(), conn_->make_dynamic_buffer(), adapter, std::move(self));

if (ec || is_cancelled(self)) {
conn->cancel(operation::run);
conn->cancel(operation::receive);
conn_->cancel(operation::run);
conn_->cancel(operation::receive);
self.complete(!!ec ? ec : asio::error::operation_aborted, {});
return;
}

read_size = n;

BOOST_ASIO_CORO_YIELD
conn->channel_.async_receive(std::move(self));
BOOST_REDIS_CHECK_OP1(;);
if (!conn_->is_next_push()) {
conn_->read_op_timer_.cancel();
}

self.complete({}, read_size);
return;
Expand Down Expand Up @@ -490,11 +498,12 @@ class connection_base {
, stream_{std::make_unique<next_layer_type>(ex, ctx_)}
, writer_timer_{ex}
, read_timer_{ex}
, channel_{ex}
, read_op_timer_{ex}
, runner_{ex, {}}
{
writer_timer_.expires_at(std::chrono::steady_clock::time_point::max());
read_timer_.expires_at(std::chrono::steady_clock::time_point::max());
read_op_timer_.expires_at(std::chrono::steady_clock::time_point::max());
}

/// Contructs from an execution context.
Expand Down Expand Up @@ -622,7 +631,7 @@ class connection_base {
return asio::async_compose
< CompletionToken
, void(system::error_code, std::size_t)
>(redis::detail::receive_op<this_type, decltype(f)>{this, f}, token, channel_);
>(redis::detail::receive_op<this_type, decltype(f)>{this, f}, token, read_op_timer_);
}

/** @brief Starts underlying connection operations.
Expand Down Expand Up @@ -689,7 +698,6 @@ class connection_base {
using clock_type = std::chrono::steady_clock;
using clock_traits_type = asio::wait_traits<clock_type>;
using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
using channel_type = asio::experimental::channel<executor_type, void(system::error_code, std::size_t)>;
using runner_type = redis::detail::runner<executor_type>;

auto use_ssl() const noexcept
Expand Down Expand Up @@ -761,7 +769,7 @@ class connection_base {
} break;
case operation::receive:
{
channel_.cancel();
read_op_timer_.cancel();
} break;
default: /* ignore */;
}
Expand Down Expand Up @@ -885,7 +893,7 @@ class connection_base {
return asio::async_compose
< CompletionToken
, void(system::error_code)
>(redis::detail::wait_receive_op<this_type>{this}, token, channel_);
>(redis::detail::wait_receive_op<this_type>{this}, token, read_op_timer_);
}

void cancel_push_requests()
Expand Down Expand Up @@ -991,6 +999,11 @@ class connection_base {
stream_->next_layer().close();
}

bool is_next_push() const noexcept
{
return !read_buffer_.empty() && (resp3::to_type(read_buffer_.front()) == resp3::type::push);
}

auto is_open() const noexcept { return stream_->next_layer().is_open(); }
auto& lowest_layer() noexcept { return stream_->lowest_layer(); }

Expand All @@ -1002,7 +1015,7 @@ class connection_base {
// not suspend.
timer_type writer_timer_;
timer_type read_timer_;
channel_type channel_;
timer_type read_op_timer_;
runner_type runner_;

std::string read_buffer_;
Expand Down
5 changes: 3 additions & 2 deletions tests/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ run(
std::shared_ptr<boost::redis::connection> conn,
boost::redis::config cfg,
boost::system::error_code ec,
boost::redis::operation op)
boost::redis::operation op,
boost::redis::logger::level l)
{
conn->async_run(cfg, {}, run_callback{conn, op, ec});
conn->async_run(cfg, {l}, run_callback{conn, op, ec});
}

#ifdef BOOST_ASIO_HAS_CO_AWAIT
Expand Down
3 changes: 2 additions & 1 deletion tests/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ run(
std::shared_ptr<boost::redis::connection> conn,
boost::redis::config cfg = {},
boost::system::error_code ec = boost::asio::error::operation_aborted,
boost::redis::operation op = boost::redis::operation::receive);
boost::redis::operation op = boost::redis::operation::receive,
boost::redis::logger::level l = boost::redis::logger::level::info);

5 changes: 5 additions & 0 deletions tests/test_conn_check_health.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#define BOOST_TEST_MODULE check-health
#include <boost/test/included/unit_test.hpp>
#include <iostream>
#include <thread>
#include "common.hpp"

namespace net = boost::asio;
Expand Down Expand Up @@ -119,5 +120,9 @@ BOOST_AUTO_TEST_CASE(check_health)

BOOST_TEST(!!res1);
BOOST_TEST(!!res2);

// Waits before exiting otherwise it might cause subsequent tests
// to fail.
std::this_thread::sleep_for(std::chrono::seconds{10});
}

54 changes: 43 additions & 11 deletions tests/test_conn_echo_stress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <boost/redis/connection.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/system/errc.hpp>
#define BOOST_TEST_MODULE echo-stress
#include <boost/test/included/unit_test.hpp>
Expand Down Expand Up @@ -38,44 +39,75 @@ auto push_consumer(std::shared_ptr<connection> conn, int expected) -> net::await
conn->cancel();
}

auto echo_session(std::shared_ptr<connection> conn, std::string id, int n) -> net::awaitable<void>
auto
echo_session(
std::shared_ptr<connection> conn,
std::shared_ptr<request> pubs,
std::string id,
int n) -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;

request req;
response<ignore_t, std::string> resp;
response<ignore_t, std::string, ignore_t> resp;

for (auto i = 0; i < n; ++i) {
auto const msg = id + "/" + std::to_string(i);
//std::cout << msg << std::endl;
req.push("HELLO", 3);
req.push("HELLO", 3); // Just to mess around.
req.push("PING", msg);
req.push("SUBSCRIBE", "channel");
req.push("PING", "lsls"); // TODO: Change to HELLO after fixing issue 105.
boost::system::error_code ec;
co_await conn->async_exec(req, resp, redir(ec));
BOOST_CHECK_EQUAL(ec, boost::system::error_code{});
BOOST_CHECK_EQUAL(msg, std::get<1>(resp).value());

BOOST_REQUIRE_EQUAL(ec, boost::system::error_code{});
BOOST_REQUIRE_EQUAL(msg, std::get<1>(resp).value());
req.clear();
std::get<1>(resp).value().clear();

co_await conn->async_exec(*pubs, ignore, net::deferred);
}
}

auto async_echo_stress() -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
config cfg;
cfg.health_check_interval = std::chrono::seconds::zero();
run(conn, cfg,
boost::asio::error::operation_aborted,
boost::redis::operation::receive,
boost::redis::logger::level::crit);

request req;
req.push("SUBSCRIBE", "channel");
co_await conn->async_exec(req, ignore, net::deferred);

// Number of coroutines that will send pings sharing the same
// connection to redis.
int const sessions = 500;

// The number of pings that will be sent by each session.
int const msgs = 1000;
int total = sessions * msgs;

net::co_spawn(ex, push_consumer(conn, total), net::detached);
// The number of publishes that will be sent by each session with
// each message.
int const n_pubs = 10;

for (int i = 0; i < sessions; ++i)
net::co_spawn(ex, echo_session(conn, std::to_string(i), msgs), net::detached);
// This is the total number of pushes we will receive.
int total_pushes = sessions * msgs * n_pubs + 1;

auto pubs = std::make_shared<request>();
for (int i = 0; i < n_pubs; ++i)
pubs->push("PUBLISH", "channel", "payload");

run(conn);
// Op that will consume the pushes counting down until all expected
// pushes have been received.
net::co_spawn(ex, push_consumer(conn, total_pushes), net::detached);

for (int i = 0; i < sessions; ++i)
net::co_spawn(ex, echo_session(conn, pubs, std::to_string(i), msgs), net::detached);
}

BOOST_AUTO_TEST_CASE(echo_stress)
Expand Down
6 changes: 3 additions & 3 deletions tests/test_conn_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ BOOST_AUTO_TEST_CASE(hello_priority)
conn->async_exec(req1, ignore, [&](auto ec, auto){
// Second callback to the called.
std::cout << "req1" << std::endl;
BOOST_TEST(!ec);
BOOST_CHECK_EQUAL(ec, boost::system::error_code{});
BOOST_TEST(!seen2);
BOOST_TEST(seen3);
seen1 = true;
Expand All @@ -60,7 +60,7 @@ BOOST_AUTO_TEST_CASE(hello_priority)
conn->async_exec(req2, ignore, [&](auto ec, auto){
// Last callback to the called.
std::cout << "req2" << std::endl;
BOOST_TEST(!ec);
BOOST_CHECK_EQUAL(ec, boost::system::error_code{});
BOOST_TEST(seen1);
BOOST_TEST(seen3);
seen2 = true;
Expand All @@ -71,7 +71,7 @@ BOOST_AUTO_TEST_CASE(hello_priority)
conn->async_exec(req3, ignore, [&](auto ec, auto){
// Callback that will be called first.
std::cout << "req3" << std::endl;
BOOST_TEST(!ec);
BOOST_CHECK_EQUAL(ec, boost::system::error_code{});
BOOST_TEST(!seen1);
BOOST_TEST(!seen2);
seen3 = true;
Expand Down
Loading

0 comments on commit ec8a1c7

Please sign in to comment.