Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[new features]Improve metric #591

Merged
merged 35 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions example/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ set(CINATRA_EXAMPLE
main.cpp
)

include_directories(../include)
include_directories(../include/cinatra)

add_executable(${project_name} ${CINATRA_EXAMPLE})
target_compile_definitions(${project_name} PRIVATE ASYNC_SIMPLE_HAS_NOT_AIO)

Expand Down
136 changes: 120 additions & 16 deletions example/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
#include <vector>

#include "../include/cinatra.hpp"
#include "cinatra/ylt/metric/gauge.hpp"
#include "cinatra/ylt/metric/histogram.hpp"
#include "cinatra/ylt/metric/summary.hpp"
#include "metric_conf.hpp"

using namespace cinatra;
using namespace ylt;
using namespace std::chrono_literals;

void create_file(std::string filename, size_t file_size = 64) {
Expand Down Expand Up @@ -373,6 +372,8 @@ async_simple::coro::Lazy<void> basic_usage() {
response.set_status_and_content(status_type::ok, "ok");
});

server.use_metrics();

person_t person{};
server.set_http_handler<GET>("/person", &person_t::foo, person);

Expand Down Expand Up @@ -420,17 +421,21 @@ async_simple::coro::Lazy<void> basic_usage() {
// make sure you have install openssl and enable CINATRA_ENABLE_SSL
#ifdef CINATRA_ENABLE_SSL
coro_http_client client2{};
result = co_await client2.async_get("https://baidu.com");
assert(result.status == 200);

result = client2.post("https://baidu.com", "test", req_content_type::string);
std::cout << result.resp_body << "\n";
result.net_err.value() assert(result.status == 200);
#endif
}

void use_metric() {
auto c = std::make_shared<counter_t>("request_count", "request count",
std::vector{"method", "url"});
auto failed = std::make_shared<gauge_t>("not_found_request_count",
"not found request count",
std::vector{"method", "code", "url"});
using namespace ylt;
auto c =
std::make_shared<counter_t>("request_count", "request count",
std::vector<std::string>{"method", "url"});
auto failed = std::make_shared<gauge_t>(
"not_found_request_count", "not found request count",
std::vector<std::string>{"method", "code", "url"});
auto total =
std::make_shared<counter_t>("total_request_count", "total request count");

Expand All @@ -443,11 +448,11 @@ void use_metric() {
summary_t::Quantiles{
{0.5, 0.05}, {0.9, 0.01}, {0.95, 0.005}, {0.99, 0.001}});

metric_t::regiter_metric(c);
metric_t::regiter_metric(total);
metric_t::regiter_metric(failed);
metric_t::regiter_metric(h);
metric_t::regiter_metric(summary);
default_metric_manger::register_metric_dynamic(c);
default_metric_manger::register_metric_dynamic(total);
default_metric_manger::register_metric_dynamic(failed);
default_metric_manger::register_metric_dynamic(h);
default_metric_manger::register_metric_dynamic(summary);

std::random_device rd;
std::mt19937 gen(rd());
Expand Down Expand Up @@ -499,13 +504,112 @@ void use_metric() {
server.set_http_handler<GET, POST>(
"/metrics", [](coro_http_request &req, coro_http_response &resp) {
resp.need_date_head(false);
resp.set_status_and_content(status_type::ok, metric_t::serialize());
resp.set_status_and_content(status_type::ok, "");
});
server.sync_start();
}

void metrics_example() {
auto get_req_counter = std::make_shared<counter_t>(
"get_req_count", "get req count",
std::map<std::string, std::string>{{"url", "/get"}});
auto get_req_qps = std::make_shared<gauge_t>("get_req_qps", "get req qps");
// default_metric_manger::register_metric_static(get_req_counter,
// get_req_qps);
int64_t last = 0;
std::thread thd([&] {
while (true) {
std::this_thread::sleep_for(1s);
auto value = get_req_counter->value({"/get"});
get_req_qps->update(value - last);
last = value;
}
});
thd.detach();

coro_http_server server(1, 9001);
server.set_http_handler<GET>(
"/get", [&](coro_http_request &req, coro_http_response &resp) {
// get_req_counter->inc({"/get"});
resp.set_status_and_content(status_type::ok, "ok");
});
server.set_http_handler<GET>(
"/", [&](coro_http_request &req, coro_http_response &resp) {
resp.set_status_and_content(status_type::ok, "hello world");
});
server.use_metrics();
server.sync_start();
}

async_simple::coro::Lazy<void> use_channel() {
coro_http_server server(1, 9001);
server.set_http_handler<GET>(
"/", [&](coro_http_request &req, coro_http_response &resp) {
resp.set_status_and_content(status_type::ok, "hello world");
});
server.use_metrics();
server.async_start();
std::this_thread::sleep_for(100ms);

auto channel = std::make_shared<coro_io::channel<coro_http_client>>(
coro_io::channel<coro_http_client>::create(
{"127.0.0.1:9001"}, {.lba = coro_io::load_blance_algorithm::random}));
std::string url = "http://127.0.0.1:9001/";
co_await channel->send_request(
[&url](coro_http_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
auto data = co_await client.async_get(url);
std::cout << data.net_err.message() << "\n";
std::cout << data.resp_body << "\n";
});
}

async_simple::coro::Lazy<void> use_pool() {
coro_http_server server(1, 9001);
server.set_http_handler<GET>(
"/", [&](coro_http_request &req, coro_http_response &resp) {
resp.set_status_and_content(status_type::ok, "hello world");
});
server.use_metrics();
server.async_start();

auto map = default_metric_manger::metric_map_static();
for (auto &[k, m] : map) {
std::cout << k << ", ";
std::cout << m->help() << "\n";
}

std::string url = "http://127.0.0.1:9001/";

auto pool = coro_io::client_pool<coro_http_client>::create(
url, {std::thread::hardware_concurrency() * 2});

std::atomic<size_t> count = 0;
for (size_t i = 0; i < 10000; i++) {
pool->send_request(
[&](coro_http_client &client) -> async_simple::coro::Lazy<void> {
auto data = co_await client.async_get(url);
std::cout << data.resp_body << "\n";
})
.start([&](auto &&) {
count++;
});
}

while (count != 10000) {
std::this_thread::sleep_for(5ms);
}

int size = pool->free_client_count();
printf("current client count: %d, \n", size);
co_return;
}

int main() {
// use_metric();
// metrics_example();
async_simple::coro::syncAwait(use_channel());
async_simple::coro::syncAwait(use_pool());
async_simple::coro::syncAwait(basic_usage());
async_simple::coro::syncAwait(use_aspects());
async_simple::coro::syncAwait(static_file_server());
Expand Down
75 changes: 72 additions & 3 deletions include/cinatra/coro_http_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@
#include "sha1.hpp"
#include "string_resize.hpp"
#include "websocket.hpp"
#include "ylt/metric/counter.hpp"
#include "ylt/metric/gauge.hpp"
#include "ylt/metric/histogram.hpp"
#include "ylt/metric/metric.hpp"
#ifdef CINATRA_ENABLE_GZIP
#include "gzip.hpp"
#endif
#include "metric_conf.hpp"
#include "ylt/coro_io/coro_file.hpp"
#include "ylt/coro_io/coro_io.hpp"

Expand All @@ -47,9 +52,14 @@ class coro_http_connection
request_(parser_, this),
response_(this) {
buffers_.reserve(3);

cinatra_metric_conf::server_total_fd_inc();
}

~coro_http_connection() { close(); }
~coro_http_connection() {
cinatra_metric_conf::server_total_fd_dec();
close();
}

#ifdef CINATRA_ENABLE_SSL
bool init_ssl(const std::string &cert_file, const std::string &key_file,
Expand Down Expand Up @@ -94,6 +104,8 @@ class coro_http_connection
#ifdef CINATRA_ENABLE_SSL
bool has_shake = false;
#endif
std::chrono::system_clock::time_point start{};
std::chrono::system_clock::time_point mid{};
while (true) {
#ifdef CINATRA_ENABLE_SSL
if (use_ssl_ && !has_shake) {
Expand All @@ -113,13 +125,21 @@ class coro_http_connection
if (ec != asio::error::eof) {
CINATRA_LOG_WARNING << "read http header error: " << ec.message();
}

cinatra_metric_conf::server_failed_req_inc();
close();
break;
}

if (cinatra_metric_conf::enable_metric) {
start = std::chrono::system_clock::now();
cinatra_metric_conf::server_total_req_inc();
}

const char *data_ptr = asio::buffer_cast<const char *>(head_buf_.data());
int head_len = parser_.parse_request(data_ptr, size, 0);
if (head_len <= 0) {
cinatra_metric_conf::server_failed_req_inc();
CINATRA_LOG_ERROR << "parse http header error";
close();
break;
Expand All @@ -133,6 +153,9 @@ class coro_http_connection
if (type != content_type::chunked && type != content_type::multipart) {
size_t body_len = parser_.body_len();
if (body_len == 0) {
if (cinatra_metric_conf::enable_metric) {
cinatra_metric_conf::server_total_recv_bytes_inc(head_len);
}
if (parser_.method() == "GET"sv) {
if (request_.is_upgrade()) {
#ifdef CINATRA_ENABLE_GZIP
Expand All @@ -152,6 +175,16 @@ class coro_http_connection
}
response_.set_delay(true);
}
else {
if (cinatra_metric_conf::enable_metric) {
mid = std::chrono::system_clock::now();
double count =
std::chrono::duration_cast<std::chrono::microseconds>(mid -
start)
.count();
cinatra_metric_conf::server_read_latency_observe(count);
}
}
}
}
else if (body_len <= head_buf_.size()) {
Expand All @@ -161,6 +194,7 @@ class coro_http_connection
memcpy(body_.data(), data_ptr, body_len);
head_buf_.consume(head_buf_.size());
}
cinatra_metric_conf::server_total_recv_bytes_inc(head_len + body_len);
}
else {
size_t part_size = head_buf_.size();
Expand All @@ -175,9 +209,22 @@ class coro_http_connection
size_to_read);
if (ec) {
CINATRA_LOG_ERROR << "async_read error: " << ec.message();
cinatra_metric_conf::server_failed_req_inc();
close();
break;
}
else {
if (cinatra_metric_conf::enable_metric) {
cinatra_metric_conf::server_total_recv_bytes_inc(head_len +
body_len);
mid = std::chrono::system_clock::now();
double count =
std::chrono::duration_cast<std::chrono::microseconds>(mid -
start)
.count();
cinatra_metric_conf::server_read_latency_observe(count);
}
}
}
}

Expand Down Expand Up @@ -362,6 +409,14 @@ class coro_http_connection
}
}

if (cinatra_metric_conf::enable_metric) {
mid = std::chrono::system_clock::now();
double count =
std::chrono::duration_cast<std::chrono::microseconds>(mid - start)
.count();
cinatra_metric_conf::server_req_latency_observe(count);
}

response_.clear();
request_.clear();
buffers_.clear();
Expand All @@ -375,18 +430,32 @@ class coro_http_connection
}

async_simple::coro::Lazy<bool> reply(bool need_to_bufffer = true) {
if (response_.status() >= status_type::bad_request) {
if (cinatra_metric_conf::enable_metric)
cinatra_metric_conf::server_failed_req_inc();
}
std::error_code ec;
size_t size;
if (multi_buf_) {
if (need_to_bufffer) {
response_.to_buffers(buffers_, chunk_size_str_);
}
int64_t send_size = 0;
for (auto &buf : buffers_) {
send_size += buf.size();
}
if (cinatra_metric_conf::enable_metric) {
cinatra_metric_conf::server_total_send_bytes_inc(send_size);
}
std::tie(ec, size) = co_await async_write(buffers_);
}
else {
if (need_to_bufffer) {
response_.build_resp_str(resp_str_);
}
if (cinatra_metric_conf::enable_metric) {
cinatra_metric_conf::server_total_send_bytes_inc(resp_str_.size());
}
std::tie(ec, size) = co_await async_write(asio::buffer(resp_str_));
}

Expand Down Expand Up @@ -794,7 +863,7 @@ class coro_http_connection
return last_rwtime_;
}

auto &get_executor() { return *executor_; }
auto get_executor() { return executor_; }

void close(bool need_cb = true) {
if (has_closed_) {
Expand Down Expand Up @@ -884,7 +953,7 @@ class coro_http_connection

private:
friend class multipart_reader_t<coro_http_connection>;
async_simple::Executor *executor_;
coro_io::ExecutorWrapper<> *executor_;
asio::ip::tcp::socket socket_;
coro_http_router &router_;
asio::streambuf head_buf_;
Expand Down
Loading
Loading