From ea9d10c23f1af29ea51679715a9c26a5d49db362 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 25 Oct 2022 17:40:09 -0700 Subject: [PATCH] Fix:1674, Add Monotonic Property to Sum Aggregation, and unit tests for Up Down Counter (#1675) --- .../exporters/prometheus/exporter_utils.h | 3 +- exporters/prometheus/src/exporter_utils.cc | 33 +- .../metrics/aggregation/default_aggregation.h | 22 +- .../sdk/metrics/aggregation/sum_aggregation.h | 4 +- .../sdk/metrics/data/point_data.h | 3 +- .../metrics/aggregation/sum_aggregation.cc | 37 +- sdk/test/metrics/BUILD | 16 + sdk/test/metrics/CMakeLists.txt | 1 + sdk/test/metrics/aggregation_test.cc | 4 +- .../sync_metric_storage_counter_test.cc | 5 +- ...ync_metric_storage_up_down_counter_test.cc | 330 ++++++++++++++++++ 11 files changed, 429 insertions(+), 29 deletions(-) create mode 100644 sdk/test/metrics/sync_metric_storage_up_down_counter_test.cc diff --git a/exporters/prometheus/include/opentelemetry/exporters/prometheus/exporter_utils.h b/exporters/prometheus/include/opentelemetry/exporters/prometheus/exporter_utils.h index 09486a438e..9400cff40f 100644 --- a/exporters/prometheus/include/opentelemetry/exporters/prometheus/exporter_utils.h +++ b/exporters/prometheus/include/opentelemetry/exporters/prometheus/exporter_utils.h @@ -49,7 +49,8 @@ class PrometheusExporterUtils /** * Translate the OTel metric type to Prometheus metric type */ - static ::prometheus::MetricType TranslateType(opentelemetry::sdk::metrics::AggregationType kind); + static ::prometheus::MetricType TranslateType(opentelemetry::sdk::metrics::AggregationType kind, + bool is_monotonic = true); /** * Set metric data for: diff --git a/exporters/prometheus/src/exporter_utils.cc b/exporters/prometheus/src/exporter_utils.cc index e705cd9f17..fc2b102f24 100644 --- a/exporters/prometheus/src/exporter_utils.cc +++ b/exporters/prometheus/src/exporter_utils.cc @@ -54,8 +54,14 @@ std::vector PrometheusExporterUtils::TranslateT auto time = metric_data.start_ts.time_since_epoch(); for (const auto &point_data_attr : metric_data.point_data_attr_) { - auto kind = getAggregationType(point_data_attr.point_data); - const prometheus_client::MetricType type = TranslateType(kind); + auto kind = getAggregationType(point_data_attr.point_data); + bool is_monotonic = true; + if (kind == sdk::metrics::AggregationType::kSum) + { + is_monotonic = + nostd::get(point_data_attr.point_data).is_monotonic_; + } + const prometheus_client::MetricType type = TranslateType(kind, is_monotonic); metric_family.type = type; if (type == prometheus_client::MetricType::Histogram) // Histogram { @@ -85,6 +91,14 @@ std::vector PrometheusExporterUtils::TranslateT std::vector values{last_value_point_data.value_}; SetData(values, point_data_attr.attributes, type, time, &metric_family); } + else if (nostd::holds_alternative( + point_data_attr.point_data)) + { + auto sum_point_data = + nostd::get(point_data_attr.point_data); + std::vector values{sum_point_data.value_}; + SetData(values, point_data_attr.attributes, type, time, &metric_family); + } else { OTEL_INTERNAL_LOG_WARN( @@ -159,16 +173,27 @@ metric_sdk::AggregationType PrometheusExporterUtils::getAggregationType( * Translate the OTel metric type to Prometheus metric type */ prometheus_client::MetricType PrometheusExporterUtils::TranslateType( - metric_sdk::AggregationType kind) + metric_sdk::AggregationType kind, + bool is_monotonic) { switch (kind) { case metric_sdk::AggregationType::kSum: - return prometheus_client::MetricType::Counter; + if (!is_monotonic) + { + return prometheus_client::MetricType::Gauge; + } + else + { + return prometheus_client::MetricType::Counter; + } + break; case metric_sdk::AggregationType::kHistogram: return prometheus_client::MetricType::Histogram; + break; case metric_sdk::AggregationType::kLastValue: return prometheus_client::MetricType::Gauge; + break; default: return prometheus_client::MetricType::Untyped; } diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h index 73eff4969b..001e54a2af 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h @@ -32,12 +32,15 @@ class DefaultAggregation switch (instrument_descriptor.type_) { case InstrumentType::kCounter: - case InstrumentType::kUpDownCounter: case InstrumentType::kObservableCounter: + return (instrument_descriptor.value_type_ == InstrumentValueType::kLong) + ? std::move(std::unique_ptr(new LongSumAggregation(true))) + : std::move(std::unique_ptr(new DoubleSumAggregation(true))); + case InstrumentType::kUpDownCounter: case InstrumentType::kObservableUpDownCounter: return (instrument_descriptor.value_type_ == InstrumentValueType::kLong) - ? std::move(std::unique_ptr(new LongSumAggregation())) - : std::move(std::unique_ptr(new DoubleSumAggregation())); + ? std::move(std::unique_ptr(new LongSumAggregation(false))) + : std::move(std::unique_ptr(new DoubleSumAggregation(false))); break; case InstrumentType::kHistogram: { if (instrument_descriptor.value_type_ == InstrumentValueType::kLong) @@ -91,16 +94,23 @@ class DefaultAggregation return std::unique_ptr(new DoubleLastValueAggregation()); } break; - case AggregationType::kSum: + case AggregationType::kSum: { + bool is_monotonic = true; + if (instrument_descriptor.type_ == InstrumentType::kUpDownCounter || + instrument_descriptor.type_ == InstrumentType::kObservableUpDownCounter) + { + is_monotonic = false; + } if (instrument_descriptor.value_type_ == InstrumentValueType::kLong) { - return std::unique_ptr(new LongSumAggregation()); + return std::unique_ptr(new LongSumAggregation(is_monotonic)); } else { - return std::unique_ptr(new DoubleSumAggregation()); + return std::unique_ptr(new DoubleSumAggregation(is_monotonic)); } break; + } default: return DefaultAggregation::CreateAggregation(instrument_descriptor, aggregation_config); } diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/sum_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/sum_aggregation.h index 8f156e1cc1..bb0d637b60 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/sum_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/sum_aggregation.h @@ -17,7 +17,7 @@ namespace metrics class LongSumAggregation : public Aggregation { public: - LongSumAggregation(); + LongSumAggregation(bool is_monotonic); LongSumAggregation(SumPointData &&); LongSumAggregation(const SumPointData &); @@ -39,7 +39,7 @@ class LongSumAggregation : public Aggregation class DoubleSumAggregation : public Aggregation { public: - DoubleSumAggregation(); + DoubleSumAggregation(bool is_monotonic); DoubleSumAggregation(SumPointData &&); DoubleSumAggregation(const SumPointData &); diff --git a/sdk/include/opentelemetry/sdk/metrics/data/point_data.h b/sdk/include/opentelemetry/sdk/metrics/data/point_data.h index 5ee69ffba8..81878ff757 100644 --- a/sdk/include/opentelemetry/sdk/metrics/data/point_data.h +++ b/sdk/include/opentelemetry/sdk/metrics/data/point_data.h @@ -29,7 +29,8 @@ class SumPointData SumPointData &operator=(SumPointData &&) = default; SumPointData() = default; - ValueType value_ = {}; + ValueType value_ = {}; + bool is_monotonic_ = true; }; class LastValuePointData diff --git a/sdk/src/metrics/aggregation/sum_aggregation.cc b/sdk/src/metrics/aggregation/sum_aggregation.cc index 1c2772f540..eab53a49c2 100644 --- a/sdk/src/metrics/aggregation/sum_aggregation.cc +++ b/sdk/src/metrics/aggregation/sum_aggregation.cc @@ -3,6 +3,7 @@ #ifndef ENABLE_METRICS_PREVIEW # include "opentelemetry/sdk/metrics/aggregation/sum_aggregation.h" +# include "opentelemetry/sdk/common/global_log_handler.h" # include "opentelemetry/sdk/metrics/data/point_data.h" # include "opentelemetry/version.h" @@ -15,9 +16,10 @@ namespace sdk namespace metrics { -LongSumAggregation::LongSumAggregation() +LongSumAggregation::LongSumAggregation(bool is_monotonic) { - point_data_.value_ = (int64_t)0; + point_data_.value_ = (int64_t)0; + point_data_.is_monotonic_ = is_monotonic; } LongSumAggregation::LongSumAggregation(SumPointData &&data) : point_data_{std::move(data)} {} @@ -26,6 +28,14 @@ LongSumAggregation::LongSumAggregation(const SumPointData &data) : point_data_{d void LongSumAggregation::Aggregate(int64_t value, const PointAttributes & /* attributes */) noexcept { + if (point_data_.is_monotonic_ && value < 0) + { + OTEL_INTERNAL_LOG_WARN( + " LongSumAggregation::Aggregate Negative value ignored for Monotonic increasing " + "measurement. Value" + << value); + return; + } const std::lock_guard locked(lock_); point_data_.value_ = nostd::get(point_data_.value_) + value; } @@ -37,20 +47,19 @@ std::unique_ptr LongSumAggregation::Merge(const Aggregation &delta) nostd::get((static_cast(delta).ToPoint())) .value_) + nostd::get(nostd::get(ToPoint()).value_); - std::unique_ptr aggr(new LongSumAggregation()); + std::unique_ptr aggr(new LongSumAggregation(point_data_.is_monotonic_)); static_cast(aggr.get())->point_data_.value_ = merge_value; return aggr; } std::unique_ptr LongSumAggregation::Diff(const Aggregation &next) const noexcept { - int64_t diff_value = nostd::get( nostd::get((static_cast(next).ToPoint())) .value_) - nostd::get(nostd::get(ToPoint()).value_); - std::unique_ptr aggr(new LongSumAggregation()); + std::unique_ptr aggr(new LongSumAggregation(point_data_.is_monotonic_)); static_cast(aggr.get())->point_data_.value_ = diff_value; return aggr; } @@ -61,9 +70,10 @@ PointType LongSumAggregation::ToPoint() const noexcept return point_data_; } -DoubleSumAggregation::DoubleSumAggregation() +DoubleSumAggregation::DoubleSumAggregation(bool is_monotonic) { - point_data_.value_ = 0.0; + point_data_.value_ = 0.0; + point_data_.is_monotonic_ = is_monotonic; } DoubleSumAggregation::DoubleSumAggregation(SumPointData &&data) : point_data_(std::move(data)) {} @@ -73,6 +83,14 @@ DoubleSumAggregation::DoubleSumAggregation(const SumPointData &data) : point_dat void DoubleSumAggregation::Aggregate(double value, const PointAttributes & /* attributes */) noexcept { + if (point_data_.is_monotonic_ && value < 0) + { + OTEL_INTERNAL_LOG_WARN( + " DoubleSumAggregation::Aggregate Negative value ignored for Monotonic increasing " + "measurement. Value" + << value); + return; + } const std::lock_guard locked(lock_); point_data_.value_ = nostd::get(point_data_.value_) + value; } @@ -84,20 +102,19 @@ std::unique_ptr DoubleSumAggregation::Merge(const Aggregation &delt nostd::get((static_cast(delta).ToPoint())) .value_) + nostd::get(nostd::get(ToPoint()).value_); - std::unique_ptr aggr(new DoubleSumAggregation()); + std::unique_ptr aggr(new DoubleSumAggregation(point_data_.is_monotonic_)); static_cast(aggr.get())->point_data_.value_ = merge_value; return aggr; } std::unique_ptr DoubleSumAggregation::Diff(const Aggregation &next) const noexcept { - double diff_value = nostd::get( nostd::get((static_cast(next).ToPoint())) .value_) - nostd::get(nostd::get(ToPoint()).value_); - std::unique_ptr aggr(new DoubleSumAggregation()); + std::unique_ptr aggr(new DoubleSumAggregation(point_data_.is_monotonic_)); static_cast(aggr.get())->point_data_.value_ = diff_value; return aggr; } diff --git a/sdk/test/metrics/BUILD b/sdk/test/metrics/BUILD index 5d3076b8be..333670dc6f 100644 --- a/sdk/test/metrics/BUILD +++ b/sdk/test/metrics/BUILD @@ -95,6 +95,22 @@ cc_test( ], ) +cc_test( + name = "sync_metric_storage_up_down_counter_test", + srcs = [ + "sync_metric_storage_up_down_counter_test.cc", + ], + tags = [ + "metrics", + "test", + ], + deps = [ + "//sdk/src/metrics", + "//sdk/src/resource", + "@com_google_googletest//:gtest_main", + ], +) + cc_test( name = "sync_metric_storage_histogram_test", srcs = [ diff --git a/sdk/test/metrics/CMakeLists.txt b/sdk/test/metrics/CMakeLists.txt index 75a3a0e559..2ff8b5f30a 100644 --- a/sdk/test/metrics/CMakeLists.txt +++ b/sdk/test/metrics/CMakeLists.txt @@ -8,6 +8,7 @@ foreach( attributes_hashmap_test sync_metric_storage_counter_test sync_metric_storage_histogram_test + sync_metric_storage_up_down_counter_test async_metric_storage_test multi_metric_storage_test observer_result_test diff --git a/sdk/test/metrics/aggregation_test.cc b/sdk/test/metrics/aggregation_test.cc index 33f17871aa..6e4121931c 100644 --- a/sdk/test/metrics/aggregation_test.cc +++ b/sdk/test/metrics/aggregation_test.cc @@ -14,7 +14,7 @@ using namespace opentelemetry::sdk::metrics; namespace nostd = opentelemetry::nostd; TEST(Aggregation, LongSumAggregation) { - LongSumAggregation aggr; + LongSumAggregation aggr(true); auto data = aggr.ToPoint(); ASSERT_TRUE(nostd::holds_alternative(data)); auto sum_data = nostd::get(data); @@ -28,7 +28,7 @@ TEST(Aggregation, LongSumAggregation) TEST(Aggregation, DoubleSumAggregation) { - DoubleSumAggregation aggr; + DoubleSumAggregation aggr(true); auto data = aggr.ToPoint(); ASSERT_TRUE(nostd::holds_alternative(data)); auto sum_data = nostd::get(data); diff --git a/sdk/test/metrics/sync_metric_storage_counter_test.cc b/sdk/test/metrics/sync_metric_storage_counter_test.cc index f182d36f4f..7118e66529 100644 --- a/sdk/test/metrics/sync_metric_storage_counter_test.cc +++ b/sdk/test/metrics/sync_metric_storage_counter_test.cc @@ -38,7 +38,7 @@ class MockCollectorHandle : public CollectorHandle class WritableMetricStorageTestFixture : public ::testing::TestWithParam {}; -TEST_P(WritableMetricStorageTestFixture, LongSumAggregation) +TEST_P(WritableMetricStorageTestFixture, LongCounterSumAggregation) { AggregationTemporality temporality = GetParam(); auto sdk_start_ts = std::chrono::system_clock::now(); @@ -106,7 +106,6 @@ TEST_P(WritableMetricStorageTestFixture, LongSumAggregation) expected_total_get_requests = 0; expected_total_put_requests = 0; } - // collect one more time. collection_ts = std::chrono::system_clock::now(); count_attributes = 0; @@ -172,7 +171,7 @@ INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestLong, ::testing::Values(AggregationTemporality::kCumulative, AggregationTemporality::kDelta)); -TEST_P(WritableMetricStorageTestFixture, DoubleSumAggregation) +TEST_P(WritableMetricStorageTestFixture, DoubleCounterSumAggregation) { AggregationTemporality temporality = GetParam(); auto sdk_start_ts = std::chrono::system_clock::now(); diff --git a/sdk/test/metrics/sync_metric_storage_up_down_counter_test.cc b/sdk/test/metrics/sync_metric_storage_up_down_counter_test.cc new file mode 100644 index 0000000000..77841d7330 --- /dev/null +++ b/sdk/test/metrics/sync_metric_storage_up_down_counter_test.cc @@ -0,0 +1,330 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifndef ENABLE_METRICS_PREVIEW +# include "opentelemetry/common/key_value_iterable_view.h" +# include "opentelemetry/nostd/shared_ptr.h" +# include "opentelemetry/sdk/metrics/exemplar/histogram_exemplar_reservoir.h" +# include "opentelemetry/sdk/metrics/instruments.h" +# include "opentelemetry/sdk/metrics/state/sync_metric_storage.h" +# include "opentelemetry/sdk/metrics/view/attributes_processor.h" + +# include +# include +# include + +using namespace opentelemetry::sdk::metrics; +using namespace opentelemetry::common; +using M = std::map; +namespace nostd = opentelemetry::nostd; + +class MockCollectorHandle : public CollectorHandle +{ +public: + MockCollectorHandle(AggregationTemporality temp) : temporality(temp) {} + + ~MockCollectorHandle() override = default; + + AggregationTemporality GetAggregationTemporality( + InstrumentType /* instrument_type */) noexcept override + { + return temporality; + } + +private: + AggregationTemporality temporality; +}; + +class WritableMetricStorageTestFixture : public ::testing::TestWithParam +{}; + +TEST_P(WritableMetricStorageTestFixture, LongUpDownCounterSumAggregation) +{ + AggregationTemporality temporality = GetParam(); + auto sdk_start_ts = std::chrono::system_clock::now(); + int64_t expected_total_active_get_requests = 0; + int64_t expected_total_active_put_requests = 0; + InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kUpDownCounter, + InstrumentValueType::kLong}; + std::map attributes_get = {{"RequestType", "GET"}}; + std::map attributes_put = {{"RequestType", "PUT"}}; + + std::unique_ptr default_attributes_processor{ + new DefaultAttributesProcessor{}}; + opentelemetry::sdk::metrics::SyncMetricStorage storage( + instr_desc, AggregationType::kSum, default_attributes_processor.get(), + ExemplarReservoir::GetNoExemplarReservoir(), nullptr); + + int64_t val1 = 10, val2 = 30, val3 = -5, val4 = -10; + storage.RecordLong(val1, KeyValueIterableView>(attributes_get), + opentelemetry::context::Context{}); + expected_total_active_get_requests += val1; + + storage.RecordLong(val2, KeyValueIterableView>(attributes_put), + opentelemetry::context::Context{}); + expected_total_active_put_requests += val2; + + storage.RecordLong(val3, KeyValueIterableView>(attributes_get), + opentelemetry::context::Context{}); + expected_total_active_get_requests += val3; + + storage.RecordLong(val4, KeyValueIterableView>(attributes_put), + opentelemetry::context::Context{}); + expected_total_active_put_requests += val4; + + std::shared_ptr collector(new MockCollectorHandle(temporality)); + std::vector> collectors; + collectors.push_back(collector); + + // Some computation here + auto collection_ts = std::chrono::system_clock::now(); + size_t count_attributes = 0; + storage.Collect(collector.get(), collectors, sdk_start_ts, collection_ts, + [&](const MetricData data) { + for (auto data_attr : data.point_data_attr_) + { + auto sum_data = opentelemetry::nostd::get(data_attr.point_data); + if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "GET") + { + EXPECT_EQ(opentelemetry::nostd::get(sum_data.value_), + expected_total_active_get_requests); + count_attributes++; + } + else if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "PUT") + { + EXPECT_EQ(opentelemetry::nostd::get(sum_data.value_), + expected_total_active_put_requests); + count_attributes++; + } + } + return true; + }); + EXPECT_EQ(count_attributes, 2); // GET and PUT + // In case of delta temporarily, subsequent collection would contain new data points, so resetting + // the counts + if (temporality == AggregationTemporality::kDelta) + { + expected_total_active_get_requests = 0; + expected_total_active_put_requests = 0; + } + + // collect one more time. + collection_ts = std::chrono::system_clock::now(); + count_attributes = 0; + storage.Collect(collector.get(), collectors, sdk_start_ts, collection_ts, + [&](const MetricData data) { + for (auto data_attr : data.point_data_attr_) + { + auto sum_data = opentelemetry::nostd::get(data_attr.point_data); + if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "GET") + { + count_attributes++; + EXPECT_EQ(opentelemetry::nostd::get(sum_data.value_), + expected_total_active_get_requests); + } + else if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "PUT") + { + count_attributes++; + EXPECT_EQ(opentelemetry::nostd::get(sum_data.value_), + expected_total_active_put_requests); + } + } + return true; + }); + if (temporality == AggregationTemporality::kCumulative) + { + EXPECT_EQ(count_attributes, 2); // GET AND PUT + } + + val1 = 50; + val2 = 40; + storage.RecordLong(val1, KeyValueIterableView>(attributes_get), + opentelemetry::context::Context{}); + expected_total_active_get_requests += val1; + storage.RecordLong(val2, KeyValueIterableView>(attributes_put), + opentelemetry::context::Context{}); + expected_total_active_put_requests += val2; + + collection_ts = std::chrono::system_clock::now(); + count_attributes = 0; + storage.Collect(collector.get(), collectors, sdk_start_ts, collection_ts, + [&](const MetricData data) { + for (auto data_attr : data.point_data_attr_) + { + auto sum_data = opentelemetry::nostd::get(data_attr.point_data); + if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "GET") + { + EXPECT_EQ(opentelemetry::nostd::get(sum_data.value_), + expected_total_active_get_requests); + count_attributes++; + } + else if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "PUT") + { + EXPECT_EQ(opentelemetry::nostd::get(sum_data.value_), + expected_total_active_put_requests); + count_attributes++; + } + } + return true; + }); + EXPECT_EQ(count_attributes, 2); // GET and PUT +} + +INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestLong, + WritableMetricStorageTestFixture, + ::testing::Values(AggregationTemporality::kCumulative, + AggregationTemporality::kDelta)); + +TEST_P(WritableMetricStorageTestFixture, DoubleUpDownCounterSumAggregation) +{ + AggregationTemporality temporality = GetParam(); + auto sdk_start_ts = std::chrono::system_clock::now(); + double expected_total_active_get_requests = 0; + double expected_total_active_put_requests = 0; + InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kUpDownCounter, + InstrumentValueType::kDouble}; + std::map attributes_get = {{"RequestType", "GET"}}; + std::map attributes_put = {{"RequestType", "PUT"}}; + + std::unique_ptr default_attributes_processor{ + new DefaultAttributesProcessor{}}; + opentelemetry::sdk::metrics::SyncMetricStorage storage( + instr_desc, AggregationType::kSum, default_attributes_processor.get(), + ExemplarReservoir::GetNoExemplarReservoir(), nullptr); + + storage.RecordDouble(10.0, + KeyValueIterableView>(attributes_get), + opentelemetry::context::Context{}); + expected_total_active_get_requests += 10; + + storage.RecordDouble(30.0, + KeyValueIterableView>(attributes_put), + opentelemetry::context::Context{}); + expected_total_active_put_requests += 30; + + storage.RecordDouble(-5.0, + KeyValueIterableView>(attributes_get), + opentelemetry::context::Context{}); + expected_total_active_get_requests -= 5; + + storage.RecordDouble(-10.0, + KeyValueIterableView>(attributes_put), + opentelemetry::context::Context{}); + expected_total_active_put_requests -= 10; + + std::shared_ptr collector(new MockCollectorHandle(temporality)); + std::vector> collectors; + collectors.push_back(collector); + + // Some computation here + auto collection_ts = std::chrono::system_clock::now(); + size_t count_attributes = 0; + storage.Collect(collector.get(), collectors, sdk_start_ts, collection_ts, + [&](const MetricData data) { + for (auto data_attr : data.point_data_attr_) + { + auto sum_data = opentelemetry::nostd::get(data_attr.point_data); + if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "GET") + { + EXPECT_EQ(opentelemetry::nostd::get(sum_data.value_), + expected_total_active_get_requests); + count_attributes++; + } + else if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "PUT") + { + EXPECT_EQ(opentelemetry::nostd::get(sum_data.value_), + expected_total_active_put_requests); + count_attributes++; + } + } + return true; + }); + EXPECT_EQ(count_attributes, 2); // GET and PUT + + // In case of delta temporarily, subsequent collection would contain new data points, so resetting + // the counts + if (temporality == AggregationTemporality::kDelta) + { + expected_total_active_get_requests = 0; + expected_total_active_put_requests = 0; + } + + // collect one more time. + collection_ts = std::chrono::system_clock::now(); + count_attributes = 0; + storage.Collect(collector.get(), collectors, sdk_start_ts, collection_ts, + [&](const MetricData data) { + for (auto data_attr : data.point_data_attr_) + { + auto sum_data = opentelemetry::nostd::get(data_attr.point_data); + if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "GET") + { + count_attributes++; + EXPECT_EQ(opentelemetry::nostd::get(sum_data.value_), + expected_total_active_get_requests); + } + else if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "PUT") + { + count_attributes++; + EXPECT_EQ(opentelemetry::nostd::get(sum_data.value_), + expected_total_active_put_requests); + } + } + return true; + }); + if (temporality == AggregationTemporality::kCumulative) + { + EXPECT_EQ(count_attributes, 2); // GET AND PUT + } + + storage.RecordDouble(50.0, + KeyValueIterableView>(attributes_get), + opentelemetry::context::Context{}); + expected_total_active_get_requests += 50; + storage.RecordDouble(40.0, + KeyValueIterableView>(attributes_put), + opentelemetry::context::Context{}); + expected_total_active_put_requests += 40; + + collection_ts = std::chrono::system_clock::now(); + count_attributes = 0; + storage.Collect(collector.get(), collectors, sdk_start_ts, collection_ts, + [&](const MetricData data) { + for (auto data_attr : data.point_data_attr_) + { + auto sum_data = opentelemetry::nostd::get(data_attr.point_data); + if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "GET") + { + EXPECT_EQ(opentelemetry::nostd::get(sum_data.value_), + expected_total_active_get_requests); + count_attributes++; + } + else if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "PUT") + { + EXPECT_EQ(opentelemetry::nostd::get(sum_data.value_), + expected_total_active_put_requests); + count_attributes++; + } + } + return true; + }); + EXPECT_EQ(count_attributes, 2); // GET and PUT +} +INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestDouble, + WritableMetricStorageTestFixture, + ::testing::Values(AggregationTemporality::kCumulative, + AggregationTemporality::kDelta)); + +#endif