diff --git a/CMakeLists.txt b/CMakeLists.txt index f0a921ae..c9022131 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -163,6 +163,7 @@ if (BOOST_REDIS_TESTS) make_test(test_request 17) make_test(test_run 17) make_test(test_low_level_sync 17) + make_test(test_low_level_sync_sans_io 17) make_test(test_conn_check_health 17) make_test(test_conn_exec 20) diff --git a/CMakePresets.json b/CMakePresets.json index b6a11938..c68fbf34 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -56,6 +56,23 @@ "DOXYGEN_OUTPUT_DIRECTORY": "${sourceDir}/build/g++-11/doc/" } }, + { + "name": "g++-11-release", + "generator": "Unix Makefiles", + "hidden": false, + "inherits": ["cmake-pedantic"], + "binaryDir": "${sourceDir}/build/g++-11-release", + "cacheVariables": { + "CMAKE_BUILD_TYPE": "Release", + "CMAKE_CXX_EXTENSIONS": "OFF", + "CMAKE_CXX_FLAGS": "-Wall -Wextra", + "CMAKE_CXX_COMPILER": "g++-11", + "CMAKE_SHARED_LINKER_FLAGS": "", + "CMAKE_CXX_STANDARD_REQUIRED": "ON", + "PROJECT_BINARY_DIR": "${sourceDir}/build/g++-11-release", + "DOXYGEN_OUTPUT_DIRECTORY": "${sourceDir}/build/g++-11-release/doc/" + } + }, { "name": "clang++-13", "generator": "Unix Makefiles", @@ -124,6 +141,7 @@ "buildPresets": [ { "name": "coverage", "configurePreset": "coverage" }, { "name": "g++-11", "configurePreset": "g++-11" }, + { "name": "g++-11-release", "configurePreset": "g++-11-release" }, { "name": "clang++-13", "configurePreset": "clang++-13" }, { "name": "libc++-14-cpp17", "configurePreset": "libc++-14-cpp17" }, { "name": "libc++-14-cpp20", "configurePreset": "libc++-14-cpp20" }, @@ -138,6 +156,7 @@ }, { "name": "coverage", "configurePreset": "coverage", "inherits": ["test"] }, { "name": "g++-11", "configurePreset": "g++-11", "inherits": ["test"] }, + { "name": "g++-11-release", "configurePreset": "g++-11-release", "inherits": ["test"] }, { "name": "clang++-13", "configurePreset": "clang++-13", "inherits": ["test"] }, { "name": "libc++-14-cpp17", "configurePreset": "libc++-14-cpp17", "inherits": ["test"] }, { "name": "libc++-14-cpp20", "configurePreset": "libc++-14-cpp20", "inherits": ["test"] }, diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index ccce89d2..84190365 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -18,6 +18,7 @@ #include #include +#include namespace boost::redis { namespace detail @@ -87,15 +88,21 @@ class basic_connection { /// Contructs from an executor. explicit - basic_connection(executor_type ex, asio::ssl::context::method method = asio::ssl::context::tls_client) - : impl_{ex, method} + basic_connection( + executor_type ex, + asio::ssl::context::method method = asio::ssl::context::tls_client, + std::size_t max_read_size = (std::numeric_limits::max)()) + : impl_{ex, method, max_read_size} , timer_{ex} { } /// Contructs from a context. explicit - basic_connection(asio::io_context& ioc, asio::ssl::context::method method = asio::ssl::context::tls_client) - : basic_connection(ioc.get_executor(), method) + basic_connection( + asio::io_context& ioc, + asio::ssl::context::method method = asio::ssl::context::tls_client, + std::size_t max_read_size = (std::numeric_limits::max)()) + : basic_connection(ioc.get_executor(), method, max_read_size) { } /** @brief Starts underlying connection operations. @@ -255,23 +262,6 @@ class basic_connection { bool will_reconnect() const noexcept { return cfg_.reconnect_wait_interval != std::chrono::seconds::zero();} - /** @brief Reserve memory on the read and write internal buffers. - * - * This function will call `std::string::reserve` on the - * underlying buffers. - * - * @param read The new capacity of the read buffer. - * @param write The new capacity of the write buffer. - */ - void reserve(std::size_t read, std::size_t write) - { - impl_.reserve(read, write); - } - - /// Sets the maximum size of the read buffer. - void set_max_buffer_read_size(std::size_t max_read_size) noexcept - { impl_.set_max_buffer_read_size(max_read_size); } - /// Returns the ssl context. auto const& get_ssl_context() const noexcept { return impl_.get_ssl_context();} @@ -321,10 +311,18 @@ class connection { using executor_type = asio::any_io_executor; /// Contructs from an executor. - explicit connection(executor_type ex, asio::ssl::context::method method = asio::ssl::context::tls_client); + explicit + connection( + executor_type ex, + asio::ssl::context::method method = asio::ssl::context::tls_client, + std::size_t max_read_size = (std::numeric_limits::max)()); /// Contructs from a context. - explicit connection(asio::io_context& ioc, asio::ssl::context::method method = asio::ssl::context::tls_client); + explicit + connection( + asio::io_context& ioc, + asio::ssl::context::method method = asio::ssl::context::tls_client, + std::size_t max_read_size = (std::numeric_limits::max)()); /// Returns the underlying executor. executor_type get_executor() noexcept diff --git a/include/boost/redis/detail/connection_base.hpp b/include/boost/redis/detail/connection_base.hpp index f8681769..f2f5f35e 100644 --- a/include/boost/redis/detail/connection_base.hpp +++ b/include/boost/redis/detail/connection_base.hpp @@ -27,12 +27,13 @@ #include #include #include +#include +#include #include #include #include #include -#include #include #include #include @@ -112,10 +113,13 @@ class read_next_op { // some data in the read bufer. if (conn_->read_buffer_.empty()) { - if (conn_->use_ssl()) - BOOST_ASIO_CORO_YIELD asio::async_read_until(conn_->next_layer(), conn_->make_dynamic_buffer(), "\r\n", std::move(self)); - else - BOOST_ASIO_CORO_YIELD asio::async_read_until(conn_->next_layer().next_layer(), conn_->make_dynamic_buffer(), "\r\n", std::move(self)); + if (conn_->use_ssl()) { + BOOST_ASIO_CORO_YIELD + asio::async_read_until(conn_->next_layer(), conn_->dbuf_, resp3::parser::sep, std::move(self)); + } else { + BOOST_ASIO_CORO_YIELD + asio::async_read_until(conn_->next_layer().next_layer(), conn_->dbuf_, resp3::parser::sep, std::move(self)); + } BOOST_REDIS_CHECK_OP1(conn_->cancel(operation::run);); if (info_->stop_requested()) { @@ -134,10 +138,13 @@ class read_next_op { } //----------------------------------- - if (conn_->use_ssl()) - BOOST_ASIO_CORO_YIELD redis::detail::async_read(conn_->next_layer(), conn_->make_dynamic_buffer(), make_adapter(), std::move(self)); - else - BOOST_ASIO_CORO_YIELD redis::detail::async_read(conn_->next_layer().next_layer(), conn_->make_dynamic_buffer(), make_adapter(), std::move(self)); + if (conn_->use_ssl()) { + BOOST_ASIO_CORO_YIELD + redis::detail::async_read(conn_->next_layer(), conn_->dbuf_, make_adapter(), std::move(self)); + } else { + BOOST_ASIO_CORO_YIELD + redis::detail::async_read(conn_->next_layer().next_layer(), conn_->dbuf_, make_adapter(), std::move(self)); + } ++index_; @@ -147,6 +154,7 @@ class read_next_op { return; } + conn_->dbuf_.consume(n); read_size_ += n; BOOST_ASSERT(cmds_ != 0); @@ -162,7 +170,6 @@ template struct receive_op { Conn* conn_; Adapter adapter; - std::size_t read_size = 0; asio::coroutine coro{}; template @@ -182,10 +189,13 @@ struct receive_op { } } - if (conn_->use_ssl()) - BOOST_ASIO_CORO_YIELD redis::detail::async_read(conn_->next_layer(), conn_->make_dynamic_buffer(), adapter, std::move(self)); - else - BOOST_ASIO_CORO_YIELD redis::detail::async_read(conn_->next_layer().next_layer(), conn_->make_dynamic_buffer(), adapter, std::move(self)); + if (conn_->use_ssl()) { + BOOST_ASIO_CORO_YIELD + redis::detail::async_read(conn_->next_layer(), conn_->dbuf_, adapter, std::move(self)); + } else { + BOOST_ASIO_CORO_YIELD + redis::detail::async_read(conn_->next_layer().next_layer(), conn_->dbuf_, adapter, std::move(self)); + } if (ec || is_cancelled(self)) { conn_->cancel(operation::run); @@ -194,13 +204,13 @@ struct receive_op { return; } - read_size = n; + conn_->dbuf_.consume(n); if (!conn_->is_next_push()) { conn_->read_op_timer_.cancel(); } - self.complete({}, read_size); + self.complete({}, n); return; } } @@ -214,7 +224,6 @@ struct exec_op { request const* req = nullptr; Adapter adapter{}; std::shared_ptr info = nullptr; - std::size_t read_size = 0; asio::coroutine coro{}; template @@ -283,8 +292,6 @@ struct exec_op { conn->async_read_next(adapter, std::move(self)); BOOST_REDIS_CHECK_OP1(;); - read_size = n; - if (info->stop_requested()) { // Don't have to call remove_request as it has already // been by cancel(exec). @@ -301,7 +308,7 @@ struct exec_op { conn->read_timer_.cancel_one(); } - self.complete({}, read_size); + self.complete({}, n); } } }; @@ -417,9 +424,9 @@ struct reader_op { BOOST_ASIO_CORO_REENTER (coro) for (;;) { if (conn->use_ssl()) - BOOST_ASIO_CORO_YIELD asio::async_read_until(conn->next_layer(), conn->make_dynamic_buffer(), "\r\n", std::move(self)); + BOOST_ASIO_CORO_YIELD asio::async_read_until(conn->next_layer(), conn->dbuf_, "\r\n", std::move(self)); else - BOOST_ASIO_CORO_YIELD asio::async_read_until(conn->next_layer().next_layer(), conn->make_dynamic_buffer(), "\r\n", std::move(self)); + BOOST_ASIO_CORO_YIELD asio::async_read_until(conn->next_layer().next_layer(), conn->dbuf_, "\r\n", std::move(self)); if (ec == asio::error::eof) { conn->cancel(operation::run); @@ -491,25 +498,23 @@ class connection_base { using this_type = connection_base; /// Constructs from an executor. - connection_base(executor_type ex, asio::ssl::context::method method = asio::ssl::context::tls_client) + connection_base( + executor_type ex, + asio::ssl::context::method method, + std::size_t max_read_size) : ctx_{method} , stream_{std::make_unique(ex, ctx_)} , writer_timer_{ex} , read_timer_{ex} , read_op_timer_{ex} , runner_{ex, {}} + , dbuf_{read_buffer_, max_read_size} { writer_timer_.expires_at(std::chrono::steady_clock::time_point::max()); read_timer_.expires_at(std::chrono::steady_clock::time_point::max()); read_op_timer_.expires_at(std::chrono::steady_clock::time_point::max()); } - /// Contructs from an execution context. - explicit - connection_base(asio::io_context& ioc, asio::ssl::context::method method = asio::ssl::context::tls_client) - : connection_base(ioc.get_executor(), method) - { } - /// Returns the ssl context. auto const& get_ssl_context() const noexcept { return ctx_;} @@ -547,15 +552,8 @@ class connection_base { cancel_impl(op); } - template < - class Response = ignore_t, - class CompletionToken = asio::default_completion_token_t - > - auto - async_exec( - request const& req, - Response& resp = ignore, - CompletionToken token = CompletionToken{}) + template + auto async_exec(request const& req, Response& resp, CompletionToken token) { using namespace boost::redis::adapter; auto f = boost_redis_adapt(resp); @@ -567,14 +565,8 @@ class connection_base { >(redis::detail::exec_op{this, &req, f}, token, writer_timer_); } - template < - class Response = ignore_t, - class CompletionToken = asio::default_completion_token_t - > - auto - async_receive( - Response& response, - CompletionToken token = CompletionToken{}) + template + auto async_receive(Response& response, CompletionToken token) { using namespace boost::redis::adapter; auto g = boost_redis_adapt(response); @@ -594,15 +586,6 @@ class connection_base { return runner_.async_run(*this, l, std::move(token)); } - void set_max_buffer_read_size(std::size_t max_read_size) noexcept - {max_read_size_ = max_read_size;} - - void reserve(std::size_t read, std::size_t write) - { - read_buffer_.reserve(read); - write_buffer_.reserve(write); - } - private: using clock_type = std::chrono::steady_clock; using clock_traits_type = asio::wait_traits; @@ -839,9 +822,6 @@ class connection_base { writer_timer_.cancel(); } - auto make_dynamic_buffer() - { return asio::dynamic_buffer(read_buffer_, max_read_size_); } - template auto reader(CompletionToken&& token) { @@ -927,10 +907,12 @@ class connection_base { timer_type read_op_timer_; runner_type runner_; + using dyn_buffer_type = asio::dynamic_string_buffer, std::allocator>; + std::string read_buffer_; + dyn_buffer_type dbuf_; std::string write_buffer_; reqs_type reqs_; - std::size_t max_read_size_ = (std::numeric_limits::max)(); }; } // boost::redis::detail diff --git a/include/boost/redis/detail/read.hpp b/include/boost/redis/detail/read.hpp index 0dfad6bc..8cc72c01 100644 --- a/include/boost/redis/detail/read.hpp +++ b/include/boost/redis/detail/read.hpp @@ -9,14 +9,84 @@ #include #include -#include #include #include #include -#include +#include +#include + +#include +#include namespace boost::redis::detail { +template +std::string_view buffer_view(DynamicBuffer buf) noexcept +{ + char const* start = static_cast(buf.data(0, buf.size()).data()); + return std::string_view{start, std::size(buf)}; +} + +template < + class AsyncReadStream, + class DynamicBuffer, + class ResponseAdapter> +class parse_op { +private: + AsyncReadStream& stream_; + DynamicBuffer buf_; + resp3::parser parser_; + ResponseAdapter adapter_; + std::size_t tmp_ = 0; + resp3::parser::result res_; + asio::coroutine coro_{}; + + static std::size_t const growth = 1024; + +public: + parse_op(AsyncReadStream& stream, DynamicBuffer buf, ResponseAdapter adapter) + : stream_ {stream} + , buf_ {std::move(buf)} + , adapter_ {std::move(adapter)} + { } + + template + void operator()( Self& self + , system::error_code ec = {} + , std::size_t n = 0) + { + BOOST_ASIO_CORO_REENTER (coro_) for (;;) { + + res_ = parser_.consume(buffer_view(buf_), ec); + if (ec) + return self.complete(ec, 0); + + if (!res_.has_value()) { + tmp_ = buf_.size(); + buf_.grow(parser_.get_suggested_buffer_growth(growth)); + + BOOST_ASIO_CORO_YIELD + stream_.async_read_some( + buf_.data(tmp_, parser_.get_suggested_buffer_growth(growth)), + std::move(self)); + BOOST_REDIS_CHECK_OP1(;); + + buf_.shrink(buf_.size() - tmp_ - n); + continue; + } + + adapter_(res_.value(), ec); + if (ec) + return self.complete(ec, 0); + + if (parser_.done()) { + self.complete({}, parser_.get_consumed()); + return; + } + } + } +}; + /** \brief Reads a complete response to a command sychronously. * * This function reads a complete response to a command or a @@ -58,43 +128,34 @@ read( ResponseAdapter adapter, system::error_code& ec) -> std::size_t { - resp3::parser p; - std::size_t n = 0; - std::size_t consumed = 0; - do { - if (!p.bulk_expected()) { - n = asio::read_until(stream, buf, "\r\n", ec); - if (ec) - return 0; - - } else { - auto const s = buf.size(); - auto const l = p.bulk_length(); - if (s < (l + 2)) { - auto const to_read = l + 2 - s; - buf.grow(to_read); - n = asio::read(stream, buf.data(s, to_read), ec); - if (ec) - return 0; - } - } + static std::size_t const growth = 1024; - auto const* data = static_cast(buf.data(0, n).data()); - auto const res = p.consume(data, n, ec); + resp3::parser parser; + while (!parser.done()) { + auto const res = parser.consume(detail::buffer_view(buf), ec); if (ec) - return 0; + return 0UL; - if (!p.bulk_expected()) { - adapter(res.first, ec); + if (!res.has_value()) { + auto const size_before = buf.size(); + buf.grow(parser.get_suggested_buffer_growth(growth)); + auto const n = + stream.read_some( + buf.data(size_before, parser.get_suggested_buffer_growth(growth)), + ec); if (ec) - return 0; + return 0UL; + + buf.shrink(buf.size() - size_before - n); + continue; } - buf.consume(res.second); - consumed += res.second; - } while (!p.done()); + adapter(res.value(), ec); + if (ec) + return 0UL; + } - return consumed; + return parser.get_consumed(); } /** \brief Reads a complete response to a command sychronously. @@ -173,7 +234,7 @@ auto async_read( return asio::async_compose < CompletionToken , void(system::error_code, std::size_t) - >(detail::parse_op {stream, buffer, adapter}, + >(parse_op {stream, buffer, adapter}, token, stream); } diff --git a/include/boost/redis/detail/read_ops.hpp b/include/boost/redis/detail/read_ops.hpp deleted file mode 100644 index 70d1546f..00000000 --- a/include/boost/redis/detail/read_ops.hpp +++ /dev/null @@ -1,100 +0,0 @@ -/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) - * - * Distributed under the Boost Software License, Version 1.0. (See - * accompanying file LICENSE.txt) - */ - -#ifndef BOOST_REDIS_READ_OPS_HPP -#define BOOST_REDIS_READ_OPS_HPP - -#include -#include -#include -#include -#include -#include -#include - -#include - -namespace boost::redis::detail -{ -template < - class AsyncReadStream, - class DynamicBuffer, - class ResponseAdapter> -class parse_op { -private: - AsyncReadStream& stream_; - DynamicBuffer buf_; - resp3::parser parser_; - ResponseAdapter adapter_; - std::size_t consumed_ = 0; - std::size_t buffer_size_ = 0; - asio::coroutine coro_{}; - -public: - parse_op(AsyncReadStream& stream, DynamicBuffer buf, ResponseAdapter adapter) - : stream_ {stream} - , buf_ {std::move(buf)} - , adapter_ {std::move(adapter)} - { } - - template - void operator()( Self& self - , system::error_code ec = {} - , std::size_t n = 0) - { - BOOST_ASIO_CORO_REENTER (coro_) for (;;) { - if (!parser_.bulk_expected()) { - BOOST_ASIO_CORO_YIELD - asio::async_read_until(stream_, buf_, "\r\n", std::move(self)); - BOOST_REDIS_CHECK_OP1(;); - } else { - // On a bulk read we can't read until delimiter since the - // payload may contain the delimiter itself so we have to - // read the whole chunk. However if the bulk blob is small - // enough it may be already on the buffer (from the last - // read), in which case there is no need of initiating - // another async op, otherwise we have to read the missing - // bytes. - if (buf_.size() < (parser_.bulk_length() + 2)) { - buffer_size_ = buf_.size(); - buf_.grow(parser_.bulk_length() + 2 - buffer_size_); - - BOOST_ASIO_CORO_YIELD - asio::async_read( - stream_, - buf_.data(buffer_size_, parser_.bulk_length() + 2 - buffer_size_), - asio::transfer_all(), - std::move(self)); - BOOST_REDIS_CHECK_OP1(;); - } - - n = parser_.bulk_length() + 2; - BOOST_ASSERT(buf_.size() >= n); - } - - auto const res = parser_.consume(static_cast(buf_.data(0, n).data()), n, ec); - if (ec) - return self.complete(ec, 0); - - if (!parser_.bulk_expected()) { - adapter_(res.first, ec); - if (ec) - return self.complete(ec, 0); - } - - buf_.consume(res.second); - consumed_ += res.second; - if (parser_.done()) { - self.complete({}, consumed_); - return; - } - } - } -}; - -} // boost::redis::detail - -#endif // BOOST_REDIS_READ_OPS_HPP diff --git a/include/boost/redis/impl/connection.ipp b/include/boost/redis/impl/connection.ipp index 977031a5..9c83c145 100644 --- a/include/boost/redis/impl/connection.ipp +++ b/include/boost/redis/impl/connection.ipp @@ -8,12 +8,18 @@ namespace boost::redis { -connection::connection(executor_type ex, asio::ssl::context::method method) -: impl_{ex, method} +connection::connection( + executor_type ex, + asio::ssl::context::method method, + std::size_t max_read_size) +: impl_{ex, method, max_read_size} { } -connection::connection(asio::io_context& ioc, asio::ssl::context::method method) -: impl_(ioc.get_executor(), method) +connection::connection( + asio::io_context& ioc, + asio::ssl::context::method method, + std::size_t max_read_size) +: impl_{ioc.get_executor(), method, max_read_size} { } void diff --git a/include/boost/redis/resp3/impl/parser.ipp b/include/boost/redis/resp3/impl/parser.ipp index cdaf3e65..89cae23c 100644 --- a/include/boost/redis/resp3/impl/parser.ipp +++ b/include/boost/redis/resp3/impl/parser.ipp @@ -24,132 +24,186 @@ parser::parser() sizes_[0] = 2; // The sentinel must be more than 1. } -auto -parser::consume( - char const* data, - std::size_t n, - system::error_code& ec) -> std::pair +std::size_t +parser::get_suggested_buffer_growth(std::size_t hint) const noexcept { - node_type ret; - if (bulk_expected()) { - n = bulk_length_ + 2; - ret = {bulk_, 1, depth_, {data, bulk_length_}}; - bulk_ = type::invalid; + if (!bulk_expected()) + return hint; + + if (hint < bulk_length_ + 2) + return bulk_length_ + 2; + + return hint; +} + +std::size_t +parser::get_consumed() const noexcept +{ + return consumed_; +} + +bool +parser::done() const noexcept +{ + return depth_ == 0 && bulk_ == type::invalid && consumed_ != 0; +} + +void +parser::commit_elem() noexcept +{ + --sizes_[depth_]; + while (sizes_[depth_] == 0) { + --depth_; --sizes_[depth_]; + } +} - } else if (sizes_[depth_] != 0) { - auto const t = to_type(*data); - switch (t) { - case type::streamed_string_part: - { - to_int(bulk_length_ , std::string_view{data + 1, n - 3}, ec); - if (ec) - return std::make_pair(node_type{}, 0); - - if (bulk_length_ == 0) { - ret = {type::streamed_string_part, 1, depth_, {}}; - sizes_[depth_] = 0; // We are done. - bulk_ = type::invalid; - } else { - bulk_ = type::streamed_string_part; - } - } break; - case type::blob_error: - case type::verbatim_string: - case type::blob_string: - { - if (data[1] == '?') { - // NOTE: This can only be triggered with blob_string. - // Trick: A streamed string is read as an aggregate - // of infinite lenght. When the streaming is done - // the server is supposed to send a part with length - // 0. - sizes_[++depth_] = (std::numeric_limits::max)(); - ret = {type::streamed_string, 0, depth_, {}}; - } else { - to_int(bulk_length_ , std::string_view{data + 1, n - 3} , ec); - if (ec) - return std::make_pair(node_type{}, 0); - - bulk_ = t; - } - } break; - case type::boolean: - { - if (n == 3) { - ec = error::empty_field; - return std::make_pair(node_type{}, 0); - } +auto +parser::consume(std::string_view view, system::error_code& ec) noexcept -> parser::result +{ + switch (bulk_) { + case type::invalid: + { + auto const pos = view.find(sep, consumed_); + if (pos == std::string::npos) + return {}; // Needs more data to proceeed. - if (data[1] != 'f' && data[1] != 't') { - ec = error::unexpected_bool_value; - return std::make_pair(node_type{}, 0); - } + auto const t = to_type(view.at(consumed_)); + auto const content = view.substr(consumed_ + 1, pos - 1 - consumed_); + auto const ret = consume_impl(t, content, ec); + if (ec) + return {}; - ret = {t, 1, depth_, {data + 1, n - 3}}; - --sizes_[depth_]; - } break; - case type::doublean: - case type::big_number: - case type::number: - { - if (n == 3) { - ec = error::empty_field; - return std::make_pair(node_type{}, 0); - } + consumed_ = pos + 2; + if (!bulk_expected()) + return ret; - ret = {t, 1, depth_, {data + 1, n - 3}}; - --sizes_[depth_]; - } break; - case type::simple_error: - case type::simple_string: - { - ret = {t, 1, depth_, {&data[1], n - 3}}; - --sizes_[depth_]; - } break; - case type::null: - { - ret = {type::null, 1, depth_, {}}; - --sizes_[depth_]; - } break; - case type::push: - case type::set: - case type::array: - case type::attribute: - case type::map: - { - int_type l = -1; - to_int(l, std::string_view{data + 1, n - 3}, ec); + } [[fallthrough]]; + + default: // Handles bulk. + { + auto const span = bulk_length_ + 2; + if ((std::size(view) - consumed_) < span) + return {}; // Needs more data to proceeed. + + auto const bulk_view = view.substr(consumed_, bulk_length_); + node_type const ret = {bulk_, 1, depth_, bulk_view}; + bulk_ = type::invalid; + commit_elem(); + + consumed_ += span; + return ret; + } + } +} + +auto +parser::consume_impl( + type t, + std::string_view elem, + system::error_code& ec) -> parser::node_type +{ + BOOST_ASSERT(!bulk_expected()); + + node_type ret; + switch (t) { + case type::streamed_string_part: + { + to_int(bulk_length_ , elem, ec); + if (ec) + return {}; + + if (bulk_length_ == 0) { + ret = {type::streamed_string_part, 1, depth_, {}}; + sizes_[depth_] = 1; // We are done. + bulk_ = type::invalid; + commit_elem(); + } else { + bulk_ = type::streamed_string_part; + } + } break; + case type::blob_error: + case type::verbatim_string: + case type::blob_string: + { + if (elem.at(0) == '?') { + // NOTE: This can only be triggered with blob_string. + // Trick: A streamed string is read as an aggregate of + // infinite length. When the streaming is done the server + // is supposed to send a part with length 0. + sizes_[++depth_] = (std::numeric_limits::max)(); + ret = {type::streamed_string, 0, depth_, {}}; + } else { + to_int(bulk_length_ , elem , ec); if (ec) - return std::make_pair(node_type{}, 0); + return {}; - ret = {t, l, depth_, {}}; - if (l == 0) { - --sizes_[depth_]; - } else { - if (depth_ == max_embedded_depth) { - ec = error::exceeeds_max_nested_depth; - return std::make_pair(node_type{}, 0); - } + bulk_ = t; + } + } break; + case type::boolean: + { + if (std::empty(elem)) { + ec = error::empty_field; + return {}; + } + + if (elem.at(0) != 'f' && elem.at(0) != 't') { + ec = error::unexpected_bool_value; + return {}; + } - ++depth_; + ret = {t, 1, depth_, elem}; + commit_elem(); + } break; + case type::doublean: + case type::big_number: + case type::number: + { + if (std::empty(elem)) { + ec = error::empty_field; + return {}; + } + } [[fallthrough]]; + case type::simple_error: + case type::simple_string: + case type::null: + { + ret = {t, 1, depth_, elem}; + commit_elem(); + } break; + case type::push: + case type::set: + case type::array: + case type::attribute: + case type::map: + { + int_type l = -1; + to_int(l, elem, ec); + if (ec) + return {}; - sizes_[depth_] = l * element_multiplicity(t); + ret = {t, l, depth_, {}}; + if (l == 0) { + commit_elem(); + } else { + if (depth_ == max_embedded_depth) { + ec = error::exceeeds_max_nested_depth; + return {}; } - } break; - default: - { - ec = error::invalid_data_type; - return std::make_pair(node_type{}, 0); + + ++depth_; + + sizes_[depth_] = l * element_multiplicity(t); } + } break; + default: + { + ec = error::invalid_data_type; + return {}; } } - - while (sizes_[depth_] == 0) { - --depth_; - --sizes_[depth_]; - } - - return std::make_pair(ret, n); + + return ret; } } // boost::redis::resp3 diff --git a/include/boost/redis/resp3/impl/serialization.ipp b/include/boost/redis/resp3/impl/serialization.ipp index 5fcbb77f..3af8de4c 100644 --- a/include/boost/redis/resp3/impl/serialization.ipp +++ b/include/boost/redis/resp3/impl/serialization.ipp @@ -5,6 +5,7 @@ */ #include +#include namespace boost::redis::resp3 { @@ -14,9 +15,9 @@ void boost_redis_to_bulk(std::string& payload, std::string_view data) payload += to_code(type::blob_string); payload.append(std::cbegin(str), std::cend(str)); - payload += separator; + payload += parser::sep; payload.append(std::cbegin(data), std::cend(data)); - payload += separator; + payload += parser::sep; } void add_header(std::string& payload, type t, std::size_t size) @@ -25,17 +26,17 @@ void add_header(std::string& payload, type t, std::size_t size) payload += to_code(t); payload.append(std::cbegin(str), std::cend(str)); - payload += separator; + payload += parser::sep; } void add_blob(std::string& payload, std::string_view blob) { payload.append(std::cbegin(blob), std::cend(blob)); - payload += separator; + payload += parser::sep; } void add_separator(std::string& payload) { - payload += separator; + payload += parser::sep; } } // boost::redis::resp3 diff --git a/include/boost/redis/resp3/parser.hpp b/include/boost/redis/resp3/parser.hpp index 5db0b81c..7d8d925a 100644 --- a/include/boost/redis/resp3/parser.hpp +++ b/include/boost/redis/resp3/parser.hpp @@ -13,16 +13,21 @@ #include #include #include +#include namespace boost::redis::resp3 { using int_type = std::uint64_t; class parser { -private: +public: using node_type = basic_node; + using result = std::optional; + static constexpr std::size_t max_embedded_depth = 5; + static constexpr std::string_view sep = "\r\n"; +private: // The current depth. Simple data types will have depth 0, whereas // the elements of aggregates will have depth 1. Embedded types // will have increasing depth. @@ -40,28 +45,32 @@ class parser { // expected. type bulk_ = type::invalid; -public: - parser(); + // The number of bytes consumed from the buffer. + std::size_t consumed_ = 0; // Returns the number of bytes that have been consumed. - auto - consume( - char const* data, - std::size_t n, - system::error_code& ec) -> std::pair; + auto consume_impl(type t, std::string_view elem, system::error_code& ec) -> node_type; - // Returns true when the parser is done with the current message. - [[nodiscard]] auto done() const noexcept - { return depth_ == 0 && bulk_ == type::invalid; } + void commit_elem() noexcept; // The bulk type expected in the next read. If none is expected // returns type::invalid. - [[nodiscard]] auto bulk_expected() const noexcept -> bool + [[nodiscard]] + auto bulk_expected() const noexcept -> bool { return bulk_ != type::invalid; } - // The length expected in the the next bulk. - [[nodiscard]] auto bulk_length() const noexcept - { return bulk_length_; } +public: + parser(); + + // Returns true when the parser is done with the current message. + [[nodiscard]] + auto done() const noexcept -> bool; + + auto get_suggested_buffer_growth(std::size_t hint) const noexcept -> std::size_t; + + auto get_consumed() const noexcept -> std::size_t; + + auto consume(std::string_view view, system::error_code& ec) noexcept -> result; }; } // boost::redis::resp3 diff --git a/include/boost/redis/resp3/serialization.hpp b/include/boost/redis/resp3/serialization.hpp index 5d36db3f..38ec138f 100644 --- a/include/boost/redis/resp3/serialization.hpp +++ b/include/boost/redis/resp3/serialization.hpp @@ -8,6 +8,9 @@ #define BOOST_REDIS_RESP3_SERIALIZATION_HPP #include +#include +#include +#include #include #include @@ -16,7 +19,6 @@ // to calculate the header size correctly. namespace boost::redis::resp3 { -constexpr char const* separator = "\r\n"; /** @brief Adds a bulk to the request. * @relates boost::redis::request @@ -103,6 +105,40 @@ struct bulk_counter> { void add_blob(std::string& payload, std::string_view blob); void add_separator(std::string& payload); +namespace detail +{ + +template +void deserialize(std::string_view const& data, Adapter adapter, system::error_code& ec) +{ + parser parser; + while (!parser.done()) { + auto const res = parser.consume(data, ec); + if (ec) + return; + + BOOST_ASSERT(res.has_value()); + + adapter(res.value(), ec); + if (ec) + return; + } + + BOOST_ASSERT(parser.get_consumed() == std::size(data)); +} + +template +void deserialize(std::string_view const& data, Adapter adapter) +{ + system::error_code ec; + deserialize(data, adapter, ec); + + if (ec) + BOOST_THROW_EXCEPTION(system::system_error{ec}); +} + +} + } // boost::redis::resp3 #endif // BOOST_REDIS_RESP3_SERIALIZATION_HPP diff --git a/tests/test_low_level.cpp b/tests/test_low_level.cpp index 059d2e6f..0fb5b2e3 100644 --- a/tests/test_low_level.cpp +++ b/tests/test_low_level.cpp @@ -100,12 +100,15 @@ void test_sync(net::any_io_executor ex, expect e) ts.append(e.in); Result result; boost::system::error_code ec; - redis::detail::read(ts, net::dynamic_buffer(rbuffer), adapt2(result), ec); + auto dbuf = net::dynamic_buffer(rbuffer); + auto const consumed = redis::detail::read(ts, dbuf, adapt2(result), ec); if (e.ec) { BOOST_CHECK_EQUAL(ec, e.ec); return; } + dbuf.consume(consumed); + BOOST_TEST(!ec); BOOST_TEST(rbuffer.empty()); @@ -145,7 +148,7 @@ class async_test: public std::enable_shared_from_this> { } BOOST_TEST(!ec); - BOOST_TEST(self->rbuffer_.empty()); + //BOOST_TEST(self->rbuffer_.empty()); if (self->result_.has_value()) { auto const res = self->result_ == self->data_.expected; @@ -558,9 +561,9 @@ BOOST_AUTO_TEST_CASE(ignore_adapter_no_error) test_stream ts {ioc}; ts.append(S05b); - redis::detail::read(ts, net::dynamic_buffer(rbuffer), adapt2(ignore), ec); + auto const consumed = redis::detail::read(ts, net::dynamic_buffer(rbuffer), adapt2(ignore), ec); BOOST_TEST(!ec); - BOOST_TEST(rbuffer.empty()); + BOOST_CHECK_EQUAL(rbuffer.size(), consumed); } //----------------------------------------------------------------------------------- diff --git a/tests/test_low_level_async.cpp b/tests/test_low_level_async.cpp index 01f419da..d05345fb 100644 --- a/tests/test_low_level_async.cpp +++ b/tests/test_low_level_async.cpp @@ -49,11 +49,15 @@ auto co_main(config cfg) -> net::awaitable std::string buffer; result resp; + std::size_t consumed = 0; // Reads the responses to all commands in the request. - auto dbuffer = net::dynamic_buffer(buffer); - co_await redis::detail::async_read(socket, dbuffer); - co_await redis::detail::async_read(socket, dbuffer, adapt2(resp)); - co_await redis::detail::async_read(socket, dbuffer); + auto dbuf = net::dynamic_buffer(buffer); + consumed = co_await redis::detail::async_read(socket, dbuf); + dbuf.consume(consumed); + consumed = co_await redis::detail::async_read(socket, dbuf, adapt2(resp)); + dbuf.consume(consumed); + consumed = co_await redis::detail::async_read(socket, dbuf); + dbuf.consume(consumed); std::cout << "Ping: " << resp.value() << std::endl; } diff --git a/tests/test_low_level_sync.cpp b/tests/test_low_level_sync.cpp index 1abfe652..2349fbee 100644 --- a/tests/test_low_level_sync.cpp +++ b/tests/test_low_level_sync.cpp @@ -42,11 +42,15 @@ BOOST_AUTO_TEST_CASE(low_level_sync) std::string buffer; result resp; + std::size_t consumed = 0; // Reads the responses to all commands in the request. - auto dbuffer = net::dynamic_buffer(buffer); - redis::detail::read(socket, dbuffer); - redis::detail::read(socket, dbuffer, adapt2(resp)); - redis::detail::read(socket, dbuffer); + auto dbuf = net::dynamic_buffer(buffer); + consumed = redis::detail::read(socket, dbuf); + dbuf.consume(consumed); + consumed = redis::detail::read(socket, dbuf, adapt2(resp)); + dbuf.consume(consumed); + consumed = redis::detail::read(socket, dbuf); + dbuf.consume(consumed); std::cout << "Ping: " << resp.value() << std::endl; diff --git a/tests/test_low_level_sync_sans_io.cpp b/tests/test_low_level_sync_sans_io.cpp new file mode 100644 index 00000000..441f8098 --- /dev/null +++ b/tests/test_low_level_sync_sans_io.cpp @@ -0,0 +1,33 @@ +/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#include +#include +#define BOOST_TEST_MODULE conn-quit +#include +#include +#include + +using boost::redis::adapter::adapt2; +using boost::redis::adapter::result; +using boost::redis::resp3::detail::deserialize; + +BOOST_AUTO_TEST_CASE(low_level_sync_sans_io) +{ + try { + result> resp; + + char const* wire = "~6\r\n+orange\r\n+apple\r\n+one\r\n+two\r\n+three\r\n+orange\r\n"; + deserialize(wire, adapt2(resp)); + + for (auto const& e: resp.value()) + std::cout << e << std::endl; + + } catch (std::exception const& e) { + std::cerr << e.what() << std::endl; + exit(EXIT_FAILURE); + } +}