From ee8c3d0f314b4314075636507acb958541ddf29f Mon Sep 17 00:00:00 2001 From: qicosmos Date: Mon, 20 May 2024 13:50:07 +0800 Subject: [PATCH] unify connect and reconnect (#583) --- include/cinatra/coro_http_client.hpp | 12 +-- include/cinatra/ylt/coro_io/client_pool.hpp | 73 +++++++++---------- include/cinatra/ylt/coro_io/coro_io.hpp | 3 +- .../cinatra/ylt/coro_io/io_context_pool.hpp | 2 +- lang/coro_http_client_introduction.md | 4 +- tests/test_cinatra.cpp | 6 +- 6 files changed, 50 insertions(+), 50 deletions(-) diff --git a/include/cinatra/coro_http_client.hpp b/include/cinatra/coro_http_client.hpp index 590c9c7f..2c3501e3 100644 --- a/include/cinatra/coro_http_client.hpp +++ b/include/cinatra/coro_http_client.hpp @@ -284,6 +284,12 @@ class coro_http_client : public std::enable_shared_from_this { // only make socket connet(or handshake) to the host async_simple::coro::Lazy connect(std::string uri) { + if (should_reset_) { + reset(); + } + else { + should_reset_ = true; + } resp_data data{}; bool no_schema = !has_schema(uri); std::string append_uri; @@ -896,11 +902,6 @@ class coro_http_client : public std::enable_shared_from_this { resp_chunk_str_.clear(); } - async_simple::coro::Lazy reconnect(std::string uri) { - reset(); - co_return co_await connect(std::move(uri)); - } - std::string_view get_host() { return host_; } std::string_view get_port() { return port_; } @@ -2126,6 +2127,7 @@ class coro_http_client : public std::enable_shared_from_this { bool enable_tcp_no_delay_ = true; std::string resp_chunk_str_; std::span out_buf_; + bool should_reset_ = false; #ifdef CINATRA_ENABLE_GZIP bool enable_ws_deflate_ = false; diff --git a/include/cinatra/ylt/coro_io/client_pool.hpp b/include/cinatra/ylt/coro_io/client_pool.hpp index 0bb82b59..f51fca9a 100644 --- a/include/cinatra/ylt/coro_io/client_pool.hpp +++ b/include/cinatra/ylt/coro_io/client_pool.hpp @@ -77,11 +77,11 @@ class client_pool : public std::enable_shared_from_this< break; } while (true) { - CINATRA_LOG_DEBUG << "start collect timeout client of pool{" + CINATRA_LOG_TRACE << "start collect timeout client of pool{" << self->host_name_ << "}, now client count: " << clients.size(); std::size_t is_all_cleared = clients.clear_old(clear_cnt); - CINATRA_LOG_DEBUG << "finish collect timeout client of pool{" + CINATRA_LOG_TRACE << "finish collect timeout client of pool{" << self->host_name_ << "}, now client cnt: " << clients.size(); if (is_all_cleared != 0) [[unlikely]] { @@ -108,37 +108,43 @@ class client_pool : public std::enable_shared_from_this< static auto rand_time(std::chrono::milliseconds ms) { static thread_local std::default_random_engine r; - std::uniform_real_distribution e(0.7f, 1.3f); + std::uniform_real_distribution e(1.0f, 1.2f); return std::chrono::milliseconds{static_cast(e(r) * ms.count())}; } - async_simple::coro::Lazy reconnect(std::unique_ptr& client) { + static async_simple::coro::Lazy reconnect( + std::unique_ptr& client, std::weak_ptr watcher) { using namespace std::chrono_literals; - for (unsigned int i = 0; i < pool_config_.connect_retry_count; ++i) { - CINATRA_LOG_DEBUG << "try to reconnect client{" << client.get() + std::shared_ptr self = watcher.lock(); + uint32_t i = UINT32_MAX; // (at least connect once) + do { + CINATRA_LOG_TRACE << "try to reconnect client{" << client.get() << "},host:{" << client->get_host() << ":" << client->get_port() << "}, try count:" << i << "max retry limit:" - << pool_config_.connect_retry_count; + << self->pool_config_.connect_retry_count; auto pre_time_point = std::chrono::steady_clock::now(); - bool ok = client_t::is_ok(co_await client->reconnect(host_name_)); + bool ok = client_t::is_ok(co_await client->connect(self->host_name_)); auto post_time_point = std::chrono::steady_clock::now(); auto cost_time = post_time_point - pre_time_point; - CINATRA_LOG_DEBUG << "reconnect client{" << client.get() + CINATRA_LOG_TRACE << "reconnect client{" << client.get() << "} cost time: " << cost_time / std::chrono::milliseconds{1} << "ms"; if (ok) { - CINATRA_LOG_DEBUG << "reconnect client{" << client.get() << "} success"; + CINATRA_LOG_TRACE << "reconnect client{" << client.get() << "} success"; co_return; } - CINATRA_LOG_DEBUG << "reconnect client{" << client.get() + CINATRA_LOG_TRACE << "reconnect client{" << client.get() << "} failed. If client close:{" << client->has_closed() << "}"; auto wait_time = rand_time( - (pool_config_.reconnect_wait_time * (i + 1) - cost_time) / 1ms * 1ms); + (self->pool_config_.reconnect_wait_time - cost_time) / 1ms * 1ms); + self = nullptr; if (wait_time.count() > 0) co_await coro_io::sleep_for(wait_time, &client->get_executor()); - } + self = watcher.lock(); + ++i; + } while (i < self->pool_config_.connect_retry_count); CINATRA_LOG_WARNING << "reconnect client{" << client.get() << "},host:{" << client->get_host() << ":" << client->get_port() << "} out of max limit, stop retry. connect failed"; @@ -150,30 +156,23 @@ class client_pool : public std::enable_shared_from_this< async_simple::Promise> promise_; }; - async_simple::coro::Lazy connect_client( + static async_simple::coro::Lazy connect_client( std::unique_ptr client, std::weak_ptr watcher, std::shared_ptr handler) { - CINATRA_LOG_DEBUG << "try to connect client{" << client.get() - << "} to host:" << host_name_; - auto result = co_await client->connect(host_name_); - std::shared_ptr self = watcher.lock(); - if (!client_t::is_ok(result)) { - CINATRA_LOG_DEBUG << "connect client{" << client.get() << "} to failed. "; - if (self) { - co_await reconnect(client); - } - } - if (client) { - CINATRA_LOG_DEBUG << "connect client{" << client.get() << "} successful!"; - } + co_await reconnect(client, watcher); auto has_get_connect = handler->flag_.exchange(true); if (!has_get_connect) { handler->promise_.setValue(std::move(client)); } else { - auto conn_lim = std::min(10u, pool_config_.max_connection); - if (self && free_clients_.size() < conn_lim && client) { - enqueue(free_clients_, std::move(client), pool_config_.idle_timeout); + if (client) { + auto self = watcher.lock(); + auto conn_lim = + std::min(10u, self->pool_config_.max_connection); + if (self && self->free_clients_.size() < conn_lim) { + self->enqueue(self->free_clients_, std::move(client), + self->pool_config_.idle_timeout); + } } } } @@ -226,7 +225,7 @@ class client_pool : public std::enable_shared_from_this< } } }); - CINATRA_LOG_DEBUG << "wait client by promise {" << &handler->promise_ + CINATRA_LOG_TRACE << "wait client by promise {" << &handler->promise_ << "}"; client = co_await handler->promise_.getFuture(); if (client) { @@ -237,7 +236,7 @@ class client_pool : public std::enable_shared_from_this< } } else { - CINATRA_LOG_DEBUG << "get free client{" << client.get() + CINATRA_LOG_TRACE << "get free client{" << client.get() << "}. from queue"; } co_return std::move(client); @@ -250,7 +249,7 @@ class client_pool : public std::enable_shared_from_this< if (clients.enqueue(std::move(client)) == 1) { std::size_t expected = 0; if (clients.collecter_cnt_.compare_exchange_strong(expected, 1)) { - CINATRA_LOG_DEBUG << "start timeout client collecter of client_pool{" + CINATRA_LOG_TRACE << "start timeout client collecter of client_pool{" << host_name_ << "}"; collect_idle_timeout_client( this->weak_from_this(), clients, @@ -274,7 +273,7 @@ class client_pool : public std::enable_shared_from_this< if (!has_get_connect) { handler->promise_.setValue(std::move(client)); promise_cnt_ -= cnt; - CINATRA_LOG_DEBUG << "collect free client{" << client.get() + CINATRA_LOG_TRACE << "collect free client{" << client.get() << "} and wake up promise{" << &handler->promise_ << "}"; return; @@ -285,13 +284,13 @@ class client_pool : public std::enable_shared_from_this< if (free_clients_.size() < pool_config_.max_connection) { if (client) { - CINATRA_LOG_DEBUG << "collect free client{" << client.get() + CINATRA_LOG_TRACE << "collect free client{" << client.get() << "} enqueue"; enqueue(free_clients_, std::move(client), pool_config_.idle_timeout); } } else { - CINATRA_LOG_DEBUG << "out of max connection limit <<" + CINATRA_LOG_TRACE << "out of max connection limit <<" << pool_config_.max_connection << ", collect free client{" << client.get() << "} enqueue short connect queue"; @@ -300,7 +299,7 @@ class client_pool : public std::enable_shared_from_this< } } else { - CINATRA_LOG_DEBUG << "client{" << client.get() + CINATRA_LOG_TRACE << "client{" << client.get() << "} is closed. we won't collect it"; } diff --git a/include/cinatra/ylt/coro_io/coro_io.hpp b/include/cinatra/ylt/coro_io/coro_io.hpp index 54f5957a..012adfe1 100644 --- a/include/cinatra/ylt/coro_io/coro_io.hpp +++ b/include/cinatra/ylt/coro_io/coro_io.hpp @@ -13,7 +13,6 @@ #include #include -#include #include #include #include @@ -303,7 +302,7 @@ inline async_simple::coro::Lazy sleep_for(Duration d) { template struct post_helper { void operator()(auto handler) { - asio::dispatch(e, [this, handler]() { + asio::post(e, [this, handler]() { try { if constexpr (std::is_same_v>) { func(); diff --git a/include/cinatra/ylt/coro_io/io_context_pool.hpp b/include/cinatra/ylt/coro_io/io_context_pool.hpp index df3fad53..18c8c61d 100644 --- a/include/cinatra/ylt/coro_io/io_context_pool.hpp +++ b/include/cinatra/ylt/coro_io/io_context_pool.hpp @@ -75,7 +75,7 @@ class ExecutorWrapper : public async_simple::Executor { context_t &context() { return executor_.context(); } - auto get_asio_executor() { return executor_; } + auto get_asio_executor() const { return executor_; } operator ExecutorImpl() { return executor_; } diff --git a/lang/coro_http_client_introduction.md b/lang/coro_http_client_introduction.md index 6123b160..9bcdebc6 100644 --- a/lang/coro_http_client_introduction.md +++ b/lang/coro_http_client_introduction.md @@ -240,7 +240,7 @@ async_simple::coro::Lazy test_async_client() { ``` # http 重连 -当http 请求失败之后,这个http client是不允许复用的,因为内部的socket 都已经关闭了,除非你调用reconnect 去重连host,这样就可以复用http client 了。 +当http 请求失败之后,这个http client是不允许复用的,因为内部的socket 都已经关闭了,除非你调用connect 去重连host,这样就可以复用http client 了。 ```c++ coro_http_client client1{}; @@ -250,7 +250,7 @@ async_simple::coro::Lazy test_async_client() { CHECK(r.status != 200); // 通过重连复用client1 - r = async_simple::coro::syncAwait(client1.reconnect("http://cn.bing.com")); + r = async_simple::coro::syncAwait(client1.connect("http://cn.bing.com")); CHECK(client1.get_host() == "cn.bing.com"); CHECK(client1.get_port() == "http"); CHECK(r.status == 200); diff --git a/tests/test_cinatra.cpp b/tests/test_cinatra.cpp index 45d623eb..a66b8fa3 100644 --- a/tests/test_cinatra.cpp +++ b/tests/test_cinatra.cpp @@ -574,15 +574,15 @@ TEST_CASE("test coro_http_client async_http_connect") { client1.async_http_connect("http//www.badurl.com")); CHECK(r.status != 200); - r = async_simple::coro::syncAwait(client1.reconnect("http://cn.bing.com")); + r = async_simple::coro::syncAwait(client1.connect("http://cn.bing.com")); CHECK(client1.get_host() == "cn.bing.com"); CHECK(client1.get_port() == "http"); CHECK(r.status >= 200); - r = async_simple::coro::syncAwait(client1.reconnect("http://www.baidu.com")); + r = async_simple::coro::syncAwait(client1.connect("http://www.baidu.com")); CHECK(r.status >= 200); - r = async_simple::coro::syncAwait(client1.reconnect("http://cn.bing.com")); + r = async_simple::coro::syncAwait(client1.connect("http://cn.bing.com")); CHECK(r.status == 200); }