Skip to content

Commit

Permalink
unify connect and reconnect (#583)
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored May 20, 2024
1 parent 42da1f7 commit ee8c3d0
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 50 deletions.
12 changes: 7 additions & 5 deletions include/cinatra/coro_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,12 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {

// only make socket connet(or handshake) to the host
async_simple::coro::Lazy<resp_data> connect(std::string uri) {
if (should_reset_) {
reset();
}
else {
should_reset_ = true;
}
resp_data data{};
bool no_schema = !has_schema(uri);
std::string append_uri;
Expand Down Expand Up @@ -896,11 +902,6 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
resp_chunk_str_.clear();
}

async_simple::coro::Lazy<resp_data> reconnect(std::string uri) {
reset();
co_return co_await connect(std::move(uri));
}

std::string_view get_host() { return host_; }

std::string_view get_port() { return port_; }
Expand Down Expand Up @@ -2126,6 +2127,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
bool enable_tcp_no_delay_ = true;
std::string resp_chunk_str_;
std::span<char> out_buf_;
bool should_reset_ = false;

#ifdef CINATRA_ENABLE_GZIP
bool enable_ws_deflate_ = false;
Expand Down
73 changes: 36 additions & 37 deletions include/cinatra/ylt/coro_io/client_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ class client_pool : public std::enable_shared_from_this<
break;
}
while (true) {
CINATRA_LOG_DEBUG << "start collect timeout client of pool{"
CINATRA_LOG_TRACE << "start collect timeout client of pool{"
<< self->host_name_
<< "}, now client count: " << clients.size();
std::size_t is_all_cleared = clients.clear_old(clear_cnt);
CINATRA_LOG_DEBUG << "finish collect timeout client of pool{"
CINATRA_LOG_TRACE << "finish collect timeout client of pool{"
<< self->host_name_
<< "}, now client cnt: " << clients.size();
if (is_all_cleared != 0) [[unlikely]] {
Expand All @@ -108,37 +108,43 @@ class client_pool : public std::enable_shared_from_this<

static auto rand_time(std::chrono::milliseconds ms) {
static thread_local std::default_random_engine r;
std::uniform_real_distribution e(0.7f, 1.3f);
std::uniform_real_distribution e(1.0f, 1.2f);
return std::chrono::milliseconds{static_cast<long>(e(r) * ms.count())};
}

async_simple::coro::Lazy<void> reconnect(std::unique_ptr<client_t>& client) {
static async_simple::coro::Lazy<void> reconnect(
std::unique_ptr<client_t>& client, std::weak_ptr<client_pool> watcher) {
using namespace std::chrono_literals;
for (unsigned int i = 0; i < pool_config_.connect_retry_count; ++i) {
CINATRA_LOG_DEBUG << "try to reconnect client{" << client.get()
std::shared_ptr<client_pool> self = watcher.lock();
uint32_t i = UINT32_MAX; // (at least connect once)
do {
CINATRA_LOG_TRACE << "try to reconnect client{" << client.get()
<< "},host:{" << client->get_host() << ":"
<< client->get_port() << "}, try count:" << i
<< "max retry limit:"
<< pool_config_.connect_retry_count;
<< self->pool_config_.connect_retry_count;
auto pre_time_point = std::chrono::steady_clock::now();
bool ok = client_t::is_ok(co_await client->reconnect(host_name_));
bool ok = client_t::is_ok(co_await client->connect(self->host_name_));
auto post_time_point = std::chrono::steady_clock::now();
auto cost_time = post_time_point - pre_time_point;
CINATRA_LOG_DEBUG << "reconnect client{" << client.get()
CINATRA_LOG_TRACE << "reconnect client{" << client.get()
<< "} cost time: "
<< cost_time / std::chrono::milliseconds{1} << "ms";
if (ok) {
CINATRA_LOG_DEBUG << "reconnect client{" << client.get() << "} success";
CINATRA_LOG_TRACE << "reconnect client{" << client.get() << "} success";
co_return;
}
CINATRA_LOG_DEBUG << "reconnect client{" << client.get()
CINATRA_LOG_TRACE << "reconnect client{" << client.get()
<< "} failed. If client close:{" << client->has_closed()
<< "}";
auto wait_time = rand_time(
(pool_config_.reconnect_wait_time * (i + 1) - cost_time) / 1ms * 1ms);
(self->pool_config_.reconnect_wait_time - cost_time) / 1ms * 1ms);
self = nullptr;
if (wait_time.count() > 0)
co_await coro_io::sleep_for(wait_time, &client->get_executor());
}
self = watcher.lock();
++i;
} while (i < self->pool_config_.connect_retry_count);
CINATRA_LOG_WARNING << "reconnect client{" << client.get() << "},host:{"
<< client->get_host() << ":" << client->get_port()
<< "} out of max limit, stop retry. connect failed";
Expand All @@ -150,30 +156,23 @@ class client_pool : public std::enable_shared_from_this<
async_simple::Promise<std::unique_ptr<client_t>> promise_;
};

async_simple::coro::Lazy<void> connect_client(
static async_simple::coro::Lazy<void> connect_client(
std::unique_ptr<client_t> client, std::weak_ptr<client_pool> watcher,
std::shared_ptr<promise_handler> handler) {
CINATRA_LOG_DEBUG << "try to connect client{" << client.get()
<< "} to host:" << host_name_;
auto result = co_await client->connect(host_name_);
std::shared_ptr<client_pool> self = watcher.lock();
if (!client_t::is_ok(result)) {
CINATRA_LOG_DEBUG << "connect client{" << client.get() << "} to failed. ";
if (self) {
co_await reconnect(client);
}
}
if (client) {
CINATRA_LOG_DEBUG << "connect client{" << client.get() << "} successful!";
}
co_await reconnect(client, watcher);
auto has_get_connect = handler->flag_.exchange(true);
if (!has_get_connect) {
handler->promise_.setValue(std::move(client));
}
else {
auto conn_lim = std::min<unsigned>(10u, pool_config_.max_connection);
if (self && free_clients_.size() < conn_lim && client) {
enqueue(free_clients_, std::move(client), pool_config_.idle_timeout);
if (client) {
auto self = watcher.lock();
auto conn_lim =
std::min<unsigned>(10u, self->pool_config_.max_connection);
if (self && self->free_clients_.size() < conn_lim) {
self->enqueue(self->free_clients_, std::move(client),
self->pool_config_.idle_timeout);
}
}
}
}
Expand Down Expand Up @@ -226,7 +225,7 @@ class client_pool : public std::enable_shared_from_this<
}
}
});
CINATRA_LOG_DEBUG << "wait client by promise {" << &handler->promise_
CINATRA_LOG_TRACE << "wait client by promise {" << &handler->promise_
<< "}";
client = co_await handler->promise_.getFuture();
if (client) {
Expand All @@ -237,7 +236,7 @@ class client_pool : public std::enable_shared_from_this<
}
}
else {
CINATRA_LOG_DEBUG << "get free client{" << client.get()
CINATRA_LOG_TRACE << "get free client{" << client.get()
<< "}. from queue";
}
co_return std::move(client);
Expand All @@ -250,7 +249,7 @@ class client_pool : public std::enable_shared_from_this<
if (clients.enqueue(std::move(client)) == 1) {
std::size_t expected = 0;
if (clients.collecter_cnt_.compare_exchange_strong(expected, 1)) {
CINATRA_LOG_DEBUG << "start timeout client collecter of client_pool{"
CINATRA_LOG_TRACE << "start timeout client collecter of client_pool{"
<< host_name_ << "}";
collect_idle_timeout_client(
this->weak_from_this(), clients,
Expand All @@ -274,7 +273,7 @@ class client_pool : public std::enable_shared_from_this<
if (!has_get_connect) {
handler->promise_.setValue(std::move(client));
promise_cnt_ -= cnt;
CINATRA_LOG_DEBUG << "collect free client{" << client.get()
CINATRA_LOG_TRACE << "collect free client{" << client.get()
<< "} and wake up promise{" << &handler->promise_
<< "}";
return;
Expand All @@ -285,13 +284,13 @@ class client_pool : public std::enable_shared_from_this<

if (free_clients_.size() < pool_config_.max_connection) {
if (client) {
CINATRA_LOG_DEBUG << "collect free client{" << client.get()
CINATRA_LOG_TRACE << "collect free client{" << client.get()
<< "} enqueue";
enqueue(free_clients_, std::move(client), pool_config_.idle_timeout);
}
}
else {
CINATRA_LOG_DEBUG << "out of max connection limit <<"
CINATRA_LOG_TRACE << "out of max connection limit <<"
<< pool_config_.max_connection
<< ", collect free client{" << client.get()
<< "} enqueue short connect queue";
Expand All @@ -300,7 +299,7 @@ class client_pool : public std::enable_shared_from_this<
}
}
else {
CINATRA_LOG_DEBUG << "client{" << client.get()
CINATRA_LOG_TRACE << "client{" << client.get()
<< "} is closed. we won't collect it";
}

Expand Down
3 changes: 1 addition & 2 deletions include/cinatra/ylt/coro_io/coro_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include <async_simple/coro/Lazy.h>

#include <asio/connect.hpp>
#include <asio/dispatch.hpp>
#include <asio/experimental/channel.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/read.hpp>
Expand Down Expand Up @@ -303,7 +302,7 @@ inline async_simple::coro::Lazy<void> sleep_for(Duration d) {
template <typename R, typename Func, typename Executor>
struct post_helper {
void operator()(auto handler) {
asio::dispatch(e, [this, handler]() {
asio::post(e, [this, handler]() {
try {
if constexpr (std::is_same_v<R, async_simple::Try<void>>) {
func();
Expand Down
2 changes: 1 addition & 1 deletion include/cinatra/ylt/coro_io/io_context_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class ExecutorWrapper : public async_simple::Executor {

context_t &context() { return executor_.context(); }

auto get_asio_executor() { return executor_; }
auto get_asio_executor() const { return executor_; }

operator ExecutorImpl() { return executor_; }

Expand Down
4 changes: 2 additions & 2 deletions lang/coro_http_client_introduction.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ async_simple::coro::Lazy<void> test_async_client() {
```

# http 重连
当http 请求失败之后,这个http client是不允许复用的,因为内部的socket 都已经关闭了,除非你调用reconnect 去重连host,这样就可以复用http client 了。
当http 请求失败之后,这个http client是不允许复用的,因为内部的socket 都已经关闭了,除非你调用connect 去重连host,这样就可以复用http client 了。

```c++
coro_http_client client1{};
Expand All @@ -250,7 +250,7 @@ async_simple::coro::Lazy<void> test_async_client() {
CHECK(r.status != 200);

// 通过重连复用client1
r = async_simple::coro::syncAwait(client1.reconnect("http://cn.bing.com"));
r = async_simple::coro::syncAwait(client1.connect("http://cn.bing.com"));
CHECK(client1.get_host() == "cn.bing.com");
CHECK(client1.get_port() == "http");
CHECK(r.status == 200);
Expand Down
6 changes: 3 additions & 3 deletions tests/test_cinatra.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -574,15 +574,15 @@ TEST_CASE("test coro_http_client async_http_connect") {
client1.async_http_connect("http//www.badurl.com"));
CHECK(r.status != 200);

r = async_simple::coro::syncAwait(client1.reconnect("http://cn.bing.com"));
r = async_simple::coro::syncAwait(client1.connect("http://cn.bing.com"));
CHECK(client1.get_host() == "cn.bing.com");
CHECK(client1.get_port() == "http");
CHECK(r.status >= 200);

r = async_simple::coro::syncAwait(client1.reconnect("http://www.baidu.com"));
r = async_simple::coro::syncAwait(client1.connect("http://www.baidu.com"));

CHECK(r.status >= 200);
r = async_simple::coro::syncAwait(client1.reconnect("http://cn.bing.com"));
r = async_simple::coro::syncAwait(client1.connect("http://cn.bing.com"));
CHECK(r.status == 200);
}

Expand Down

0 comments on commit ee8c3d0

Please sign in to comment.