Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDK] Implement Forceflush for Periodic Metric Reader #2064

Merged
merged 16 commits into from
Mar 27, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,11 @@ class PeriodicExportingMetricReader : public MetricReader
std::thread worker_thread_;

/* Synchronization primitives */
std::condition_variable cv_;
std::mutex cv_m_;
std::atomic<bool> is_force_flush_pending_;
std::atomic<bool> is_force_wakeup_background_worker_;
std::atomic<bool> is_force_flush_notified_;
std::condition_variable cv_, force_flush_cv_;
std::mutex cv_m_, force_flush_m_;
};

} // namespace metrics
Expand Down
87 changes: 79 additions & 8 deletions sdk/src/metrics/export/periodic_exporting_metric_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,19 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
auto end = std::chrono::steady_clock::now();
auto export_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
auto remaining_wait_interval_ms = export_interval_millis_ - export_time_ms;
cv_.wait_for(lk, remaining_wait_interval_ms);
cv_.wait_for(lk, remaining_wait_interval_ms, [this]() {
if (is_force_wakeup_background_worker_.load(std::memory_order_acquire))
{
is_force_wakeup_background_worker_.store(false, std::memory_order_release);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can store fail then false should be returned?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if it can ever fail. It's atomic assignment operation so should eventually be successful. Also, this method returns void, not sure how to get any failure status from this method.
Let me know if I am missing something :)

return true;
}
if (IsShutdown())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: return IsShutdown();.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed. thanks

{
return true;
}
return false;
});
} while (IsShutdown() != true);
// One last Collect and Export before shutdown
auto status = CollectAndExportOnce();
if (!status)
{
OTEL_INTERNAL_LOG_ERROR("[Periodic Exporting Metric Reader] Collect-Export Cycle Failure.")
}
}

bool PeriodicExportingMetricReader::CollectAndExportOnce()
Expand All @@ -86,6 +91,7 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
return true;
});
});

std::future_status status;
do
{
Expand All @@ -96,12 +102,77 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
break;
}
} while (status != std::future_status::ready);
bool notify_force_flush = is_force_flush_pending_.exchange(false, std::memory_order_acq_rel);
if (notify_force_flush)
{
is_force_flush_notified_.store(true, std::memory_order_release);
force_flush_cv_.notify_one();
}

return true;
}

bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept
{
return exporter_->ForceFlush(timeout);
std::unique_lock<std::mutex> lk_cv(force_flush_m_);
is_force_flush_pending_.store(true, std::memory_order_release);
auto break_condition = [this]() {
if (IsShutdown())
{
return true;
}

// Wake up the worker thread once.
if (is_force_flush_pending_.load(std::memory_order_acquire))
{
is_force_wakeup_background_worker_.store(true, std::memory_order_release);
cv_.notify_one();
}
return is_force_flush_notified_.load(std::memory_order_acquire);
};

timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout(
timeout, std::chrono::microseconds::zero());
std::chrono::steady_clock::duration timeout_steady =
std::chrono::duration_cast<std::chrono::steady_clock::duration>(timeout);
if (timeout_steady <= std::chrono::steady_clock::duration::zero())
{
timeout_steady = std::chrono::steady_clock::duration::max();
}

bool result = false;
while (!result && timeout_steady > std::chrono::steady_clock::duration::zero())
{
// When is_force_flush_notified_.store(true) and force_flush_cv_.notify_all() is called
// between is_force_flush_pending_.load() and force_flush_cv_.wait(). We must not wait
// for ever
std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now();
result = force_flush_cv_.wait_for(lk_cv, export_interval_millis_, break_condition);
timeout_steady -= std::chrono::steady_clock::now() - start_timepoint;
}

// If it will be already signaled, we must wait util notified.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// If it will be already signaled, we must wait util notified.
// If it will be already signaled, we must wait until notified.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed. thanks.

// We use a spin lock here
if (false == is_force_flush_pending_.exchange(false, std::memory_order_acq_rel))
{
for (int retry_waiting_times = 0;
false == is_force_flush_notified_.load(std::memory_order_acquire); ++retry_waiting_times)
{
opentelemetry::common::SpinLockMutex::fast_yield();
if ((retry_waiting_times & 127) == 127)
{
std::this_thread::yield();
}
}
}

is_force_flush_notified_.store(false, std::memory_order_release);
if (result)
{

result = exporter_->ForceFlush(timeout);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it better to use timeout_steady if it's greater than zero here? We have already wait some time before and the left timeout should be less.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Have changed it to use timeout_steady now.

}
return result;
}

bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ TEST(PeriodicExporingMetricReader, BasicTests)
MockMetricProducer producer;
reader.SetMetricProducer(&producer);
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
EXPECT_NO_THROW(reader.ForceFlush());
reader.Shutdown();
EXPECT_EQ(static_cast<MockPushMetricExporter *>(exporter_ptr)->GetDataCount(),
static_cast<MockMetricProducer *>(&producer)->GetDataCount());
Expand Down