unify connect and reconnect #583

Merged · 1 commit · May 20, 2024
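This PR folds `reconnect()` into `connect()`: after the first call, `connect()` resets the client's internal state before connecting again, so callers no longer need a separate `reconnect()`. A minimal usage sketch of the unified API (the URLs are illustrative, taken from the existing tests, and the include path is assumed):

```c++
#include <cinatra/coro_http_client.hpp>

using namespace cinatra;

async_simple::coro::Lazy<void> reuse_client() {
  coro_http_client client{};

  // First call: should_reset_ is still false, so no reset happens.
  auto r = co_await client.connect("http://cn.bing.com");

  // Any later connect() on the same client resets it first and then connects,
  // which is exactly what the removed reconnect() used to do.
  r = co_await client.connect("http://www.baidu.com");
}
```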
12 changes: 7 additions & 5 deletions include/cinatra/coro_http_client.hpp
@@ -284,6 +284,12 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {

// only make socket connect (or handshake) to the host
async_simple::coro::Lazy<resp_data> connect(std::string uri) {
if (should_reset_) {
reset();
}
else {
should_reset_ = true;
}
resp_data data{};
bool no_schema = !has_schema(uri);
std::string append_uri;
@@ -896,11 +902,6 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
resp_chunk_str_.clear();
}

async_simple::coro::Lazy<resp_data> reconnect(std::string uri) {
reset();
co_return co_await connect(std::move(uri));
}

std::string_view get_host() { return host_; }

std::string_view get_port() { return port_; }
@@ -2126,6 +2127,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
bool enable_tcp_no_delay_ = true;
std::string resp_chunk_str_;
std::span<char> out_buf_;
bool should_reset_ = false;

#ifdef CINATRA_ENABLE_GZIP
bool enable_ws_deflate_ = false;
73 changes: 36 additions & 37 deletions include/cinatra/ylt/coro_io/client_pool.hpp
@@ -77,11 +77,11 @@ class client_pool : public std::enable_shared_from_this<
break;
}
while (true) {
CINATRA_LOG_DEBUG << "start collect timeout client of pool{"
CINATRA_LOG_TRACE << "start collect timeout client of pool{"
<< self->host_name_
<< "}, now client count: " << clients.size();
std::size_t is_all_cleared = clients.clear_old(clear_cnt);
CINATRA_LOG_DEBUG << "finish collect timeout client of pool{"
CINATRA_LOG_TRACE << "finish collect timeout client of pool{"
<< self->host_name_
<< "}, now client cnt: " << clients.size();
if (is_all_cleared != 0) [[unlikely]] {
@@ -108,37 +108,43 @@ class client_pool : public std::enable_shared_from_this<

static auto rand_time(std::chrono::milliseconds ms) {
static thread_local std::default_random_engine r;
std::uniform_real_distribution e(0.7f, 1.3f);
std::uniform_real_distribution e(1.0f, 1.2f);
return std::chrono::milliseconds{static_cast<long>(e(r) * ms.count())};
}

async_simple::coro::Lazy<void> reconnect(std::unique_ptr<client_t>& client) {
static async_simple::coro::Lazy<void> reconnect(
std::unique_ptr<client_t>& client, std::weak_ptr<client_pool> watcher) {
using namespace std::chrono_literals;
for (unsigned int i = 0; i < pool_config_.connect_retry_count; ++i) {
CINATRA_LOG_DEBUG << "try to reconnect client{" << client.get()
std::shared_ptr<client_pool> self = watcher.lock();
uint32_t i = UINT32_MAX; // (at least connect once)
do {
CINATRA_LOG_TRACE << "try to reconnect client{" << client.get()
<< "},host:{" << client->get_host() << ":"
<< client->get_port() << "}, try count:" << i
<< "max retry limit:"
<< pool_config_.connect_retry_count;
<< self->pool_config_.connect_retry_count;
auto pre_time_point = std::chrono::steady_clock::now();
bool ok = client_t::is_ok(co_await client->reconnect(host_name_));
bool ok = client_t::is_ok(co_await client->connect(self->host_name_));
auto post_time_point = std::chrono::steady_clock::now();
auto cost_time = post_time_point - pre_time_point;
CINATRA_LOG_DEBUG << "reconnect client{" << client.get()
CINATRA_LOG_TRACE << "reconnect client{" << client.get()
<< "} cost time: "
<< cost_time / std::chrono::milliseconds{1} << "ms";
if (ok) {
CINATRA_LOG_DEBUG << "reconnect client{" << client.get() << "} success";
CINATRA_LOG_TRACE << "reconnect client{" << client.get() << "} success";
co_return;
}
CINATRA_LOG_DEBUG << "reconnect client{" << client.get()
CINATRA_LOG_TRACE << "reconnect client{" << client.get()
<< "} failed. If client close:{" << client->has_closed()
<< "}";
auto wait_time = rand_time(
(pool_config_.reconnect_wait_time * (i + 1) - cost_time) / 1ms * 1ms);
(self->pool_config_.reconnect_wait_time - cost_time) / 1ms * 1ms);
self = nullptr;
if (wait_time.count() > 0)
co_await coro_io::sleep_for(wait_time, &client->get_executor());
}
self = watcher.lock();
++i;
} while (i < self->pool_config_.connect_retry_count);
CINATRA_LOG_WARNING << "reconnect client{" << client.get() << "},host:{"
<< client->get_host() << ":" << client->get_port()
<< "} out of max limit, stop retry. connect failed";
@@ -150,30 +156,23 @@ class client_pool : public std::enable_shared_from_this<
async_simple::Promise<std::unique_ptr<client_t>> promise_;
};

async_simple::coro::Lazy<void> connect_client(
static async_simple::coro::Lazy<void> connect_client(
std::unique_ptr<client_t> client, std::weak_ptr<client_pool> watcher,
std::shared_ptr<promise_handler> handler) {
CINATRA_LOG_DEBUG << "try to connect client{" << client.get()
<< "} to host:" << host_name_;
auto result = co_await client->connect(host_name_);
std::shared_ptr<client_pool> self = watcher.lock();
if (!client_t::is_ok(result)) {
CINATRA_LOG_DEBUG << "connect client{" << client.get() << "} to failed. ";
if (self) {
co_await reconnect(client);
}
}
if (client) {
CINATRA_LOG_DEBUG << "connect client{" << client.get() << "} successful!";
}
co_await reconnect(client, watcher);
auto has_get_connect = handler->flag_.exchange(true);
if (!has_get_connect) {
handler->promise_.setValue(std::move(client));
}
else {
auto conn_lim = std::min<unsigned>(10u, pool_config_.max_connection);
if (self && free_clients_.size() < conn_lim && client) {
enqueue(free_clients_, std::move(client), pool_config_.idle_timeout);
if (client) {
auto self = watcher.lock();
auto conn_lim =
std::min<unsigned>(10u, self->pool_config_.max_connection);
if (self && self->free_clients_.size() < conn_lim) {
self->enqueue(self->free_clients_, std::move(client),
self->pool_config_.idle_timeout);
}
}
}
}
@@ -226,7 +225,7 @@ class client_pool : public std::enable_shared_from_this<
}
}
});
CINATRA_LOG_DEBUG << "wait client by promise {" << &handler->promise_
CINATRA_LOG_TRACE << "wait client by promise {" << &handler->promise_
<< "}";
client = co_await handler->promise_.getFuture();
if (client) {
@@ -237,7 +236,7 @@ class client_pool : public std::enable_shared_from_this<
}
}
else {
CINATRA_LOG_DEBUG << "get free client{" << client.get()
CINATRA_LOG_TRACE << "get free client{" << client.get()
<< "}. from queue";
}
co_return std::move(client);
@@ -250,7 +249,7 @@ class client_pool : public std::enable_shared_from_this<
if (clients.enqueue(std::move(client)) == 1) {
std::size_t expected = 0;
if (clients.collecter_cnt_.compare_exchange_strong(expected, 1)) {
CINATRA_LOG_DEBUG << "start timeout client collecter of client_pool{"
CINATRA_LOG_TRACE << "start timeout client collecter of client_pool{"
<< host_name_ << "}";
collect_idle_timeout_client(
this->weak_from_this(), clients,
@@ -274,7 +273,7 @@ class client_pool : public std::enable_shared_from_this<
if (!has_get_connect) {
handler->promise_.setValue(std::move(client));
promise_cnt_ -= cnt;
CINATRA_LOG_DEBUG << "collect free client{" << client.get()
CINATRA_LOG_TRACE << "collect free client{" << client.get()
<< "} and wake up promise{" << &handler->promise_
<< "}";
return;
@@ -285,13 +284,13 @@ class client_pool : public std::enable_shared_from_this<

if (free_clients_.size() < pool_config_.max_connection) {
if (client) {
CINATRA_LOG_DEBUG << "collect free client{" << client.get()
CINATRA_LOG_TRACE << "collect free client{" << client.get()
<< "} enqueue";
enqueue(free_clients_, std::move(client), pool_config_.idle_timeout);
}
}
else {
CINATRA_LOG_DEBUG << "out of max connection limit <<"
CINATRA_LOG_TRACE << "out of max connection limit <<"
<< pool_config_.max_connection
<< ", collect free client{" << client.get()
<< "} enqueue short connect queue";
Expand All @@ -300,7 +299,7 @@ class client_pool : public std::enable_shared_from_this<
}
}
else {
CINATRA_LOG_DEBUG << "client{" << client.get()
CINATRA_LOG_TRACE << "client{" << client.get()
<< "} is closed. we won't collect it";
}

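One detail in the reworked `reconnect()` above: the retry counter starts at `UINT32_MAX` and is incremented at the end of each `do`/`while` pass, so unsigned wrap-around makes the first pass attempt 0 and guarantees at least one connect even when `connect_retry_count` is 0. A standalone sketch of that idiom (hypothetical names, independent of client_pool):

```c++
#include <cstdint>
#include <iostream>

// Wrap-around do/while retry idiom used by the new client_pool::reconnect():
// the counter starts at UINT32_MAX so the first ++i wraps it to 0, which
// guarantees at least one attempt even when retry_count is 0.
bool retry_connect(uint32_t retry_count, int fail_first_n) {
  uint32_t i = UINT32_MAX;
  int attempts = 0;
  do {
    bool ok = (attempts++ >= fail_first_n);  // simulated connect result
    if (ok) {
      std::cout << "connected after " << attempts << " attempt(s)\n";
      return true;
    }
    ++i;  // becomes 0, 1, 2, ...; the attempt index shown in the log message
  } while (i < retry_count);
  std::cout << "gave up after " << attempts << " attempt(s)\n";
  return false;
}

int main() {
  retry_connect(3, 2);  // fails twice, succeeds on the 3rd attempt
  retry_connect(0, 5);  // retry_count == 0 still yields exactly one attempt
}
```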
3 changes: 1 addition & 2 deletions include/cinatra/ylt/coro_io/coro_io.hpp
@@ -13,7 +13,6 @@
#include <async_simple/coro/Lazy.h>

#include <asio/connect.hpp>
#include <asio/dispatch.hpp>
#include <asio/experimental/channel.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/read.hpp>
@@ -303,7 +302,7 @@ inline async_simple::coro::Lazy<void> sleep_for(Duration d) {
template <typename R, typename Func, typename Executor>
struct post_helper {
void operator()(auto handler) {
asio::dispatch(e, [this, handler]() {
asio::post(e, [this, handler]() {
try {
if constexpr (std::is_same_v<R, async_simple::Try<void>>) {
func();
2 changes: 1 addition & 1 deletion include/cinatra/ylt/coro_io/io_context_pool.hpp
@@ -75,7 +75,7 @@ class ExecutorWrapper : public async_simple::Executor {

context_t &context() { return executor_.context(); }

auto get_asio_executor() { return executor_; }
auto get_asio_executor() const { return executor_; }

operator ExecutorImpl() { return executor_; }

4 changes: 2 additions & 2 deletions lang/coro_http_client_introduction.md
@@ -240,7 +240,7 @@ async_simple::coro::Lazy<void> test_async_client() {
```

# HTTP reconnection
After an HTTP request fails, this http client cannot be reused, because its internal socket has already been closed, unless you call reconnect to re-connect to the host; then the http client can be reused.
After an HTTP request fails, this http client cannot be reused, because its internal socket has already been closed, unless you call connect to re-connect to the host; then the http client can be reused.

```c++
coro_http_client client1{};
@@ -250,7 +250,7 @@ async_simple::coro::Lazy<void> test_async_client() {
CHECK(r.status != 200);

// reuse client1 by reconnecting
r = async_simple::coro::syncAwait(client1.reconnect("http://cn.bing.com"));
r = async_simple::coro::syncAwait(client1.connect("http://cn.bing.com"));
CHECK(client1.get_host() == "cn.bing.com");
CHECK(client1.get_port() == "http");
CHECK(r.status == 200);
6 changes: 3 additions & 3 deletions tests/test_cinatra.cpp
@@ -574,15 +574,15 @@ TEST_CASE("test coro_http_client async_http_connect") {
client1.async_http_connect("http//www.badurl.com"));
CHECK(r.status != 200);

r = async_simple::coro::syncAwait(client1.reconnect("http://cn.bing.com"));
r = async_simple::coro::syncAwait(client1.connect("http://cn.bing.com"));
CHECK(client1.get_host() == "cn.bing.com");
CHECK(client1.get_port() == "http");
CHECK(r.status >= 200);

r = async_simple::coro::syncAwait(client1.reconnect("http://www.baidu.com"));
r = async_simple::coro::syncAwait(client1.connect("http://www.baidu.com"));

CHECK(r.status >= 200);
r = async_simple::coro::syncAwait(client1.reconnect("http://cn.bing.com"));
r = async_simple::coro::syncAwait(client1.connect("http://cn.bing.com"));
CHECK(r.status == 200);
}
