diff --git a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h index c16f33ede2..278f7dbe3f 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h @@ -5,16 +5,14 @@ #ifndef ENABLE_METRICS_PREVIEW # include "opentelemetry/common/key_value_iterable_view.h" # include "opentelemetry/sdk/common/attributemap_hash.h" -# include "opentelemetry/sdk/instrumentationlibrary/instrumentation_library.h" # include "opentelemetry/sdk/metrics/aggregation/default_aggregation.h" # include "opentelemetry/sdk/metrics/exemplar/reservoir.h" # include "opentelemetry/sdk/metrics/state/attributes_hashmap.h" # include "opentelemetry/sdk/metrics/state/metric_collector.h" # include "opentelemetry/sdk/metrics/state/metric_storage.h" +# include "opentelemetry/sdk/metrics/state/temporal_metric_storage.h" # include "opentelemetry/sdk/metrics/view/attributes_processor.h" -# include "opentelemetry/sdk/metrics/view/view.h" -# include "opentelemetry/sdk/resource/resource.h" # include # include @@ -24,13 +22,6 @@ namespace sdk { namespace metrics { - -struct LastReportedMetrics -{ - std::unique_ptr attributes_map; - opentelemetry::common::SystemTimestamp collection_ts; -}; - class SyncMetricStorage : public MetricStorage, public WritableMetricStorage { @@ -43,7 +34,9 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage aggregation_type_{aggregation_type}, attributes_hashmap_(new AttributesHashMap()), attributes_processor_{attributes_processor}, - exemplar_reservoir_(exemplar_reservoir) + exemplar_reservoir_(exemplar_reservoir), + temporal_metric_storage_(instrument_descriptor) + { create_default_aggregation_ = [&]() -> std::unique_ptr { return std::move( @@ -114,14 +107,10 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage // hashmap to maintain the metrics for delta collection (i.e, collection since last Collect call) std::unique_ptr attributes_hashmap_; - // unreported metrics stash for all the collectors - std::unordered_map>> - unreported_metrics_; - // last reported metrics stash for all the collectors. - std::unordered_map last_reported_metrics_; const AttributesProcessor *attributes_processor_; std::function()> create_default_aggregation_; nostd::shared_ptr exemplar_reservoir_; + TemporalMetricStorage temporal_metric_storage_; }; } // namespace metrics diff --git a/sdk/src/metrics/state/sync_metric_storage.cc b/sdk/src/metrics/state/sync_metric_storage.cc index f42de82b4b..8c79f9d7ca 100644 --- a/sdk/src/metrics/state/sync_metric_storage.cc +++ b/sdk/src/metrics/state/sync_metric_storage.cc @@ -25,104 +25,9 @@ bool SyncMetricStorage::Collect(CollectorHandle *collector, // recordings std::shared_ptr delta_metrics = std::move(attributes_hashmap_); attributes_hashmap_.reset(new AttributesHashMap); - for (auto &col : collectors) - { - unreported_metrics_[col.get()].push_back(delta_metrics); - } - // Get the unreported metrics for the `collector` from `unreported metrics stash` - // since last collection, this will also cleanup the unreported metrics for `collector` - // from the stash. - auto present = unreported_metrics_.find(collector); - if (present == unreported_metrics_.end()) - { - // no unreported metrics for the collector, return. - return true; - } - auto unreported_list = std::move(present->second); - - // Iterate over the unreporter metrics for `collector` and store result in `merged_metrics` - std::unique_ptr merged_metrics(new AttributesHashMap); - for (auto &agg_hashmap : unreported_list) - { - agg_hashmap->GetAllEnteries([&merged_metrics, this](const MetricAttributes &attributes, - Aggregation &aggregation) { - auto agg = merged_metrics->Get(attributes); - if (agg) - { - merged_metrics->Set(attributes, std::move(agg->Merge(aggregation))); - } - else - { - merged_metrics->Set( - attributes, - std::move( - DefaultAggregation::CreateAggregation(instrument_descriptor_)->Merge(aggregation))); - merged_metrics->GetAllEnteries( - [](const MetricAttributes &attr, Aggregation &aggr) { return true; }); - } - return true; - }); - } - // Get the last reported metrics for the `collector` from `last reported metrics` stash - // - If the aggregation_temporarily for the collector is cumulative - // - Merge the last reported metrics with unreported metrics (which is in merged_metrics), - // Final result of merge would be in merged_metrics. - // - Move the final merge to the `last reported metrics` stash. - // - If the aggregation_temporarily is delta - // - Store the unreported metrics for `collector` (which is in merged_mtrics) to - // `last reported metrics` stash. - - auto reported = last_reported_metrics_.find(collector); - if (reported != last_reported_metrics_.end()) - { - last_collection_ts = last_reported_metrics_[collector].collection_ts; - auto last_aggr_hashmap = std::move(last_reported_metrics_[collector].attributes_map); - if (aggregation_temporarily == AggregationTemporality::kCumulative) - { - // merge current delta to previous cumulative - last_aggr_hashmap->GetAllEnteries( - [&merged_metrics, this](const MetricAttributes &attributes, Aggregation &aggregation) { - auto agg = merged_metrics->Get(attributes); - if (agg) - { - merged_metrics->Set(attributes, agg->Merge(aggregation)); - } - else - { - merged_metrics->Set(attributes, - DefaultAggregation::CreateAggregation(instrument_descriptor_)); - } - return true; - }); - } - last_reported_metrics_[collector] = - LastReportedMetrics{std::move(merged_metrics), collection_ts}; - } - else - { - merged_metrics->GetAllEnteries( - [](const MetricAttributes &attr, Aggregation &aggr) { return true; }); - last_reported_metrics_.insert( - std::make_pair(collector, LastReportedMetrics{std::move(merged_metrics), collection_ts})); - } - - // Generate the MetricData from the final merged_metrics, and invoke callback over it. - - AttributesHashMap *result_to_export = (last_reported_metrics_[collector]).attributes_map.get(); - MetricData metric_data; - metric_data.instrument_descriptor = instrument_descriptor_; - metric_data.start_ts = last_collection_ts; - metric_data.end_ts = collection_ts; - result_to_export->GetAllEnteries( - [&metric_data](const MetricAttributes &attributes, Aggregation &aggregation) { - PointDataAttributes point_data_attr; - point_data_attr.point_data = aggregation.ToPoint(); - point_data_attr.attributes = attributes; - metric_data.point_data_attr_.push_back(point_data_attr); - return true; - }); - return callback(metric_data); + return temporal_metric_storage_.buildMetrics(collector, collectors, sdk_start_ts, collection_ts, + std::move(delta_metrics), callback); } } // namespace metrics