Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve summary #677

Merged
merged 2 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions include/cinatra/ylt/metric/summary.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ YLT_REFL(json_summary_t, name, help, type, labels_name, quantiles_key, metrics);
class summary_t : public static_metric {
public:
summary_t(std::string name, std::string help, std::vector<double> quantiles,
std::chrono::seconds max_age = std::chrono::seconds{36000})
std::chrono::seconds max_age = std::chrono::seconds{0})
: static_metric(MetricType::Summary, std::move(name), std::move(help)),
quantiles_(std::move(quantiles)),
impl_(quantiles_,
Expand All @@ -48,7 +48,7 @@ class summary_t : public static_metric {

summary_t(std::string name, std::string help, std::vector<double> quantiles,
std::map<std::string, std::string> static_labels,
std::chrono::seconds max_age = std::chrono::seconds{36000})
std::chrono::seconds max_age = std::chrono::seconds{60})
: static_metric(MetricType::Summary, std::move(name), std::move(help),
std::move(static_labels)),
quantiles_(std::move(quantiles)),
Expand Down Expand Up @@ -133,20 +133,22 @@ class summary_t : public static_metric {

private:
std::vector<double> quantiles_;
ylt::metric::detail::summary_impl<> impl_;
ylt::metric::detail::summary_impl<uint64_t> impl_;
};

template <size_t N>
class basic_dynamic_summary
: public dynamic_metric_impl<ylt::metric::detail::summary_impl<>, N> {
: public dynamic_metric_impl<ylt::metric::detail::summary_impl<uint32_t>,
N> {
private:
using Base = dynamic_metric_impl<ylt::metric::detail::summary_impl<>, N>;
using Base =
dynamic_metric_impl<ylt::metric::detail::summary_impl<uint32_t>, N>;

public:
basic_dynamic_summary(
std::string name, std::string help, std::vector<double> quantiles,
std::array<std::string, N> labels_name,
std::chrono::milliseconds max_age = std::chrono::seconds{36000})
std::chrono::milliseconds max_age = std::chrono::seconds{60})
: Base(MetricType::Summary, std::move(name), std::move(help),
std::move(labels_name)),
quantiles_(std::move(quantiles)),
Expand Down
63 changes: 36 additions & 27 deletions include/cinatra/ylt/metric/summary_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@
#include <iterator>
#include <limits>
#include <memory>
#include <type_traits>
#include <vector>

namespace ylt::metric::detail {

template <std::size_t frac_bit = 6>
template <typename uint_type, std::size_t frac_bit = 6>
class summary_impl {
static_assert(sizeof(uint_type) >= 4);
static_assert(std::is_unsigned_v<uint_type>);
constexpr static uint32_t decode_impl(uint16_t float16_value) {
float16_value <<= (8 - frac_bit);
uint32_t sign = float16_value >> 15;
Expand Down Expand Up @@ -57,7 +60,8 @@ class summary_impl {
static constexpr float float16_max = (1ull << 63) * 2.0f; // 2^64

static uint16_t encode(float flt) {
unsigned int& fltInt32 = *(unsigned int*)&flt;
static_assert(sizeof(float) == 4);
uint32_t& fltInt32 = *(uint32_t*)&flt;
if (std::abs(flt) >= float16_max || std::isnan(flt)) {
flt = (fltInt32 & 0x8000'0000) ? (-float16_max) : (float16_max);
}
Expand Down Expand Up @@ -88,9 +92,9 @@ class summary_impl {

struct data_t {
static constexpr size_t piece_size = bucket_size / piece_cnt;
using piece_t = std::array<std::atomic<uint32_t>, piece_size>;
using piece_t = std::array<std::atomic<uint_type>, piece_size>;

std::atomic<uint32_t>& operator[](std::size_t index) {
std::atomic<uint_type>& operator[](std::size_t index) {
piece_t* piece = arr[index / piece_size];
if (piece == nullptr) {
auto ptr = new piece_t{};
Expand Down Expand Up @@ -122,7 +126,7 @@ class summary_impl {
}
template <bool inc_order>
void stat_impl(uint64_t& count,
std::vector<std::pair<int16_t, uint32_t>>& result, int i) {
std::vector<std::pair<int16_t, uint_type>>& result, int i) {
auto piece = arr[i].load(std::memory_order_relaxed);
if (piece) {
if constexpr (inc_order) {
Expand All @@ -146,7 +150,7 @@ class summary_impl {
}
}
void stat(uint64_t& count,
std::vector<std::pair<int16_t, uint32_t>>& result) {
std::vector<std::pair<int16_t, uint_type>>& result) {
for (int i = piece_cnt - 1; i >= piece_cnt / 2; --i) {
stat_impl<false>(count, result, i);
}
Expand Down Expand Up @@ -182,36 +186,38 @@ class summary_impl {
static inline const unsigned long ms_count =
std::chrono::steady_clock::duration{std::chrono::milliseconds{1}}.count();

constexpr static unsigned int near_uint32_max = 4290000000U;
constexpr static uint32_t near_uint32_max = 4290000000U;

void increase(data_t& arr, uint16_t pos) {
if (arr[pos].fetch_add(1, std::memory_order::relaxed) >
near_uint32_max) /*no overflow*/ [[likely]] {
arr[pos].fetch_sub(1, std::memory_order::relaxed);
int upper = (pos < bucket_size / 2) ? (bucket_size / 2) : (bucket_size);
int lower = (pos < bucket_size / 2) ? (0) : (bucket_size / 2);
for (int delta = 1, lim = (std::max)(upper - pos, pos - lower + 1);
delta < lim; ++delta) {
if (pos + delta < upper) {
if (arr[pos + delta].fetch_add(1, std::memory_order::relaxed) <=
near_uint32_max) {
break;
auto res = arr[pos].fetch_add(1, std::memory_order::relaxed);
if constexpr (std::is_same_v<uint_type, uint32_t>) {
if (res > near_uint32_max) /*no overflow*/ [[likely]] {
arr[pos].fetch_sub(1, std::memory_order::relaxed);
int upper = (pos < bucket_size / 2) ? (bucket_size / 2) : (bucket_size);
int lower = (pos < bucket_size / 2) ? (0) : (bucket_size / 2);
for (int delta = 1, lim = (std::max)(upper - pos, pos - lower + 1);
delta < lim; ++delta) {
if (pos + delta < upper) {
if (arr[pos + delta].fetch_add(1, std::memory_order::relaxed) <=
near_uint32_max) {
break;
}
arr[pos + delta].fetch_sub(1, std::memory_order::relaxed);
}
arr[pos + delta].fetch_sub(1, std::memory_order::relaxed);
}
if (pos - delta >= lower) {
if (arr[pos - delta].fetch_add(1, std::memory_order::relaxed) <=
near_uint32_max) {
break;
if (pos - delta >= lower) {
if (arr[pos - delta].fetch_add(1, std::memory_order::relaxed) <=
near_uint32_max) {
break;
}
arr[pos - delta].fetch_sub(1, std::memory_order::relaxed);
}
arr[pos - delta].fetch_sub(1, std::memory_order::relaxed);
}
}
}
}

struct data_copy_t {
std::vector<std::pair<int16_t, uint32_t>> arr[2];
std::vector<std::pair<int16_t, uint_type>> arr[2];
int index[2] = {}, smaller_one;
void init() {
if (arr[0][0] <= arr[1][0]) {
Expand All @@ -231,7 +237,7 @@ class summary_impl {
}
}
int16_t value() { return arr[smaller_one][index[smaller_one]].first; }
uint32_t count() { return arr[smaller_one][index[smaller_one]].second; }
uint_type count() { return arr[smaller_one][index[smaller_one]].second; }
};

public:
Expand Down Expand Up @@ -304,6 +310,9 @@ class summary_impl {
e = 1;
}
auto target_count = std::min<double>(e * count, count);
if (e == 0) {
target_count = std::min(uint64_t{1}, count);
}
while (true) {
if (target_count <= count_now) [[unlikely]] {
result.push_back(v);
Expand Down
15 changes: 10 additions & 5 deletions tests/test_cinatra_websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,18 +277,24 @@ TEST_CASE("test send after server stop") {

TEST_CASE("test read write in different threads") {
cinatra::coro_http_server server(1, 8090);
size_t count = 0;
std::promise<void> promise;
server.set_http_handler<cinatra::GET>(
"/",
[](coro_http_request &req,
coro_http_response &resp) -> async_simple::coro::Lazy<void> {
[&](coro_http_request &req,
coro_http_response &resp) -> async_simple::coro::Lazy<void> {
CHECK(req.get_content_type() == content_type::websocket);
websocket_result result{};
while (true) {
result = co_await req.get_conn()->read_websocket();
if (result.ec) {
break;
}

count++;
if (count == 100) {
promise.set_value();
break;
}
auto ec = co_await req.get_conn()->write_websocket(result.data);
if (ec) {
break;
Expand Down Expand Up @@ -326,8 +332,7 @@ TEST_CASE("test read write in different threads") {

async_simple::coro::syncAwait(lazy());

std::this_thread::sleep_for(std::chrono::milliseconds(300));

promise.get_future().wait_for(std::chrono::seconds(2));
server.stop();
}

Expand Down
24 changes: 21 additions & 3 deletions tests/test_metric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -950,15 +950,15 @@ TEST_CASE("test summary with illegal quantities") {
CHECK(str.find("test_summary_sum") != std::string::npos);
CHECK(str.find("test_summary{quantile=\"") != std::string::npos);
CHECK(result[0] < 0);
CHECK(result[1] < 0);
CHECK(result[1] == 0);
CHECK(result[result.size() - 1] > result[result.size() - 2]);

#ifdef CINATRA_ENABLE_METRIC_JSON
std::string str_json;
summary.serialize_to_json(str_json);
std::cout << str_json << "\n";
std::cout << str_json.size() << std::endl;
CHECK(str_json.size() == 233);
CHECK(str_json.size() == 222);
#endif
}

Expand Down Expand Up @@ -994,7 +994,7 @@ TEST_CASE("test summary with many quantities") {
summary.serialize_to_json(str_json);
std::cout << str_json << "\n";
std::cout << str_json.size() << std::endl;
CHECK(str_json.size() == 8868);
CHECK(str_json.size() == 8857);
#endif
}

Expand Down Expand Up @@ -2023,6 +2023,24 @@ TEST_CASE("test remove label value") {
CHECK(!counter.has_label_value(std::vector<std::string>{}));
}

TEST_CASE("test static summary with 0 and 1 quantiles") {
{
ylt::metric::summary_t s("test", "help", {0, 1});
for (uint64_t i = 0; i < 100ull; ++i) {
s.observe(1);
}
auto result = s.get_rates();
CHECK(result[0] == 1);
CHECK(result[1] == 1);
}
{
ylt::metric::summary_t s("test", "help", {0, 1});
auto result = s.get_rates();
CHECK(result[0] == 0);
CHECK(result[1] == 0);
}
}

DOCTEST_MSVC_SUPPRESS_WARNING_WITH_PUSH(4007)
int main(int argc, char** argv) { return doctest::Context(argc, argv).run(); }
DOCTEST_MSVC_SUPPRESS_WARNING_POP
Loading