Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix summary #625

Merged
merged 5 commits into from
Aug 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 36 additions & 34 deletions include/cinatra/ylt/metric/summary.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ REFLECTION(json_summary_t, name, help, type, metrics);
#endif

struct block_t {
std::atomic<bool> is_coro_started_ = false;
std::atomic<bool> stop_ = false;
ylt::detail::moodycamel::ConcurrentQueue<double> sample_queue_;
std::shared_ptr<TimeWindowQuantiles> quantile_values_;
Expand Down Expand Up @@ -75,7 +76,7 @@ class summary_t : public static_metric {
block_->sample_queue_.enqueue(value);

bool expected = false;
if (is_coro_started_.compare_exchange_strong(expected, true)) {
if (block_->is_coro_started_.compare_exchange_strong(expected, true)) {
start(block_).via(excutor_->get_executor()).start([](auto &&) {
});
}
Expand Down Expand Up @@ -220,13 +221,13 @@ class summary_t : public static_metric {
}

if (block->sample_queue_.size_approx() == 0) {
is_coro_started_ = false;
block_->is_coro_started_ = false;
if (block->sample_queue_.size_approx() == 0) {
break;
}

bool expected = false;
if (!is_coro_started_.compare_exchange_strong(expected, true)) {
if (!block_->is_coro_started_.compare_exchange_strong(expected, true)) {
break;
}

Expand All @@ -243,7 +244,6 @@ class summary_t : public static_metric {
std::shared_ptr<block_t> block_;
static inline std::shared_ptr<coro_io::io_context_pool> excutor_ =
coro_io::create_io_context_pool(1);
std::atomic<bool> is_coro_started_ = false;
bool has_observe_ = false;
};

Expand All @@ -260,6 +260,10 @@ struct sum_and_count_t {

template <uint8_t N>
struct labels_block_t {
summary_t::Quantiles quantiles_; // readonly
std::chrono::milliseconds max_age_;
uint16_t age_buckets_;
std::atomic<bool> is_coro_started_ = false;
std::atomic<bool> stop_ = false;
ylt::detail::moodycamel::ConcurrentQueue<summary_label_sample<N>>
sample_queue_;
Expand All @@ -280,12 +284,16 @@ class basic_dynamic_summary : public dynamic_metric {
std::array<std::string, N> labels_name,
std::chrono::milliseconds max_age = std::chrono::seconds{60},
uint16_t age_buckets = 5)
: quantiles_{std::move(quantiles)},
dynamic_metric(MetricType::Summary, std::move(name), std::move(help),
std::move(labels_name)),
max_age_(max_age),
age_buckets_(age_buckets) {
init_block(labels_block_);
: dynamic_metric(MetricType::Summary, std::move(name), std::move(help),
std::move(labels_name)) {
labels_block_ = std::make_shared<labels_block_t<N>>();
labels_block_->quantiles_ = std::move(quantiles);
labels_block_->max_age_ = max_age;
labels_block_->age_buckets_ = age_buckets;

start(labels_block_).via(excutor_->get_executor()).start([](auto &&) {
});

g_user_metric_count++;
}

Expand All @@ -307,7 +315,8 @@ class basic_dynamic_summary : public dynamic_metric {
labels_block_->sample_queue_.enqueue({std::move(labels_value), value});

bool expected = false;
if (is_coro_started_.compare_exchange_strong(expected, true)) {
if (labels_block_->is_coro_started_.compare_exchange_strong(expected,
true)) {
start(labels_block_).via(excutor_->get_executor()).start([](auto &&) {
});
}
Expand All @@ -327,7 +336,7 @@ class basic_dynamic_summary : public dynamic_metric {
const std::array<std::string, N> &labels_value, double &sum,
uint64_t &count) {
std::vector<double> vec;
if (quantiles_.empty()) {
if (labels_block_->quantiles_.empty()) {
co_return std::vector<double>{};
}

Expand All @@ -339,7 +348,7 @@ class basic_dynamic_summary : public dynamic_metric {
}
sum = labels_block_->sum_and_count_[labels_value].sum;
count = labels_block_->sum_and_count_[labels_value].count;
for (const auto &quantile : quantiles_) {
for (const auto &quantile : labels_block_->quantiles_) {
vec.push_back(it->second->get(quantile.quantile));
}
},
Expand All @@ -359,13 +368,6 @@ class basic_dynamic_summary : public dynamic_metric {
}
#endif
private:
template <typename T>
void init_block(std::shared_ptr<T> &block) {
block = std::make_shared<T>();
start(block).via(excutor_->get_executor()).start([](auto &&) {
});
}

async_simple::coro::Lazy<void> start(
std::shared_ptr<labels_block_t<N>> label_block) {
summary_label_sample<N> sample;
Expand All @@ -376,8 +378,9 @@ class basic_dynamic_summary : public dynamic_metric {
auto &ptr = label_block->label_quantile_values_[sample.labels_value];

if (ptr == nullptr) {
ptr = std::make_shared<TimeWindowQuantiles>(quantiles_, max_age_,
age_buckets_);
ptr = std::make_shared<TimeWindowQuantiles>(
label_block->quantiles_, label_block->max_age_,
label_block->age_buckets_);
}

ptr->insert(sample.value);
Expand All @@ -393,13 +396,14 @@ class basic_dynamic_summary : public dynamic_metric {
co_await async_simple::coro::Yield{};

if (label_block->sample_queue_.size_approx() == 0) {
is_coro_started_ = false;
label_block->is_coro_started_ = false;
if (label_block->sample_queue_.size_approx() == 0) {
break;
}

bool expected = false;
if (!is_coro_started_.compare_exchange_strong(expected, true)) {
if (!label_block->is_coro_started_.compare_exchange_strong(expected,
true)) {
break;
}

Expand All @@ -412,7 +416,7 @@ class basic_dynamic_summary : public dynamic_metric {
}

async_simple::coro::Lazy<void> serialize_async_with_label(std::string &str) {
if (quantiles_.empty()) {
if (labels_block_->quantiles_.empty()) {
co_return;
}

Expand All @@ -432,13 +436,14 @@ class basic_dynamic_summary : public dynamic_metric {
double sum = 0;
uint64_t count = 0;
auto rates = co_await get_rates(labels_value, sum, count);
for (size_t i = 0; i < quantiles_.size(); i++) {
for (size_t i = 0; i < labels_block_->quantiles_.size(); i++) {
str.append(name_);
str.append("{");
build_label_string(str, labels_name_, labels_value);
str.append(",");
str.append("quantile=\"");
str.append(std::to_string(quantiles_[i].quantile)).append("\"} ");
str.append(std::to_string(labels_block_->quantiles_[i].quantile))
.append("\"} ");
str.append(std::to_string(rates[i])).append("\n");
}

Expand All @@ -459,7 +464,7 @@ class basic_dynamic_summary : public dynamic_metric {
#ifdef CINATRA_ENABLE_METRIC_JSON
async_simple::coro::Lazy<void> serialize_to_json_with_label_async(
std::string &str) {
if (quantiles_.empty()) {
if (labels_block_->quantiles_.empty()) {
co_return;
}

Expand All @@ -482,11 +487,12 @@ class basic_dynamic_summary : public dynamic_metric {
auto rates = co_await get_rates(labels_value, sum, count);
metric.count = count;
metric.sum = sum;
for (size_t i = 0; i < quantiles_.size(); i++) {
for (size_t i = 0; i < labels_block_->quantiles_.size(); i++) {
for (size_t i = 0; i < labels_value.size(); i++) {
metric.labels[labels_name_[i]] = labels_value[i];
}
metric.quantiles.emplace(quantiles_[i].quantile, rates[i]);
metric.quantiles.emplace(labels_block_->quantiles_[i].quantile,
rates[i]);
}

summary.metrics.push_back(std::move(metric));
Expand All @@ -495,13 +501,9 @@ class basic_dynamic_summary : public dynamic_metric {
}
#endif

Quantiles quantiles_; // readonly
std::shared_ptr<labels_block_t<N>> labels_block_;
static inline std::shared_ptr<coro_io::io_context_pool> excutor_ =
coro_io::create_io_context_pool(1);
std::chrono::milliseconds max_age_;
uint16_t age_buckets_;
std::atomic<bool> is_coro_started_ = false;
bool has_observe_ = false;
};

Expand Down
Loading