Skip to content

Commit

Permalink
[new features]Improve metric (#591)
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored Jun 7, 2024
1 parent bf4199f commit 3e6d502
Show file tree
Hide file tree
Showing 18 changed files with 1,493 additions and 316 deletions.
3 changes: 3 additions & 0 deletions example/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ set(CINATRA_EXAMPLE
main.cpp
)

include_directories(../include)
include_directories(../include/cinatra)

add_executable(${project_name} ${CINATRA_EXAMPLE})
target_compile_definitions(${project_name} PRIVATE ASYNC_SIMPLE_HAS_NOT_AIO)

Expand Down
136 changes: 120 additions & 16 deletions example/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
#include <vector>

#include "../include/cinatra.hpp"
#include "cinatra/ylt/metric/gauge.hpp"
#include "cinatra/ylt/metric/histogram.hpp"
#include "cinatra/ylt/metric/summary.hpp"
#include "metric_conf.hpp"

using namespace cinatra;
using namespace ylt;
using namespace std::chrono_literals;

void create_file(std::string filename, size_t file_size = 64) {
Expand Down Expand Up @@ -373,6 +372,8 @@ async_simple::coro::Lazy<void> basic_usage() {
response.set_status_and_content(status_type::ok, "ok");
});

server.use_metrics();

person_t person{};
server.set_http_handler<GET>("/person", &person_t::foo, person);

Expand Down Expand Up @@ -420,17 +421,21 @@ async_simple::coro::Lazy<void> basic_usage() {
// make sure you have install openssl and enable CINATRA_ENABLE_SSL
#ifdef CINATRA_ENABLE_SSL
coro_http_client client2{};
result = co_await client2.async_get("https://baidu.com");
assert(result.status == 200);

result = client2.post("https://baidu.com", "test", req_content_type::string);
std::cout << result.resp_body << "\n";
result.net_err.value() assert(result.status == 200);
#endif
}

void use_metric() {
auto c = std::make_shared<counter_t>("request_count", "request count",
std::vector{"method", "url"});
auto failed = std::make_shared<gauge_t>("not_found_request_count",
"not found request count",
std::vector{"method", "code", "url"});
using namespace ylt;
auto c =
std::make_shared<counter_t>("request_count", "request count",
std::vector<std::string>{"method", "url"});
auto failed = std::make_shared<gauge_t>(
"not_found_request_count", "not found request count",
std::vector<std::string>{"method", "code", "url"});
auto total =
std::make_shared<counter_t>("total_request_count", "total request count");

Expand All @@ -443,11 +448,11 @@ void use_metric() {
summary_t::Quantiles{
{0.5, 0.05}, {0.9, 0.01}, {0.95, 0.005}, {0.99, 0.001}});

metric_t::regiter_metric(c);
metric_t::regiter_metric(total);
metric_t::regiter_metric(failed);
metric_t::regiter_metric(h);
metric_t::regiter_metric(summary);
default_metric_manger::register_metric_dynamic(c);
default_metric_manger::register_metric_dynamic(total);
default_metric_manger::register_metric_dynamic(failed);
default_metric_manger::register_metric_dynamic(h);
default_metric_manger::register_metric_dynamic(summary);

std::random_device rd;
std::mt19937 gen(rd());
Expand Down Expand Up @@ -499,13 +504,112 @@ void use_metric() {
server.set_http_handler<GET, POST>(
"/metrics", [](coro_http_request &req, coro_http_response &resp) {
resp.need_date_head(false);
resp.set_status_and_content(status_type::ok, metric_t::serialize());
resp.set_status_and_content(status_type::ok, "");
});
server.sync_start();
}

void metrics_example() {
auto get_req_counter = std::make_shared<counter_t>(
"get_req_count", "get req count",
std::map<std::string, std::string>{{"url", "/get"}});
auto get_req_qps = std::make_shared<gauge_t>("get_req_qps", "get req qps");
// default_metric_manger::register_metric_static(get_req_counter,
// get_req_qps);
int64_t last = 0;
std::thread thd([&] {
while (true) {
std::this_thread::sleep_for(1s);
auto value = get_req_counter->value({"/get"});
get_req_qps->update(value - last);
last = value;
}
});
thd.detach();

coro_http_server server(1, 9001);
server.set_http_handler<GET>(
"/get", [&](coro_http_request &req, coro_http_response &resp) {
// get_req_counter->inc({"/get"});
resp.set_status_and_content(status_type::ok, "ok");
});
server.set_http_handler<GET>(
"/", [&](coro_http_request &req, coro_http_response &resp) {
resp.set_status_and_content(status_type::ok, "hello world");
});
server.use_metrics();
server.sync_start();
}

async_simple::coro::Lazy<void> use_channel() {
coro_http_server server(1, 9001);
server.set_http_handler<GET>(
"/", [&](coro_http_request &req, coro_http_response &resp) {
resp.set_status_and_content(status_type::ok, "hello world");
});
server.use_metrics();
server.async_start();
std::this_thread::sleep_for(100ms);

auto channel = std::make_shared<coro_io::channel<coro_http_client>>(
coro_io::channel<coro_http_client>::create(
{"127.0.0.1:9001"}, {.lba = coro_io::load_blance_algorithm::random}));
std::string url = "http://127.0.0.1:9001/";
co_await channel->send_request(
[&url](coro_http_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
auto data = co_await client.async_get(url);
std::cout << data.net_err.message() << "\n";
std::cout << data.resp_body << "\n";
});
}

async_simple::coro::Lazy<void> use_pool() {
coro_http_server server(1, 9001);
server.set_http_handler<GET>(
"/", [&](coro_http_request &req, coro_http_response &resp) {
resp.set_status_and_content(status_type::ok, "hello world");
});
server.use_metrics();
server.async_start();

auto map = default_metric_manger::metric_map_static();
for (auto &[k, m] : map) {
std::cout << k << ", ";
std::cout << m->help() << "\n";
}

std::string url = "http://127.0.0.1:9001/";

auto pool = coro_io::client_pool<coro_http_client>::create(
url, {std::thread::hardware_concurrency() * 2});

std::atomic<size_t> count = 0;
for (size_t i = 0; i < 10000; i++) {
pool->send_request(
[&](coro_http_client &client) -> async_simple::coro::Lazy<void> {
auto data = co_await client.async_get(url);
std::cout << data.resp_body << "\n";
})
.start([&](auto &&) {
count++;
});
}

while (count != 10000) {
std::this_thread::sleep_for(5ms);
}

int size = pool->free_client_count();
printf("current client count: %d, \n", size);
co_return;
}

int main() {
// use_metric();
// metrics_example();
async_simple::coro::syncAwait(use_channel());
async_simple::coro::syncAwait(use_pool());
async_simple::coro::syncAwait(basic_usage());
async_simple::coro::syncAwait(use_aspects());
async_simple::coro::syncAwait(static_file_server());
Expand Down
75 changes: 72 additions & 3 deletions include/cinatra/coro_http_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@
#include "sha1.hpp"
#include "string_resize.hpp"
#include "websocket.hpp"
#include "ylt/metric/counter.hpp"
#include "ylt/metric/gauge.hpp"
#include "ylt/metric/histogram.hpp"
#include "ylt/metric/metric.hpp"
#ifdef CINATRA_ENABLE_GZIP
#include "gzip.hpp"
#endif
#include "metric_conf.hpp"
#include "ylt/coro_io/coro_file.hpp"
#include "ylt/coro_io/coro_io.hpp"

Expand All @@ -47,9 +52,14 @@ class coro_http_connection
request_(parser_, this),
response_(this) {
buffers_.reserve(3);

cinatra_metric_conf::server_total_fd_inc();
}

~coro_http_connection() { close(); }
~coro_http_connection() {
cinatra_metric_conf::server_total_fd_dec();
close();
}

#ifdef CINATRA_ENABLE_SSL
bool init_ssl(const std::string &cert_file, const std::string &key_file,
Expand Down Expand Up @@ -94,6 +104,8 @@ class coro_http_connection
#ifdef CINATRA_ENABLE_SSL
bool has_shake = false;
#endif
std::chrono::system_clock::time_point start{};
std::chrono::system_clock::time_point mid{};
while (true) {
#ifdef CINATRA_ENABLE_SSL
if (use_ssl_ && !has_shake) {
Expand All @@ -113,13 +125,21 @@ class coro_http_connection
if (ec != asio::error::eof) {
CINATRA_LOG_WARNING << "read http header error: " << ec.message();
}

cinatra_metric_conf::server_failed_req_inc();
close();
break;
}

if (cinatra_metric_conf::enable_metric) {
start = std::chrono::system_clock::now();
cinatra_metric_conf::server_total_req_inc();
}

const char *data_ptr = asio::buffer_cast<const char *>(head_buf_.data());
int head_len = parser_.parse_request(data_ptr, size, 0);
if (head_len <= 0) {
cinatra_metric_conf::server_failed_req_inc();
CINATRA_LOG_ERROR << "parse http header error";
close();
break;
Expand All @@ -133,6 +153,9 @@ class coro_http_connection
if (type != content_type::chunked && type != content_type::multipart) {
size_t body_len = parser_.body_len();
if (body_len == 0) {
if (cinatra_metric_conf::enable_metric) {
cinatra_metric_conf::server_total_recv_bytes_inc(head_len);
}
if (parser_.method() == "GET"sv) {
if (request_.is_upgrade()) {
#ifdef CINATRA_ENABLE_GZIP
Expand All @@ -152,6 +175,16 @@ class coro_http_connection
}
response_.set_delay(true);
}
else {
if (cinatra_metric_conf::enable_metric) {
mid = std::chrono::system_clock::now();
double count =
std::chrono::duration_cast<std::chrono::microseconds>(mid -
start)
.count();
cinatra_metric_conf::server_read_latency_observe(count);
}
}
}
}
else if (body_len <= head_buf_.size()) {
Expand All @@ -161,6 +194,7 @@ class coro_http_connection
memcpy(body_.data(), data_ptr, body_len);
head_buf_.consume(head_buf_.size());
}
cinatra_metric_conf::server_total_recv_bytes_inc(head_len + body_len);
}
else {
size_t part_size = head_buf_.size();
Expand All @@ -175,9 +209,22 @@ class coro_http_connection
size_to_read);
if (ec) {
CINATRA_LOG_ERROR << "async_read error: " << ec.message();
cinatra_metric_conf::server_failed_req_inc();
close();
break;
}
else {
if (cinatra_metric_conf::enable_metric) {
cinatra_metric_conf::server_total_recv_bytes_inc(head_len +
body_len);
mid = std::chrono::system_clock::now();
double count =
std::chrono::duration_cast<std::chrono::microseconds>(mid -
start)
.count();
cinatra_metric_conf::server_read_latency_observe(count);
}
}
}
}

Expand Down Expand Up @@ -362,6 +409,14 @@ class coro_http_connection
}
}

if (cinatra_metric_conf::enable_metric) {
mid = std::chrono::system_clock::now();
double count =
std::chrono::duration_cast<std::chrono::microseconds>(mid - start)
.count();
cinatra_metric_conf::server_req_latency_observe(count);
}

response_.clear();
request_.clear();
buffers_.clear();
Expand All @@ -375,18 +430,32 @@ class coro_http_connection
}

async_simple::coro::Lazy<bool> reply(bool need_to_bufffer = true) {
if (response_.status() >= status_type::bad_request) {
if (cinatra_metric_conf::enable_metric)
cinatra_metric_conf::server_failed_req_inc();
}
std::error_code ec;
size_t size;
if (multi_buf_) {
if (need_to_bufffer) {
response_.to_buffers(buffers_, chunk_size_str_);
}
int64_t send_size = 0;
for (auto &buf : buffers_) {
send_size += buf.size();
}
if (cinatra_metric_conf::enable_metric) {
cinatra_metric_conf::server_total_send_bytes_inc(send_size);
}
std::tie(ec, size) = co_await async_write(buffers_);
}
else {
if (need_to_bufffer) {
response_.build_resp_str(resp_str_);
}
if (cinatra_metric_conf::enable_metric) {
cinatra_metric_conf::server_total_send_bytes_inc(resp_str_.size());
}
std::tie(ec, size) = co_await async_write(asio::buffer(resp_str_));
}

Expand Down Expand Up @@ -794,7 +863,7 @@ class coro_http_connection
return last_rwtime_;
}

auto &get_executor() { return *executor_; }
auto get_executor() { return executor_; }

void close(bool need_cb = true) {
if (has_closed_) {
Expand Down Expand Up @@ -884,7 +953,7 @@ class coro_http_connection

private:
friend class multipart_reader_t<coro_http_connection>;
async_simple::Executor *executor_;
coro_io::ExecutorWrapper<> *executor_;
asio::ip::tcp::socket socket_;
coro_http_router &router_;
asio::streambuf head_buf_;
Expand Down
Loading

0 comments on commit 3e6d502

Please sign in to comment.