diff --git a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h index b23443595b..79731a80bc 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h @@ -44,6 +44,7 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora opentelemetry::common::SystemTimestamp /* observation_time */) noexcept { // process the read measurements - aggregate and store in hashmap + std::lock_guard guard(hashmap_lock_); for (auto &measurement : measurements) { auto aggr = DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_); @@ -96,10 +97,16 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora nostd::function_ref metric_collection_callback) noexcept override { - auto status = temporal_metric_storage_.buildMetrics(collector, collectors, sdk_start_ts, - collection_ts, std::move(delta_hash_map_), - metric_collection_callback); - delta_hash_map_.reset(new AttributesHashMap()); + std::shared_ptr delta_metrics = nullptr; + { + std::lock_guard guard(hashmap_lock_); + delta_metrics = std::move(delta_hash_map_); + delta_hash_map_.reset(new AttributesHashMap); + } + + auto status = + temporal_metric_storage_.buildMetrics(collector, collectors, sdk_start_ts, collection_ts, + delta_metrics, metric_collection_callback); return status; } @@ -110,6 +117,7 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora void *state_; std::unique_ptr cumulative_hash_map_; std::unique_ptr delta_hash_map_; + opentelemetry::common::SpinLockMutex hashmap_lock_; TemporalMetricStorage temporal_metric_storage_; }; diff --git a/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h b/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h index be86c96826..2f93b32f66 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h @@ -3,7 +3,6 @@ #pragma once #ifndef ENABLE_METRICS_PREVIEW -# include "opentelemetry/common/spin_lock_mutex.h" # include "opentelemetry/nostd/function_ref.h" # include "opentelemetry/sdk/common/attribute_utils.h" # include "opentelemetry/sdk/common/attributemap_hash.h" @@ -13,7 +12,6 @@ # include # include -# include # include OPENTELEMETRY_BEGIN_NAMESPACE @@ -37,7 +35,6 @@ class AttributesHashMap public: Aggregation *Get(const MetricAttributes &attributes) const { - std::lock_guard guard(lock_); auto it = hash_map_.find(attributes); if (it != hash_map_.end()) { @@ -52,7 +49,6 @@ class AttributesHashMap */ bool Has(const MetricAttributes &attributes) const { - std::lock_guard guard(lock_); return (hash_map_.find(attributes) == hash_map_.end()) ? false : true; } @@ -64,8 +60,6 @@ class AttributesHashMap Aggregation *GetOrSetDefault(const MetricAttributes &attributes, std::function()> aggregation_callback) { - std::lock_guard guard(lock_); - auto it = hash_map_.find(attributes); if (it != hash_map_.end()) { @@ -81,7 +75,6 @@ class AttributesHashMap */ void Set(const MetricAttributes &attributes, std::unique_ptr value) { - std::lock_guard guard(lock_); hash_map_[attributes] = std::move(value); } @@ -91,7 +84,6 @@ class AttributesHashMap bool GetAllEnteries( nostd::function_ref callback) const { - std::lock_guard guard(lock_); for (auto &kv : hash_map_) { if (!callback(kv.first, *(kv.second.get()))) @@ -105,17 +97,11 @@ class AttributesHashMap /** * Return the size of hash. */ - size_t Size() - { - std::lock_guard guard(lock_); - return hash_map_.size(); - } + size_t Size() { return hash_map_.size(); } private: std::unordered_map, AttributeHashGenerator> hash_map_; - - mutable opentelemetry::common::SpinLockMutex lock_; }; } // namespace metrics diff --git a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h index a9b0604f05..16bac34079 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h @@ -52,6 +52,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage return; } exemplar_reservoir_->OfferMeasurement(value, {}, context, std::chrono::system_clock::now()); + std::lock_guard guard(attribute_hashmap_lock_); attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value); } @@ -67,6 +68,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage exemplar_reservoir_->OfferMeasurement(value, attributes, context, std::chrono::system_clock::now()); auto attr = attributes_processor_->process(attributes); + std::lock_guard guard(attribute_hashmap_lock_); attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value); } @@ -77,6 +79,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage return; } exemplar_reservoir_->OfferMeasurement(value, {}, context, std::chrono::system_clock::now()); + std::lock_guard guard(attribute_hashmap_lock_); attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value); } @@ -93,6 +96,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage exemplar_reservoir_->OfferMeasurement(value, attributes, context, std::chrono::system_clock::now()); auto attr = attributes_processor_->process(attributes); + std::lock_guard guard(attribute_hashmap_lock_); attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value); } @@ -117,6 +121,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage std::function()> create_default_aggregation_; nostd::shared_ptr exemplar_reservoir_; TemporalMetricStorage temporal_metric_storage_; + opentelemetry::common::SpinLockMutex attribute_hashmap_lock_; }; } // namespace metrics diff --git a/sdk/src/metrics/state/sync_metric_storage.cc b/sdk/src/metrics/state/sync_metric_storage.cc index eec68ed6d7..be753ca98f 100644 --- a/sdk/src/metrics/state/sync_metric_storage.cc +++ b/sdk/src/metrics/state/sync_metric_storage.cc @@ -20,8 +20,12 @@ bool SyncMetricStorage::Collect(CollectorHandle *collector, // Add the current delta metrics to `unreported metrics stash` for all the collectors, // this will also empty the delta metrics hashmap, and make it available for // recordings - std::shared_ptr delta_metrics = std::move(attributes_hashmap_); - attributes_hashmap_.reset(new AttributesHashMap); + std::shared_ptr delta_metrics = nullptr; + { + std::lock_guard guard(attribute_hashmap_lock_); + delta_metrics = std::move(attributes_hashmap_); + attributes_hashmap_.reset(new AttributesHashMap); + } return temporal_metric_storage_.buildMetrics(collector, collectors, sdk_start_ts, collection_ts, std::move(delta_metrics), callback); diff --git a/sdk/test/metrics/attributes_hashmap_benchmark.cc b/sdk/test/metrics/attributes_hashmap_benchmark.cc index 2ebf03175a..92035809e0 100644 --- a/sdk/test/metrics/attributes_hashmap_benchmark.cc +++ b/sdk/test/metrics/attributes_hashmap_benchmark.cc @@ -25,14 +25,17 @@ void BM_AttributseHashMap(benchmark::State &state) std::vector attributes = {{{"k1", "v1"}, {"k2", "v2"}}, {{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}}}; - auto work = [&attributes, &hash_map](const size_t i) { + std::mutex m; + + auto work = [&attributes, &hash_map, &m](const size_t i) { std::function()> create_default_aggregation = []() -> std::unique_ptr { return std::unique_ptr(new DropAggregation); }; - + m.lock(); hash_map.GetOrSetDefault(attributes[i % 2], create_default_aggregation)->Aggregate(1l); benchmark::DoNotOptimize(hash_map.Has(attributes[i % 2])); + m.unlock(); }; while (state.KeepRunning()) {