From 17a70e982aadeb702c12def819aeb643fe6bfb89 Mon Sep 17 00:00:00 2001 From: Lalit Date: Thu, 16 Mar 2023 09:26:23 -0700 Subject: [PATCH 01/10] fix --- .../export/periodic_exporting_metric_reader.h | 5 +++ .../periodic_exporting_metric_reader.cc | 41 ++++++++++++++++++- 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h index ecb5cc7e72..19a5c9988e 100644 --- a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h @@ -56,6 +56,11 @@ class PeriodicExportingMetricReader : public MetricReader std::chrono::milliseconds export_interval_millis_; std::chrono::milliseconds export_timeout_millis_; + std::mutex force_flush_m_; + std::atomic is_force_flush_pending_; + std::atomic is_force_wakeup_background_worker_; + + void DoBackgroundWork(); bool CollectAndExportOnce(); diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index bcca86bee0..e433464ff8 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -60,7 +60,18 @@ void PeriodicExportingMetricReader::DoBackgroundWork() auto end = std::chrono::steady_clock::now(); auto export_time_ms = std::chrono::duration_cast(end - start); auto remaining_wait_interval_ms = export_interval_millis_ - export_time_ms; - cv_.wait_for(lk, remaining_wait_interval_ms); + cv_.wait_for(lk, remaining_wait_interval_ms, [this]() { + if (is_force_wakeup_background_worker_.load(std::memory_order_acquire)) + { + return true; + } + if (IsShutdown()) + { + return true; + } + return false; + }); + is_force_wakeup_background_worker_.store(false, std::memory_order_release); } while (IsShutdown() != true); // One last Collect and Export before shutdown auto status = CollectAndExportOnce(); @@ -96,11 +107,39 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() break; } } while (status != std::future_status::ready); + bool notify_force_flush = + is_force_flush_pending_.exchange(false, std::memory_order_acq_rel); + if (notify_force_flush) + { + is_force_flush_notified_.store(true, std::memory_order_release); + force_flush_cv_.notify_one(); + } + return true; } bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept { + std::unique_lock lk_cv(force_flush_m_); + is_force_flush_pending_.store(true, std::memory_order_release); + auto break_condition = [this]() { + if (IsShutdown()) + { + return true; + } + + // Wake up the worker thread once. + if (is_force_flush_pending_.load(std::memory_order_acquire)) + { + is_force_wakeup_background_worker_.store(true, std::memory_order_release); + cv_.notify_one(); + } + + return synchronization_data_->is_force_flush_notified.load(std::memory_order_acquire); + }; + + + return exporter_->ForceFlush(timeout); } From 85fd20ade771bc61303f4de4d3c4a948e565c89a Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 16 Mar 2023 14:38:24 -0700 Subject: [PATCH 02/10] fix --- .../export/periodic_exporting_metric_reader.h | 12 +++--- .../periodic_exporting_metric_reader.cc | 39 ++++++++++++------- 2 files changed, 30 insertions(+), 21 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h index 19a5c9988e..83773ffb61 100644 --- a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h @@ -56,11 +56,6 @@ class PeriodicExportingMetricReader : public MetricReader std::chrono::milliseconds export_interval_millis_; std::chrono::milliseconds export_timeout_millis_; - std::mutex force_flush_m_; - std::atomic is_force_flush_pending_; - std::atomic is_force_wakeup_background_worker_; - - void DoBackgroundWork(); bool CollectAndExportOnce(); @@ -68,8 +63,11 @@ class PeriodicExportingMetricReader : public MetricReader std::thread worker_thread_; /* Synchronization primitives */ - std::condition_variable cv_; - std::mutex cv_m_; + std::atomic is_force_flush_pending_; + std::atomic is_force_wakeup_background_worker_; + std::atomic is_force_flush_notified_; + std::condition_variable cv_, force_flush_cv_; + std::mutex cv_m_, force_flush_m_; }; } // namespace metrics diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index e433464ff8..8eb363945d 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -60,9 +60,10 @@ void PeriodicExportingMetricReader::DoBackgroundWork() auto end = std::chrono::steady_clock::now(); auto export_time_ms = std::chrono::duration_cast(end - start); auto remaining_wait_interval_ms = export_interval_millis_ - export_time_ms; - cv_.wait_for(lk, remaining_wait_interval_ms, [this]() { + cv_.wait_for(lk, remaining_wait_interval_ms, [this]() { if (is_force_wakeup_background_worker_.load(std::memory_order_acquire)) { + is_force_wakeup_background_worker_.store(false, std::memory_order_release); return true; } if (IsShutdown()) @@ -71,14 +72,7 @@ void PeriodicExportingMetricReader::DoBackgroundWork() } return false; }); - is_force_wakeup_background_worker_.store(false, std::memory_order_release); } while (IsShutdown() != true); - // One last Collect and Export before shutdown - auto status = CollectAndExportOnce(); - if (!status) - { - OTEL_INTERNAL_LOG_ERROR("[Periodic Exporting Metric Reader] Collect-Export Cycle Failure.") - } } bool PeriodicExportingMetricReader::CollectAndExportOnce() @@ -107,14 +101,13 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() break; } } while (status != std::future_status::ready); - bool notify_force_flush = - is_force_flush_pending_.exchange(false, std::memory_order_acq_rel); + bool notify_force_flush = is_force_flush_pending_.exchange(false, std::memory_order_acq_rel); if (notify_force_flush) { is_force_flush_notified_.store(true, std::memory_order_release); force_flush_cv_.notify_one(); } - + return true; } @@ -134,13 +127,31 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo is_force_wakeup_background_worker_.store(true, std::memory_order_release); cv_.notify_one(); } - - return synchronization_data_->is_force_flush_notified.load(std::memory_order_acquire); }; + bool status = force_flush_cv_.wait_for(lk_cv, timeout, break_condition); + // If it will be already signaled, we must wait util notified. + // We use a spin lock here + if (false == is_force_flush_pending_.exchange(false, std::memory_order_acq_rel)) + { + for (int retry_waiting_times = 0; + false == is_force_flush_notified_.load(std::memory_order_acquire); ++retry_waiting_times) + { + opentelemetry::common::SpinLockMutex::fast_yield(); + if ((retry_waiting_times & 127) == 127) + { + std::this_thread::yield(); + } + } + } + is_force_flush_notified_.store(false, std::memory_order_release); + if (status) + { - return exporter_->ForceFlush(timeout); + status = exporter_->ForceFlush(timeout); + } + return status; } bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept From 2e24554ec8dcde84c75cd30d260c412a97bf1871 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 16 Mar 2023 15:35:45 -0700 Subject: [PATCH 03/10] fix --- sdk/src/metrics/export/periodic_exporting_metric_reader.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index 8eb363945d..8ee7d28e42 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -127,6 +127,7 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo is_force_wakeup_background_worker_.store(true, std::memory_order_release); cv_.notify_one(); } + return is_force_flush_notified_.load(std::memory_order_acquire); }; bool status = force_flush_cv_.wait_for(lk_cv, timeout, break_condition); From 57dac9ca35bd602390f29061a98e45f5c5b0c8a2 Mon Sep 17 00:00:00 2001 From: Lalit Date: Mon, 20 Mar 2023 17:22:24 -0700 Subject: [PATCH 04/10] fix --- .../periodic_exporting_metric_reader.cc | 26 ++++++++++++++++--- .../periodic_exporting_metric_reader_test.cc | 1 + 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index 8ee7d28e42..4d0320a031 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -129,8 +129,26 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo } return is_force_flush_notified_.load(std::memory_order_acquire); }; + timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + timeout, std::chrono::microseconds::zero()); - bool status = force_flush_cv_.wait_for(lk_cv, timeout, break_condition); + std::chrono::steady_clock::duration timeout_steady = + std::chrono::duration_cast(timeout); + if (timeout_steady <= std::chrono::steady_clock::duration::zero()) + { + timeout_steady = std::chrono::steady_clock::duration::max(); + } + + bool result = false; + while (!result && timeout_steady > std::chrono::steady_clock::duration::zero()) + { + // When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called + // between is_force_flush_pending.load() and force_flush_cv.wait(). We must not wait + // for ever + std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); + result = force_flush_cv_.wait_for(lk_cv, export_interval_millis_, break_condition); + timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; + } // If it will be already signaled, we must wait util notified. // We use a spin lock here @@ -147,12 +165,12 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo } } is_force_flush_notified_.store(false, std::memory_order_release); - if (status) + if (result) { - status = exporter_->ForceFlush(timeout); + result = exporter_->ForceFlush(timeout); } - return status; + return result; } bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept diff --git a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc index cd789b5028..e115f79f75 100644 --- a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc +++ b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc @@ -70,6 +70,7 @@ TEST(PeriodicExporingMetricReader, BasicTests) MockMetricProducer producer; reader.SetMetricProducer(&producer); std::this_thread::sleep_for(std::chrono::milliseconds(2000)); + EXPECT_NO_THROW(reader.ForceFlush()); reader.Shutdown(); EXPECT_EQ(static_cast(exporter_ptr)->GetDataCount(), static_cast(&producer)->GetDataCount()); From 9ff3d23210cdab2a6bcddb3379fcb666cd3b3b81 Mon Sep 17 00:00:00 2001 From: Lalit Date: Mon, 20 Mar 2023 17:33:59 -0700 Subject: [PATCH 05/10] spacing --- .../metrics/export/periodic_exporting_metric_reader.cc | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index 4d0320a031..e4be79a3b0 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -91,6 +91,7 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() return true; }); }); + std::future_status status; do { @@ -129,9 +130,9 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo } return is_force_flush_notified_.load(std::memory_order_acquire); }; + timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( timeout, std::chrono::microseconds::zero()); - std::chrono::steady_clock::duration timeout_steady = std::chrono::duration_cast(timeout); if (timeout_steady <= std::chrono::steady_clock::duration::zero()) @@ -142,8 +143,8 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo bool result = false; while (!result && timeout_steady > std::chrono::steady_clock::duration::zero()) { - // When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called - // between is_force_flush_pending.load() and force_flush_cv.wait(). We must not wait + // When is_force_flush_notified_.store(true) and force_flush_cv_.notify_all() is called + // between is_force_flush_pending_.load() and force_flush_cv_.wait(). We must not wait // for ever std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); result = force_flush_cv_.wait_for(lk_cv, export_interval_millis_, break_condition); @@ -164,6 +165,7 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo } } } + is_force_flush_notified_.store(false, std::memory_order_release); if (result) { From 94315f36790c2802a78f550736ea89af5d84e584 Mon Sep 17 00:00:00 2001 From: Lalit Date: Mon, 20 Mar 2023 17:34:54 -0700 Subject: [PATCH 06/10] spacing --- sdk/src/metrics/export/periodic_exporting_metric_reader.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index e4be79a3b0..d28d70f150 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -165,7 +165,7 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo } } } - + is_force_flush_notified_.store(false, std::memory_order_release); if (result) { From 5f9c7837645851c5919bcaf0744eaa35f469e295 Mon Sep 17 00:00:00 2001 From: Lalit Date: Mon, 20 Mar 2023 21:38:33 -0700 Subject: [PATCH 07/10] fix --- sdk/src/metrics/export/periodic_exporting_metric_reader.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index d28d70f150..9fe7eb228c 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -169,8 +169,8 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo is_force_flush_notified_.store(false, std::memory_order_release); if (result) { - - result = exporter_->ForceFlush(timeout); + result = exporter_->ForceFlush( + std::chrono::duration_cast(timeout_steady)); } return result; } From 5973ec72f599bdf111e6f7a4417bc76c630917a4 Mon Sep 17 00:00:00 2001 From: Lalit Date: Tue, 21 Mar 2023 22:59:02 -0700 Subject: [PATCH 08/10] fix comments --- .../periodic_exporting_metric_reader.cc | 34 ++++++++++++++++--- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index 9fe7eb228c..3e379ee53b 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -131,10 +131,10 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo return is_force_flush_notified_.load(std::memory_order_acquire); }; - timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + auto wait_timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( timeout, std::chrono::microseconds::zero()); std::chrono::steady_clock::duration timeout_steady = - std::chrono::duration_cast(timeout); + std::chrono::duration_cast(wait_timeout); if (timeout_steady <= std::chrono::steady_clock::duration::zero()) { timeout_steady = std::chrono::steady_clock::duration::max(); @@ -167,10 +167,36 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo } is_force_flush_notified_.store(false, std::memory_order_release); + + if (timeout == std::chrono::steady_clock::duration::zero()) + { + } + if (timeout_steady <= std::chrono::steady_clock::duration::zero()) + { + // forceflush timeout, exporter force-flush won't be triggered. + result = false; + } + if (result) { - result = exporter_->ForceFlush( - std::chrono::duration_cast(timeout_steady)); + // - If original `timeout` is `zero`, use that in exporter::forceflush + // - Else if remaining `timeout_steady` more than zero, use that in exporter::forceflush + // - Else don't invoke exporter::forceflush ( as remaining time is zero or less) + if (timeout <= std::chrono::steady_clock::duration::zero()) + { + result = + exporter_->ForceFlush(std::chrono::duration_cast(timeout)); + } + else if (timeout_steady > std::chrono::steady_clock::duration::zero()) + { + result = exporter_->ForceFlush( + std::chrono::duration_cast(timeout_steady)); + } + else + { + // remaining timeout_steady is zero or less + result = false; + } } return result; } From ea5ca54ff059783453da74aa48e926ae73e2d722 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 23 Mar 2023 14:11:56 -0700 Subject: [PATCH 09/10] fix --- .../metrics/export/periodic_exporting_metric_reader.cc | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index 3e379ee53b..66528a210b 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -165,18 +165,8 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo } } } - is_force_flush_notified_.store(false, std::memory_order_release); - if (timeout == std::chrono::steady_clock::duration::zero()) - { - } - if (timeout_steady <= std::chrono::steady_clock::duration::zero()) - { - // forceflush timeout, exporter force-flush won't be triggered. - result = false; - } - if (result) { // - If original `timeout` is `zero`, use that in exporter::forceflush From 8a3eb559656e6f164372c4ca2610025b70da1911 Mon Sep 17 00:00:00 2001 From: Lalit Date: Fri, 24 Mar 2023 23:28:49 -0700 Subject: [PATCH 10/10] review comments --- .../metrics/export/periodic_exporting_metric_reader.cc | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index 66528a210b..dfa4a5ee6b 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -66,11 +66,7 @@ void PeriodicExportingMetricReader::DoBackgroundWork() is_force_wakeup_background_worker_.store(false, std::memory_order_release); return true; } - if (IsShutdown()) - { - return true; - } - return false; + return IsShutdown(); }); } while (IsShutdown() != true); } @@ -151,7 +147,7 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; } - // If it will be already signaled, we must wait util notified. + // If it will be already signaled, we must wait until notified. // We use a spin lock here if (false == is_force_flush_pending_.exchange(false, std::memory_order_acq_rel)) {