Skip to content

Commit

Permalink
Merge pull request #130 from boostorg/119-simplify-the-parser-op
Browse files Browse the repository at this point in the history
Simplifications in the parser
  • Loading branch information
mzimbres authored Jul 30, 2023
2 parents 4652537 + 9dec635 commit 7d16259
Show file tree
Hide file tree
Showing 15 changed files with 476 additions and 365 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ if (BOOST_REDIS_TESTS)
make_test(test_request 17)
make_test(test_run 17)
make_test(test_low_level_sync 17)
make_test(test_low_level_sync_sans_io 17)
make_test(test_conn_check_health 17)

make_test(test_conn_exec 20)
Expand Down
19 changes: 19 additions & 0 deletions CMakePresets.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,23 @@
"DOXYGEN_OUTPUT_DIRECTORY": "${sourceDir}/build/g++-11/doc/"
}
},
{
"name": "g++-11-release",
"generator": "Unix Makefiles",
"hidden": false,
"inherits": ["cmake-pedantic"],
"binaryDir": "${sourceDir}/build/g++-11-release",
"cacheVariables": {
"CMAKE_BUILD_TYPE": "Release",
"CMAKE_CXX_EXTENSIONS": "OFF",
"CMAKE_CXX_FLAGS": "-Wall -Wextra",
"CMAKE_CXX_COMPILER": "g++-11",
"CMAKE_SHARED_LINKER_FLAGS": "",
"CMAKE_CXX_STANDARD_REQUIRED": "ON",
"PROJECT_BINARY_DIR": "${sourceDir}/build/g++-11-release",
"DOXYGEN_OUTPUT_DIRECTORY": "${sourceDir}/build/g++-11-release/doc/"
}
},
{
"name": "clang++-13",
"generator": "Unix Makefiles",
Expand Down Expand Up @@ -124,6 +141,7 @@
"buildPresets": [
{ "name": "coverage", "configurePreset": "coverage" },
{ "name": "g++-11", "configurePreset": "g++-11" },
{ "name": "g++-11-release", "configurePreset": "g++-11-release" },
{ "name": "clang++-13", "configurePreset": "clang++-13" },
{ "name": "libc++-14-cpp17", "configurePreset": "libc++-14-cpp17" },
{ "name": "libc++-14-cpp20", "configurePreset": "libc++-14-cpp20" },
Expand All @@ -138,6 +156,7 @@
},
{ "name": "coverage", "configurePreset": "coverage", "inherits": ["test"] },
{ "name": "g++-11", "configurePreset": "g++-11", "inherits": ["test"] },
{ "name": "g++-11-release", "configurePreset": "g++-11-release", "inherits": ["test"] },
{ "name": "clang++-13", "configurePreset": "clang++-13", "inherits": ["test"] },
{ "name": "libc++-14-cpp17", "configurePreset": "libc++-14-cpp17", "inherits": ["test"] },
{ "name": "libc++-14-cpp20", "configurePreset": "libc++-14-cpp20", "inherits": ["test"] },
Expand Down
44 changes: 21 additions & 23 deletions include/boost/redis/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <chrono>
#include <memory>
#include <limits>

namespace boost::redis {
namespace detail
Expand Down Expand Up @@ -87,15 +88,21 @@ class basic_connection {

/// Contructs from an executor.
explicit
basic_connection(executor_type ex, asio::ssl::context::method method = asio::ssl::context::tls_client)
: impl_{ex, method}
basic_connection(
executor_type ex,
asio::ssl::context::method method = asio::ssl::context::tls_client,
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)())
: impl_{ex, method, max_read_size}
, timer_{ex}
{ }

/// Contructs from a context.
explicit
basic_connection(asio::io_context& ioc, asio::ssl::context::method method = asio::ssl::context::tls_client)
: basic_connection(ioc.get_executor(), method)
basic_connection(
asio::io_context& ioc,
asio::ssl::context::method method = asio::ssl::context::tls_client,
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)())
: basic_connection(ioc.get_executor(), method, max_read_size)
{ }

/** @brief Starts underlying connection operations.
Expand Down Expand Up @@ -255,23 +262,6 @@ class basic_connection {
bool will_reconnect() const noexcept
{ return cfg_.reconnect_wait_interval != std::chrono::seconds::zero();}

/** @brief Reserve memory on the read and write internal buffers.
*
* This function will call `std::string::reserve` on the
* underlying buffers.
*
* @param read The new capacity of the read buffer.
* @param write The new capacity of the write buffer.
*/
void reserve(std::size_t read, std::size_t write)
{
impl_.reserve(read, write);
}

/// Sets the maximum size of the read buffer.
void set_max_buffer_read_size(std::size_t max_read_size) noexcept
{ impl_.set_max_buffer_read_size(max_read_size); }

/// Returns the ssl context.
auto const& get_ssl_context() const noexcept
{ return impl_.get_ssl_context();}
Expand Down Expand Up @@ -321,10 +311,18 @@ class connection {
using executor_type = asio::any_io_executor;

/// Contructs from an executor.
explicit connection(executor_type ex, asio::ssl::context::method method = asio::ssl::context::tls_client);
explicit
connection(
executor_type ex,
asio::ssl::context::method method = asio::ssl::context::tls_client,
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)());

/// Contructs from a context.
explicit connection(asio::io_context& ioc, asio::ssl::context::method method = asio::ssl::context::tls_client);
explicit
connection(
asio::io_context& ioc,
asio::ssl::context::method method = asio::ssl::context::tls_client,
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)());

/// Returns the underlying executor.
executor_type get_executor() noexcept
Expand Down
100 changes: 41 additions & 59 deletions include/boost/redis/detail/connection_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@
#include <boost/assert.hpp>
#include <boost/core/ignore_unused.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/buffer.hpp>

#include <algorithm>
#include <array>
#include <chrono>
#include <deque>
#include <limits>
#include <memory>
#include <string_view>
#include <type_traits>
Expand Down Expand Up @@ -112,10 +113,13 @@ class read_next_op {
// some data in the read bufer.
if (conn_->read_buffer_.empty()) {

if (conn_->use_ssl())
BOOST_ASIO_CORO_YIELD asio::async_read_until(conn_->next_layer(), conn_->make_dynamic_buffer(), "\r\n", std::move(self));
else
BOOST_ASIO_CORO_YIELD asio::async_read_until(conn_->next_layer().next_layer(), conn_->make_dynamic_buffer(), "\r\n", std::move(self));
if (conn_->use_ssl()) {
BOOST_ASIO_CORO_YIELD
asio::async_read_until(conn_->next_layer(), conn_->dbuf_, resp3::parser::sep, std::move(self));
} else {
BOOST_ASIO_CORO_YIELD
asio::async_read_until(conn_->next_layer().next_layer(), conn_->dbuf_, resp3::parser::sep, std::move(self));
}

BOOST_REDIS_CHECK_OP1(conn_->cancel(operation::run););
if (info_->stop_requested()) {
Expand All @@ -134,10 +138,13 @@ class read_next_op {
}
//-----------------------------------

if (conn_->use_ssl())
BOOST_ASIO_CORO_YIELD redis::detail::async_read(conn_->next_layer(), conn_->make_dynamic_buffer(), make_adapter(), std::move(self));
else
BOOST_ASIO_CORO_YIELD redis::detail::async_read(conn_->next_layer().next_layer(), conn_->make_dynamic_buffer(), make_adapter(), std::move(self));
if (conn_->use_ssl()) {
BOOST_ASIO_CORO_YIELD
redis::detail::async_read(conn_->next_layer(), conn_->dbuf_, make_adapter(), std::move(self));
} else {
BOOST_ASIO_CORO_YIELD
redis::detail::async_read(conn_->next_layer().next_layer(), conn_->dbuf_, make_adapter(), std::move(self));
}

++index_;

Expand All @@ -147,6 +154,7 @@ class read_next_op {
return;
}

conn_->dbuf_.consume(n);
read_size_ += n;

BOOST_ASSERT(cmds_ != 0);
Expand All @@ -162,7 +170,6 @@ template <class Conn, class Adapter>
struct receive_op {
Conn* conn_;
Adapter adapter;
std::size_t read_size = 0;
asio::coroutine coro{};

template <class Self>
Expand All @@ -182,10 +189,13 @@ struct receive_op {
}
}

if (conn_->use_ssl())
BOOST_ASIO_CORO_YIELD redis::detail::async_read(conn_->next_layer(), conn_->make_dynamic_buffer(), adapter, std::move(self));
else
BOOST_ASIO_CORO_YIELD redis::detail::async_read(conn_->next_layer().next_layer(), conn_->make_dynamic_buffer(), adapter, std::move(self));
if (conn_->use_ssl()) {
BOOST_ASIO_CORO_YIELD
redis::detail::async_read(conn_->next_layer(), conn_->dbuf_, adapter, std::move(self));
} else {
BOOST_ASIO_CORO_YIELD
redis::detail::async_read(conn_->next_layer().next_layer(), conn_->dbuf_, adapter, std::move(self));
}

if (ec || is_cancelled(self)) {
conn_->cancel(operation::run);
Expand All @@ -194,13 +204,13 @@ struct receive_op {
return;
}

read_size = n;
conn_->dbuf_.consume(n);

if (!conn_->is_next_push()) {
conn_->read_op_timer_.cancel();
}

self.complete({}, read_size);
self.complete({}, n);
return;
}
}
Expand All @@ -214,7 +224,6 @@ struct exec_op {
request const* req = nullptr;
Adapter adapter{};
std::shared_ptr<req_info_type> info = nullptr;
std::size_t read_size = 0;
asio::coroutine coro{};

template <class Self>
Expand Down Expand Up @@ -283,8 +292,6 @@ struct exec_op {
conn->async_read_next(adapter, std::move(self));
BOOST_REDIS_CHECK_OP1(;);

read_size = n;

if (info->stop_requested()) {
// Don't have to call remove_request as it has already
// been by cancel(exec).
Expand All @@ -301,7 +308,7 @@ struct exec_op {
conn->read_timer_.cancel_one();
}

self.complete({}, read_size);
self.complete({}, n);
}
}
};
Expand Down Expand Up @@ -417,9 +424,9 @@ struct reader_op {
BOOST_ASIO_CORO_REENTER (coro) for (;;)
{
if (conn->use_ssl())
BOOST_ASIO_CORO_YIELD asio::async_read_until(conn->next_layer(), conn->make_dynamic_buffer(), "\r\n", std::move(self));
BOOST_ASIO_CORO_YIELD asio::async_read_until(conn->next_layer(), conn->dbuf_, "\r\n", std::move(self));
else
BOOST_ASIO_CORO_YIELD asio::async_read_until(conn->next_layer().next_layer(), conn->make_dynamic_buffer(), "\r\n", std::move(self));
BOOST_ASIO_CORO_YIELD asio::async_read_until(conn->next_layer().next_layer(), conn->dbuf_, "\r\n", std::move(self));

if (ec == asio::error::eof) {
conn->cancel(operation::run);
Expand Down Expand Up @@ -491,25 +498,23 @@ class connection_base {
using this_type = connection_base<Executor>;

/// Constructs from an executor.
connection_base(executor_type ex, asio::ssl::context::method method = asio::ssl::context::tls_client)
connection_base(
executor_type ex,
asio::ssl::context::method method,
std::size_t max_read_size)
: ctx_{method}
, stream_{std::make_unique<next_layer_type>(ex, ctx_)}
, writer_timer_{ex}
, read_timer_{ex}
, read_op_timer_{ex}
, runner_{ex, {}}
, dbuf_{read_buffer_, max_read_size}
{
writer_timer_.expires_at(std::chrono::steady_clock::time_point::max());
read_timer_.expires_at(std::chrono::steady_clock::time_point::max());
read_op_timer_.expires_at(std::chrono::steady_clock::time_point::max());
}

/// Contructs from an execution context.
explicit
connection_base(asio::io_context& ioc, asio::ssl::context::method method = asio::ssl::context::tls_client)
: connection_base(ioc.get_executor(), method)
{ }

/// Returns the ssl context.
auto const& get_ssl_context() const noexcept
{ return ctx_;}
Expand Down Expand Up @@ -547,15 +552,8 @@ class connection_base {
cancel_impl(op);
}

template <
class Response = ignore_t,
class CompletionToken = asio::default_completion_token_t<executor_type>
>
auto
async_exec(
request const& req,
Response& resp = ignore,
CompletionToken token = CompletionToken{})
template <class Response, class CompletionToken>
auto async_exec(request const& req, Response& resp, CompletionToken token)
{
using namespace boost::redis::adapter;
auto f = boost_redis_adapt(resp);
Expand All @@ -567,14 +565,8 @@ class connection_base {
>(redis::detail::exec_op<this_type, decltype(f)>{this, &req, f}, token, writer_timer_);
}

template <
class Response = ignore_t,
class CompletionToken = asio::default_completion_token_t<executor_type>
>
auto
async_receive(
Response& response,
CompletionToken token = CompletionToken{})
template <class Response, class CompletionToken>
auto async_receive(Response& response, CompletionToken token)
{
using namespace boost::redis::adapter;
auto g = boost_redis_adapt(response);
Expand All @@ -594,15 +586,6 @@ class connection_base {
return runner_.async_run(*this, l, std::move(token));
}

void set_max_buffer_read_size(std::size_t max_read_size) noexcept
{max_read_size_ = max_read_size;}

void reserve(std::size_t read, std::size_t write)
{
read_buffer_.reserve(read);
write_buffer_.reserve(write);
}

private:
using clock_type = std::chrono::steady_clock;
using clock_traits_type = asio::wait_traits<clock_type>;
Expand Down Expand Up @@ -839,9 +822,6 @@ class connection_base {
writer_timer_.cancel();
}

auto make_dynamic_buffer()
{ return asio::dynamic_buffer(read_buffer_, max_read_size_); }

template <class CompletionToken>
auto reader(CompletionToken&& token)
{
Expand Down Expand Up @@ -927,10 +907,12 @@ class connection_base {
timer_type read_op_timer_;
runner_type runner_;

using dyn_buffer_type = asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;

std::string read_buffer_;
dyn_buffer_type dbuf_;
std::string write_buffer_;
reqs_type reqs_;
std::size_t max_read_size_ = (std::numeric_limits<std::size_t>::max)();
};

} // boost::redis::detail
Expand Down
Loading

0 comments on commit 7d16259

Please sign in to comment.