Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve reverse proxy #526

Merged
merged 3 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion include/cinatra.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#define CINATRA_CINATRA_HPP

#include "cinatra/coro_http_client.hpp"
#include "cinatra/coro_http_reverse_proxy.hpp"
#include "cinatra/coro_http_server.hpp"
#include "cinatra/smtp_client.hpp"

Expand Down
107 changes: 0 additions & 107 deletions include/cinatra/coro_http_reverse_proxy.hpp

This file was deleted.

57 changes: 57 additions & 0 deletions include/cinatra/coro_http_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
#include "asio/streambuf.hpp"
#include "async_simple/Promise.h"
#include "async_simple/coro/Lazy.h"
#include "cinatra/coro_http_client.hpp"
#include "cinatra/coro_http_response.hpp"
#include "cinatra/coro_http_router.hpp"
#include "cinatra/mime_types.hpp"
#include "cinatra_log_wrapper.hpp"
#include "coro_http_connection.hpp"
#include "ylt/coro_io/channel.hpp"
#include "ylt/coro_io/coro_file.hpp"
#include "ylt/coro_io/coro_io.hpp"
#include "ylt/coro_io/io_context_pool.hpp"
Expand Down Expand Up @@ -162,6 +164,37 @@ class coro_http_server {
}
}

template <http_method... method, typename... Aspects>
void set_http_proxy_handler(std::string url_path,
std::vector<std::string_view> hosts,
coro_io::load_blance_algorithm type =
coro_io::load_blance_algorithm::random,
std::vector<int> weights = {},
Aspects &&...aspects) {
if (hosts.empty()) {
throw std::invalid_argument("not config hosts yet!");
}

auto channel = std::make_shared<coro_io::channel<coro_http_client>>(
coro_io::channel<coro_http_client>::create(hosts, {.lba = type},
weights));
set_http_handler<method...>(
url_path,
[this, channel, type, url_path](
coro_http_request &req,
coro_http_response &response) -> async_simple::coro::Lazy<void> {
co_await channel->send_request(
[this, &req, &response](
coro_http_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
uri_t uri;
uri.parse_from(host.data());
co_await reply(client, uri.get_path(), req, response);
});
},
std::forward<Aspects>(aspects)...);
}

void set_max_size_of_cache_files(size_t max_size = 3 * 1024 * 1024) {
std::error_code ec;
for (const auto &file :
Expand Down Expand Up @@ -679,6 +712,30 @@ class coro_http_server {
co_return true;
}

async_simple::coro::Lazy<void> reply(coro_http_client &client,
std::string url_path,
coro_http_request &req,
coro_http_response &response) {
std::unordered_map<std::string, std::string> req_headers;
for (auto &[k, v] : req_headers) {
req_headers.emplace(k, v);
}

auto ctx = req_context<std::string_view>{.content = req.get_body()};
auto result = co_await client.async_request(
std::move(url_path), method_type(req.get_method()), std::move(ctx),
std::move(req_headers));

for (auto &[k, v] : result.resp_headers) {
response.add_header(std::string(k), std::string(v));
}

response.set_status_and_content_view(
static_cast<status_type>(result.status), result.resp_body);
co_await response.get_conn()->reply();
response.set_delay(true);
}

private:
std::unique_ptr<coro_io::io_context_pool> pool_;
asio::io_context *out_ctx_ = nullptr;
Expand Down
53 changes: 37 additions & 16 deletions include/cinatra/ylt/coro_io/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <atomic>
#include <memory>
#include <numeric>
#include <random>

#include "client_pool.hpp"
Expand Down Expand Up @@ -53,10 +54,31 @@ class channel {
}
};

/*
Supposing that there is a server set ''S'' = {S0, S1, …, Sn-1};
W(Si) indicates the weight of Si;
''i'' indicates the server selected last time, and ''i'' is initialized with
-1;
''cw'' is the current weight in scheduling, and cw is initialized with zero;
max(S) is the maximum weight of all the servers in S;
gcd(S) is the greatest common divisor of all server weights in S;

while (true) {
i = (i + 1) mod n;
if (i == 0) {
cw = cw - gcd(S);
if (cw <= 0) {
cw = max(S);
if (cw == 0)
return NULL;
}
}
if (W(Si) >= cw)
return Si;
}
*/
struct WRRLoadBlancer {
WRRLoadBlancer(const std::vector<std::string>& hosts,
const std::vector<int>& weights)
: hosts_(hosts), weights_(weights) {
WRRLoadBlancer(const std::vector<int>& weights) : weights_(weights) {
max_gcd_ = get_max_weight_gcd();
max_weight_ = get_max_weight();
}
Expand All @@ -74,12 +96,8 @@ class channel {

private:
int select_host_with_weight_round_robin() {
if (hosts_.empty()) {
throw std::invalid_argument("host list is empty!");
}

while (true) {
wrr_current_ = (wrr_current_ + 1) % hosts_.size();
wrr_current_ = (wrr_current_ + 1) % weights_.size();
if (wrr_current_ == 0) {
weight_current_ = weight_current_ - max_gcd_;
if (weight_current_ <= 0) {
Expand All @@ -96,15 +114,13 @@ class channel {
}
}

int gcd(int a, int b) { return !b ? a : gcd(b, a % b); }

int get_max_weight_gcd() {
int res = weights_[0];
int cur_max = 0, cur_min = 0;
for (size_t i = 0; i < hosts_.size(); i++) {
for (size_t i = 0; i < weights_.size(); i++) {
cur_max = (std::max)(res, weights_[i]);
cur_min = (std::min)(res, weights_[i]);
res = gcd(cur_max, cur_min);
res = std::gcd(cur_max, cur_min);
}
return res;
}
Expand All @@ -113,7 +129,6 @@ class channel {
return *std::max_element(weights_.begin(), weights_.end());
}

std::vector<std::string> hosts_;
std::vector<int> weights_;
int max_gcd_ = 0;
int max_weight_ = 0;
Expand Down Expand Up @@ -190,9 +205,15 @@ class channel {
case load_blance_algorithm::RR:
lb_worker = RRLoadBlancer{};
break;
case load_blance_algorithm::WRR:
lb_worker = WRRLoadBlancer({hosts.begin(), hosts.end()}, weights);
break;
case load_blance_algorithm::WRR: {
if (hosts.empty() || weights.empty()) {
throw std::invalid_argument("host/weight list is empty!");
}
if (hosts.size() != weights.size()) {
throw std::invalid_argument("hosts count is not equal with weights!");
}
lb_worker = WRRLoadBlancer(weights);
} break;
case load_blance_algorithm::random:
default:
lb_worker = RandomLoadBlancer{};
Expand Down
Loading
Loading