From 6f53da3ccdb73882a6b252d073f474f6c1f9f30c Mon Sep 17 00:00:00 2001 From: DEBAJIT DAS <85024550+DebajitDas@users.noreply.github.com> Date: Mon, 21 Mar 2022 19:31:51 +0530 Subject: [PATCH] Async callback Exporter to Processor changes (#1275) --- .../exporters/elasticsearch/es_log_exporter.h | 11 ++ .../elasticsearch/src/es_log_exporter.cc | 132 +++++++++++++++++- .../exporters/jaeger/jaeger_exporter.h | 9 ++ exporters/jaeger/src/jaeger_exporter.cc | 9 ++ .../memory/in_memory_span_exporter.h | 14 ++ .../exporters/ostream/log_exporter.h | 7 + .../exporters/ostream/span_exporter.h | 6 + exporters/ostream/src/log_exporter.cc | 10 ++ exporters/ostream/src/span_exporter.cc | 9 ++ .../exporters/otlp/otlp_grpc_exporter.h | 9 ++ .../exporters/otlp/otlp_grpc_log_exporter.h | 10 ++ .../exporters/otlp/otlp_http_client.h | 6 +- .../exporters/otlp/otlp_http_exporter.h | 10 ++ .../exporters/otlp/otlp_http_log_exporter.h | 10 ++ exporters/otlp/src/otlp_grpc_exporter.cc | 10 ++ exporters/otlp/src/otlp_grpc_log_exporter.cc | 10 ++ exporters/otlp/src/otlp_http_client.cc | 6 +- exporters/otlp/src/otlp_http_exporter.cc | 11 ++ exporters/otlp/src/otlp_http_log_exporter.cc | 11 ++ .../otlp/test/otlp_http_exporter_test.cc | 13 +- .../otlp/test/otlp_http_log_exporter_test.cc | 13 +- .../exporters/zipkin/zipkin_exporter.h | 9 ++ exporters/zipkin/src/zipkin_exporter.cc | 9 ++ .../ext/http/client/curl/http_client_curl.h | 10 +- .../ext/http/client/http_client.h | 2 +- .../http/client/nosend/http_client_nosend.h | 2 +- ext/test/http/curl_http_test.cc | 16 +-- ext/test/w3c_tracecontext_test/main.cc | 4 +- .../sdk/logs/batch_log_processor.h | 24 +++- sdk/include/opentelemetry/sdk/logs/exporter.h | 9 ++ .../sdk/logs/simple_log_processor.h | 4 +- .../sdk/trace/batch_span_processor.h | 25 +++- .../opentelemetry/sdk/trace/exporter.h | 9 ++ .../sdk/trace/simple_processor.h | 23 ++- sdk/src/logs/batch_log_processor.cc | 61 +++++++- sdk/src/logs/simple_log_processor.cc | 20 ++- sdk/src/trace/batch_span_processor.cc | 56 +++++++- sdk/test/logs/batch_log_processor_test.cc | 120 +++++++++++++++- sdk/test/logs/simple_log_processor_test.cc | 14 ++ sdk/test/trace/batch_span_processor_test.cc | 99 +++++++++++++ sdk/test/trace/simple_processor_test.cc | 7 + 41 files changed, 785 insertions(+), 64 deletions(-) diff --git a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h index ea58807e96..50e763e0c2 100644 --- a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h +++ b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h @@ -89,6 +89,17 @@ class ElasticsearchLogExporter final : public opentelemetry::sdk::logs::LogExpor const opentelemetry::nostd::span> &records) noexcept override; + /** + * Exports a vector of log records to the Elasticsearch instance asynchronously. + * @param records A list of log records to send to Elasticsearch. + * @param result_callback callback function accepting ExportResult as argument + */ + void Export( + const opentelemetry::nostd::span> + &records, + nostd::function_ref result_callback) noexcept + override; + /** * Shutdown this exporter. * @param timeout The maximum time to wait for the shutdown method to return diff --git a/exporters/elasticsearch/src/es_log_exporter.cc b/exporters/elasticsearch/src/es_log_exporter.cc index a5a66ebe01..991678d6b9 100644 --- a/exporters/elasticsearch/src/es_log_exporter.cc +++ b/exporters/elasticsearch/src/es_log_exporter.cc @@ -110,6 +110,89 @@ class ResponseHandler : public http_client::EventHandler bool console_debug_ = false; }; +/** + * This class handles the async response message from the Elasticsearch request + */ +class AsyncResponseHandler : public http_client::EventHandler +{ +public: + /** + * Creates a response handler, that by default doesn't display to console + */ + AsyncResponseHandler( + std::shared_ptr session, + nostd::function_ref result_callback, + bool console_debug = false) + : console_debug_{console_debug}, + session_{std::move(session)}, + result_callback_{result_callback} + {} + + /** + * Cleans up the session in the destructor. + */ + ~AsyncResponseHandler() { session_->FinishSession(); } + + /** + * Automatically called when the response is received + */ + void OnResponse(http_client::Response &response) noexcept override + { + + // Store the body of the request + body_ = std::string(response.GetBody().begin(), response.GetBody().end()); + if (body_.find("\"failed\" : 0") == std::string::npos) + { + OTEL_INTERNAL_LOG_ERROR( + "[ES Trace Exporter] Logs were not written to Elasticsearch correctly, response body: " + << body_); + result_callback_(sdk::common::ExportResult::kFailure); + } + else + { + result_callback_(sdk::common::ExportResult::kSuccess); + } + } + + // Callback method when an http event occurs + void OnEvent(http_client::SessionState state, nostd::string_view reason) noexcept override + { + // If any failure event occurs, release the condition variable to unblock main thread + switch (state) + { + case http_client::SessionState::ConnectFailed: + OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Connection to elasticsearch failed"); + break; + case http_client::SessionState::SendFailed: + OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Request failed to be sent to elasticsearch"); + + break; + case http_client::SessionState::TimedOut: + OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Request to elasticsearch timed out"); + + break; + case http_client::SessionState::NetworkError: + OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Network error to elasticsearch"); + break; + default: + break; + } + result_callback_(sdk::common::ExportResult::kFailure); + } + +private: + // Stores the session object for the request + std::shared_ptr session_; + // Callback to call to on receiving events + nostd::function_ref result_callback_; + + // A string to store the response body + std::string body_ = ""; + + // Whether to print the results from the callback + bool console_debug_ = false; +}; + ElasticsearchLogExporter::ElasticsearchLogExporter() : options_{ElasticsearchExporterOptions()}, http_client_{new ext::http::client::curl::HttpClient()} @@ -162,8 +245,8 @@ sdk::common::ExportResult ElasticsearchLogExporter::Export( request->SetBody(body_vec); // Send the request - std::unique_ptr handler(new ResponseHandler(options_.console_debug_)); - session->SendRequest(*handler); + auto handler = std::make_shared(options_.console_debug_); + session->SendRequest(handler); // Wait for the response to be received if (options_.console_debug_) @@ -198,6 +281,51 @@ sdk::common::ExportResult ElasticsearchLogExporter::Export( return sdk::common::ExportResult::kSuccess; } +void ElasticsearchLogExporter::Export( + const opentelemetry::nostd::span> + &records, + nostd::function_ref result_callback) noexcept +{ + // Return failure if this exporter has been shutdown + if (isShutdown()) + { + OTEL_INTERNAL_LOG_ERROR("[ES Log Exporter] Exporting " + << records.size() << " log(s) failed, exporter is shutdown"); + return; + } + + // Create a connection to the ElasticSearch instance + auto session = http_client_->CreateSession(options_.host_ + std::to_string(options_.port_)); + auto request = session->CreateRequest(); + + // Populate the request with headers and methods + request->SetUri(options_.index_ + "/_bulk?pretty"); + request->SetMethod(http_client::Method::Post); + request->AddHeader("Content-Type", "application/json"); + request->SetTimeoutMs(std::chrono::milliseconds(1000 * options_.response_timeout_)); + + // Create the request body + std::string body = ""; + for (auto &record : records) + { + // Append {"index":{}} before JSON body, which tells Elasticsearch to write to index specified + // in URI + body += "{\"index\" : {}}\n"; + + // Add the context of the Recordable + auto json_record = std::unique_ptr( + static_cast(record.release())); + body += json_record->GetJSON().dump() + "\n"; + } + std::vector body_vec(body.begin(), body.end()); + request->SetBody(body_vec); + + // Send the request + auto handler = + std::make_shared(session, result_callback, options_.console_debug_); + session->SendRequest(handler); +} + bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h b/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h index 284bab2cab..eb3b4bd621 100644 --- a/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h +++ b/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h @@ -61,6 +61,15 @@ class JaegerExporter final : public opentelemetry::sdk::trace::SpanExporter const nostd::span> &spans) noexcept override; + /** + * Exports a batch of span recordables asynchronously. + * @param spans a span of unique pointers to span recordables + * @param result_callback callback function accepting ExportResult as argument + */ + void Export(const nostd::span> &spans, + nostd::function_ref + result_callback) noexcept override; + /** * Shutdown the exporter. * @param timeout an option timeout, default to max. diff --git a/exporters/jaeger/src/jaeger_exporter.cc b/exporters/jaeger/src/jaeger_exporter.cc index c07f2f0100..4a028773ca 100644 --- a/exporters/jaeger/src/jaeger_exporter.cc +++ b/exporters/jaeger/src/jaeger_exporter.cc @@ -70,6 +70,15 @@ sdk_common::ExportResult JaegerExporter::Export( return sdk_common::ExportResult::kSuccess; } +void JaegerExporter::Export( + const nostd::span> &spans, + nostd::function_ref result_callback) noexcept +{ + OTEL_INTERNAL_LOG_WARN(" async not supported. Making sync interface call"); + auto status = Export(spans); + result_callback(status); +} + void JaegerExporter::InitializeEndpoint() { if (options_.transport_format == TransportFormat::kThriftUdpCompact) diff --git a/exporters/memory/include/opentelemetry/exporters/memory/in_memory_span_exporter.h b/exporters/memory/include/opentelemetry/exporters/memory/in_memory_span_exporter.h index 3e47ccb177..3ebd3b8e89 100644 --- a/exporters/memory/include/opentelemetry/exporters/memory/in_memory_span_exporter.h +++ b/exporters/memory/include/opentelemetry/exporters/memory/in_memory_span_exporter.h @@ -64,6 +64,20 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte return sdk::common::ExportResult::kSuccess; } + /** + * Exports a batch of span recordables asynchronously. + * @param spans a span of unique pointers to span recordables + * @param result_callback callback function accepting ExportResult as argument + */ + void Export( + const nostd::span> &spans, + nostd::function_ref result_callback) noexcept override + { + OTEL_INTERNAL_LOG_WARN(" async not supported. Making sync interface call"); + auto status = Export(spans); + result_callback(status); + } + /** * @param timeout an optional value containing the timeout of the exporter * note: passing custom timeout values is not currently supported for this exporter diff --git a/exporters/ostream/include/opentelemetry/exporters/ostream/log_exporter.h b/exporters/ostream/include/opentelemetry/exporters/ostream/log_exporter.h index ad1d54a215..017d967c70 100644 --- a/exporters/ostream/include/opentelemetry/exporters/ostream/log_exporter.h +++ b/exporters/ostream/include/opentelemetry/exporters/ostream/log_exporter.h @@ -39,6 +39,13 @@ class OStreamLogExporter final : public opentelemetry::sdk::logs::LogExporter const opentelemetry::nostd::span> &records) noexcept override; + /** + * Exports a span of logs sent from the processor asynchronously. + */ + void Export(const opentelemetry::nostd::span> &records, + opentelemetry::nostd::function_ref + result_callback) noexcept; + /** * Marks the OStream Log Exporter as shut down. */ diff --git a/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h b/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h index 8122b6777a..5af47280be 100644 --- a/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h +++ b/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h @@ -38,6 +38,12 @@ class OStreamSpanExporter final : public opentelemetry::sdk::trace::SpanExporter const opentelemetry::nostd::span> &spans) noexcept override; + void Export( + const opentelemetry::nostd::span> + &spans, + opentelemetry::nostd::function_ref + result_callback) noexcept override; + bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; diff --git a/exporters/ostream/src/log_exporter.cc b/exporters/ostream/src/log_exporter.cc index a8ea060481..d4122d6899 100644 --- a/exporters/ostream/src/log_exporter.cc +++ b/exporters/ostream/src/log_exporter.cc @@ -180,6 +180,16 @@ sdk::common::ExportResult OStreamLogExporter::Export( return sdk::common::ExportResult::kSuccess; } +void OStreamLogExporter::Export( + const opentelemetry::nostd::span> &records, + opentelemetry::nostd::function_ref + result_callback) noexcept +{ + // Do not have async support + auto result = Export(records); + result_callback(result); +} + bool OStreamLogExporter::Shutdown(std::chrono::microseconds) noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/ostream/src/span_exporter.cc b/exporters/ostream/src/span_exporter.cc index dea72f57f8..24fcf6007b 100644 --- a/exporters/ostream/src/span_exporter.cc +++ b/exporters/ostream/src/span_exporter.cc @@ -96,6 +96,15 @@ sdk::common::ExportResult OStreamSpanExporter::Export( return sdk::common::ExportResult::kSuccess; } +void OStreamSpanExporter::Export( + const opentelemetry::nostd::span> &spans, + opentelemetry::nostd::function_ref + result_callback) noexcept +{ + auto result = Export(spans); + result_callback(result); +} + bool OStreamSpanExporter::Shutdown(std::chrono::microseconds timeout) noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h index a28e6fca85..722e525b04 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h @@ -52,6 +52,15 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter sdk::common::ExportResult Export( const nostd::span> &spans) noexcept override; + /** + * Exports a batch of span recordables asynchronously. + * @param spans a span of unique pointers to span recordables + * @param result_callback callback function accepting ExportResult as argument + */ + virtual void Export( + const nostd::span> &spans, + nostd::function_ref result_callback) noexcept override; + /** * Shut down the exporter. * @param timeout an optional timeout, the default timeout of 0 means that no diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_exporter.h index a8aeda85b8..5b7f79e13b 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_exporter.h @@ -55,6 +55,16 @@ class OtlpGrpcLogExporter : public opentelemetry::sdk::logs::LogExporter const nostd::span> &records) noexcept override; + /** + * Exports a vector of log records asynchronously. + * @param records A list of log records. + * @param result_callback callback function accepting ExportResult as argument + */ + virtual void Export( + const nostd::span> &records, + nostd::function_ref result_callback) noexcept + override; + /** * Shutdown this exporter. * @param timeout The maximum time to wait for the shutdown method to return. diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h index 554f792cdb..0c1c1fe684 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h @@ -141,7 +141,7 @@ class OtlpHttpClient */ void addSession( std::shared_ptr session, - std::unique_ptr &&event_handle) noexcept; + std::shared_ptr event_handle) noexcept; /** * @brief Real delete all sessions and event handles. @@ -168,13 +168,13 @@ class OtlpHttpClient struct HttpSessionData { std::shared_ptr session; - std::unique_ptr event_handle; + std::shared_ptr event_handle; inline HttpSessionData() = default; inline explicit HttpSessionData( std::shared_ptr &&input_session, - std::unique_ptr &&input_handle) + std::shared_ptr &&input_handle) { session.swap(input_session); event_handle.swap(input_handle); diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h index 20b67bc6af..599204c262 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h @@ -86,6 +86,16 @@ class OtlpHttpExporter final : public opentelemetry::sdk::trace::SpanExporter const nostd::span> &spans) noexcept override; + /** + * Exports a batch of span recordables asynchronously. + * @param spans a span of unique pointers to span recordables + * @param result_callback callback function accepting ExportResult as argument + */ + virtual void Export( + const nostd::span> &spans, + nostd::function_ref result_callback) noexcept + override; + /** * Shut down the exporter. * @param timeout an optional timeout, the default timeout of 0 means that no diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h index e5d49cdac3..417f1dcbf8 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h @@ -87,6 +87,16 @@ class OtlpHttpLogExporter final : public opentelemetry::sdk::logs::LogExporter const nostd::span> &records) noexcept override; + /** + * Exports a vector of log records asynchronously. + * @param records A list of log records. + * @param result_callback callback function accepting ExportResult as argument + */ + virtual void Export( + const nostd::span> &records, + nostd::function_ref result_callback) noexcept + override; + /** * Shutdown this exporter. * @param timeout The maximum time to wait for the shutdown method to return diff --git a/exporters/otlp/src/otlp_grpc_exporter.cc b/exporters/otlp/src/otlp_grpc_exporter.cc index 32f4a60a52..f191580b7f 100644 --- a/exporters/otlp/src/otlp_grpc_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_exporter.cc @@ -143,6 +143,16 @@ sdk::common::ExportResult OtlpGrpcExporter::Export( return sdk::common::ExportResult::kSuccess; } +void OtlpGrpcExporter::Export( + const nostd::span> &spans, + nostd::function_ref result_callback) noexcept +{ + OTEL_INTERNAL_LOG_WARN( + "[OTLP TRACE GRPC Exporter] async not supported. Making sync interface call"); + auto status = Export(spans); + result_callback(status); +} + bool OtlpGrpcExporter::Shutdown(std::chrono::microseconds timeout) noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/otlp/src/otlp_grpc_log_exporter.cc b/exporters/otlp/src/otlp_grpc_log_exporter.cc index 38bfb0a5bb..b56f8d9caa 100644 --- a/exporters/otlp/src/otlp_grpc_log_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_log_exporter.cc @@ -161,6 +161,16 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcLogExporter::Export( return sdk::common::ExportResult::kSuccess; } +void OtlpGrpcLogExporter::Export( + const nostd::span> &logs, + nostd::function_ref result_callback) noexcept +{ + OTEL_INTERNAL_LOG_WARN( + "[OTLP LOG GRPC Exporter] async not supported. Making sync interface call"); + auto status = Export(logs); + result_callback(status); +} + bool OtlpGrpcLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/otlp/src/otlp_http_client.cc b/exporters/otlp/src/otlp_http_client.cc index abc24a927a..79c0b69fd8 100644 --- a/exporters/otlp/src/otlp_http_client.cc +++ b/exporters/otlp/src/otlp_http_client.cc @@ -764,7 +764,7 @@ opentelemetry::sdk::common::ExportResult OtlpHttpClient::Export( request->ReplaceHeader("Content-Type", content_type); // Send the request - addSession(std::move(session), std::unique_ptr{ + addSession(std::move(session), std::shared_ptr{ new ResponseHandler(options_.console_debug)}); } @@ -861,7 +861,7 @@ void OtlpHttpClient::ReleaseSession( void OtlpHttpClient::addSession( std::shared_ptr session, - std::unique_ptr &&event_handle) noexcept + std::shared_ptr event_handle) noexcept { if (!session || !event_handle) { @@ -878,7 +878,7 @@ void OtlpHttpClient::addSession( session_data.event_handle.swap(event_handle); // Send request after the session is added - key->SendRequest(*handle); + key->SendRequest(session_data.event_handle); } bool OtlpHttpClient::cleanupGCSessions() noexcept diff --git a/exporters/otlp/src/otlp_http_exporter.cc b/exporters/otlp/src/otlp_http_exporter.cc index cfaa06a479..29926d1651 100644 --- a/exporters/otlp/src/otlp_http_exporter.cc +++ b/exporters/otlp/src/otlp_http_exporter.cc @@ -11,6 +11,8 @@ #include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" +#include "opentelemetry/sdk/common/global_log_handler.h" + namespace nostd = opentelemetry::nostd; OPENTELEMETRY_BEGIN_NAMESPACE @@ -57,6 +59,15 @@ opentelemetry::sdk::common::ExportResult OtlpHttpExporter::Export( return http_client_->Export(service_request); } +void OtlpHttpExporter::Export( + const nostd::span> &spans, + nostd::function_ref result_callback) noexcept +{ + OTEL_INTERNAL_LOG_WARN(" async not supported. Making sync interface call"); + auto status = Export(spans); + result_callback(status); +} + bool OtlpHttpExporter::Shutdown(std::chrono::microseconds timeout) noexcept { return http_client_->Shutdown(timeout); diff --git a/exporters/otlp/src/otlp_http_log_exporter.cc b/exporters/otlp/src/otlp_http_log_exporter.cc index 8a0cc536d4..bd17c4fb37 100644 --- a/exporters/otlp/src/otlp_http_log_exporter.cc +++ b/exporters/otlp/src/otlp_http_log_exporter.cc @@ -13,6 +13,8 @@ # include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" +# include "opentelemetry/sdk/common/global_log_handler.h" + namespace nostd = opentelemetry::nostd; OPENTELEMETRY_BEGIN_NAMESPACE @@ -58,6 +60,15 @@ opentelemetry::sdk::common::ExportResult OtlpHttpLogExporter::Export( return http_client_->Export(service_request); } +void OtlpHttpLogExporter::Export( + const nostd::span> &logs, + nostd::function_ref result_callback) noexcept +{ + OTEL_INTERNAL_LOG_WARN(" async not supported. Making sync interface call"); + auto status = Export(logs); + result_callback(status); +} + bool OtlpHttpLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept { return http_client_->Shutdown(timeout); diff --git a/exporters/otlp/test/otlp_http_exporter_test.cc b/exporters/otlp/test/otlp_http_exporter_test.cc index e9dbd711e8..12c7494b2e 100644 --- a/exporters/otlp/test/otlp_http_exporter_test.cc +++ b/exporters/otlp/test/otlp_http_exporter_test.cc @@ -138,8 +138,8 @@ TEST_F(OtlpHttpExporterTestPeer, ExportJsonIntegrationTest) auto mock_session = std::static_pointer_cast(no_send_client->session_); EXPECT_CALL(*mock_session, SendRequest) - .WillOnce([&mock_session, - report_trace_id](opentelemetry::ext::http::client::EventHandler &callback) { + .WillOnce([&mock_session, report_trace_id]( + std::shared_ptr callback) { auto check_json = nlohmann::json::parse(mock_session->GetRequest()->body_, nullptr, false); auto resource_span = *check_json["resource_spans"].begin(); auto instrumentation_library_span = *resource_span["instrumentation_library_spans"].begin(); @@ -155,7 +155,7 @@ TEST_F(OtlpHttpExporterTestPeer, ExportJsonIntegrationTest) } // let the otlp_http_client to continue http_client::nosend::Response response; - callback.OnResponse(response); + callback->OnResponse(response); }); child_span->End(); @@ -219,8 +219,8 @@ TEST_F(OtlpHttpExporterTestPeer, ExportBinaryIntegrationTest) auto mock_session = std::static_pointer_cast(no_send_client->session_); EXPECT_CALL(*mock_session, SendRequest) - .WillOnce([&mock_session, - report_trace_id](opentelemetry::ext::http::client::EventHandler &callback) { + .WillOnce([&mock_session, report_trace_id]( + std::shared_ptr callback) { opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest request_body; request_body.ParseFromArray(&mock_session->GetRequest()->body_[0], static_cast(mock_session->GetRequest()->body_.size())); @@ -237,7 +237,8 @@ TEST_F(OtlpHttpExporterTestPeer, ExportBinaryIntegrationTest) // let the otlp_http_client to continue http_client::nosend::Response response; - response.Finish(callback); + + response.Finish(*callback.get()); }); child_span->End(); diff --git a/exporters/otlp/test/otlp_http_log_exporter_test.cc b/exporters/otlp/test/otlp_http_log_exporter_test.cc index 1373b2ece3..e49824f0b7 100644 --- a/exporters/otlp/test/otlp_http_log_exporter_test.cc +++ b/exporters/otlp/test/otlp_http_log_exporter_test.cc @@ -129,8 +129,8 @@ TEST_F(OtlpHttpLogExporterTestPeer, ExportJsonIntegrationTest) auto mock_session = std::static_pointer_cast(no_send_client->session_); EXPECT_CALL(*mock_session, SendRequest) - .WillOnce([&mock_session, report_trace_id, - report_span_id](opentelemetry::ext::http::client::EventHandler &callback) { + .WillOnce([&mock_session, report_trace_id, report_span_id]( + std::shared_ptr callback) { auto check_json = nlohmann::json::parse(mock_session->GetRequest()->body_, nullptr, false); auto resource_logs = *check_json["resource_logs"].begin(); auto instrumentation_library_span = *resource_logs["instrumentation_library_logs"].begin(); @@ -151,7 +151,8 @@ TEST_F(OtlpHttpLogExporterTestPeer, ExportJsonIntegrationTest) // let the otlp_http_client to continue http_client::nosend::Response response; - response.Finish(callback); + + response.Finish(*callback.get()); }); logger->Log(opentelemetry::logs::Severity::kInfo, "Log name", "Log message", @@ -217,8 +218,8 @@ TEST_F(OtlpHttpLogExporterTestPeer, ExportBinaryIntegrationTest) auto mock_session = std::static_pointer_cast(no_send_client->session_); EXPECT_CALL(*mock_session, SendRequest) - .WillOnce([&mock_session, report_trace_id, - report_span_id](opentelemetry::ext::http::client::EventHandler &callback) { + .WillOnce([&mock_session, report_trace_id, report_span_id]( + std::shared_ptr callback) { opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest request_body; request_body.ParseFromArray(&mock_session->GetRequest()->body_[0], static_cast(mock_session->GetRequest()->body_.size())); @@ -239,7 +240,7 @@ TEST_F(OtlpHttpLogExporterTestPeer, ExportBinaryIntegrationTest) } ASSERT_TRUE(check_service_name); http_client::nosend::Response response; - callback.OnResponse(response); + callback->OnResponse(response); }); logger->Log(opentelemetry::logs::Severity::kInfo, "Log name", "Log message", diff --git a/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h b/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h index ae0e8173f9..3ac8c04028 100644 --- a/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h +++ b/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h @@ -78,6 +78,15 @@ class ZipkinExporter final : public opentelemetry::sdk::trace::SpanExporter const nostd::span> &spans) noexcept override; + /** + * Export asynchronosly a batch of span recordables in JSON format + * @param spans a span of unique pointers to span recordables + * @param result_callback callback function accepting ExportResult as argument + */ + void Export(const nostd::span> &spans, + nostd::function_ref + result_callback) noexcept override; + /** * Shut down the exporter. * @param timeout an optional timeout, default to max. diff --git a/exporters/zipkin/src/zipkin_exporter.cc b/exporters/zipkin/src/zipkin_exporter.cc index 240144599f..050ed4c9a8 100644 --- a/exporters/zipkin/src/zipkin_exporter.cc +++ b/exporters/zipkin/src/zipkin_exporter.cc @@ -93,6 +93,15 @@ sdk::common::ExportResult ZipkinExporter::Export( return sdk::common::ExportResult::kSuccess; } +void ZipkinExporter::Export( + const nostd::span> &spans, + nostd::function_ref result_callback) noexcept +{ + OTEL_INTERNAL_LOG_WARN("[ZIPKIN EXPORTER] async not supported. Making sync interface call"); + auto status = Export(spans); + result_callback(status); +} + void ZipkinExporter::InitializeLocalEndpoint() { if (options_.service_name.length()) diff --git a/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h b/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h index 1448bbbea3..ac263193bc 100644 --- a/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h +++ b/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h @@ -143,19 +143,19 @@ class Session : public opentelemetry::ext::http::client::Session } virtual void SendRequest( - opentelemetry::ext::http::client::EventHandler &callback) noexcept override + std::shared_ptr callback) noexcept override { is_session_active_ = true; std::string url = host_ + std::string(http_request_->uri_); - auto callback_ptr = &callback; + auto callback_ptr = callback.get(); curl_operation_.reset(new HttpOperation( http_request_->method_, url, callback_ptr, RequestMode::Async, http_request_->headers_, http_request_->body_, false, http_request_->timeout_ms_)); - curl_operation_->SendAsync([this, callback_ptr](HttpOperation &operation) { + curl_operation_->SendAsync([this, callback](HttpOperation &operation) { if (operation.WasAborted()) { // Manually cancelled - callback_ptr->OnEvent(opentelemetry::ext::http::client::SessionState::Cancelled, ""); + callback->OnEvent(opentelemetry::ext::http::client::SessionState::Cancelled, ""); } if (operation.GetResponseCode() >= CURL_LAST) @@ -165,7 +165,7 @@ class Session : public opentelemetry::ext::http::client::Session response->headers_ = operation.GetResponseHeaders(); response->body_ = operation.GetResponseBody(); response->status_code_ = operation.GetResponseCode(); - callback_ptr->OnResponse(*response); + callback->OnResponse(*response); } is_session_active_ = false; }); diff --git a/ext/include/opentelemetry/ext/http/client/http_client.h b/ext/include/opentelemetry/ext/http/client/http_client.h index 308335e492..e939962653 100644 --- a/ext/include/opentelemetry/ext/http/client/http_client.h +++ b/ext/include/opentelemetry/ext/http/client/http_client.h @@ -212,7 +212,7 @@ class Session public: virtual std::shared_ptr CreateRequest() noexcept = 0; - virtual void SendRequest(EventHandler &) noexcept = 0; + virtual void SendRequest(std::shared_ptr) noexcept = 0; virtual bool IsSessionActive() noexcept = 0; diff --git a/ext/include/opentelemetry/ext/http/client/nosend/http_client_nosend.h b/ext/include/opentelemetry/ext/http/client/nosend/http_client_nosend.h index c89d0cb5f2..405381c285 100644 --- a/ext/include/opentelemetry/ext/http/client/nosend/http_client_nosend.h +++ b/ext/include/opentelemetry/ext/http/client/nosend/http_client_nosend.h @@ -134,7 +134,7 @@ class Session : public opentelemetry::ext::http::client::Session MOCK_METHOD(void, SendRequest, - (opentelemetry::ext::http::client::EventHandler &), + (std::shared_ptr), (noexcept, override)); virtual bool CancelSession() noexcept override; diff --git a/ext/test/http/curl_http_test.cc b/ext/test/http/curl_http_test.cc index f8d248bae4..7085a1da33 100644 --- a/ext/test/http/curl_http_test.cc +++ b/ext/test/http/curl_http_test.cc @@ -196,12 +196,11 @@ TEST_F(BasicCurlHttpTests, SendGetRequest) auto session = session_manager->CreateSession("http://127.0.0.1:19000"); auto request = session->CreateRequest(); request->SetUri("get/"); - GetEventHandler *handler = new GetEventHandler(); - session->SendRequest(*handler); + auto handler = std::make_shared(); + session->SendRequest(handler); ASSERT_TRUE(waitForRequests(30, 1)); session->FinishSession(); ASSERT_TRUE(handler->is_called_); - delete handler; } TEST_F(BasicCurlHttpTests, SendPostRequest) @@ -219,16 +218,14 @@ TEST_F(BasicCurlHttpTests, SendPostRequest) http_client::Body body = {b, b + strlen(b)}; request->SetBody(body); request->AddHeader("Content-Type", "text/plain"); - PostEventHandler *handler = new PostEventHandler(); - session->SendRequest(*handler); + auto handler = std::make_shared(); + session->SendRequest(handler); ASSERT_TRUE(waitForRequests(30, 1)); session->FinishSession(); ASSERT_TRUE(handler->is_called_); session_manager->CancelAllSessions(); session_manager->FinishAllSessions(); - - delete handler; } TEST_F(BasicCurlHttpTests, RequestTimeout) @@ -240,11 +237,10 @@ TEST_F(BasicCurlHttpTests, RequestTimeout) auto session = session_manager->CreateSession("222.222.222.200:19000"); // Non Existing address auto request = session->CreateRequest(); request->SetUri("get/"); - GetEventHandler *handler = new GetEventHandler(); - session->SendRequest(*handler); + auto handler = std::make_shared(); + session->SendRequest(handler); session->FinishSession(); ASSERT_FALSE(handler->is_called_); - delete handler; } TEST_F(BasicCurlHttpTests, CurlHttpOperations) diff --git a/ext/test/w3c_tracecontext_test/main.cc b/ext/test/w3c_tracecontext_test/main.cc index 79aa4c9169..ca54475540 100644 --- a/ext/test/w3c_tracecontext_test/main.cc +++ b/ext/test/w3c_tracecontext_test/main.cc @@ -100,7 +100,7 @@ class NoopEventHandler : public http_client::EventHandler // Sends an HTTP POST request to the given url, with the given body. void send_request(curl::HttpClient &client, const std::string &url, const std::string &body) { - static std::unique_ptr handler(new NoopEventHandler()); + static std::shared_ptr handler(new NoopEventHandler()); auto request_span = get_tracer()->StartSpan(__func__); trace_api::Scope scope(request_span); @@ -126,7 +126,7 @@ void send_request(curl::HttpClient &client, const std::string &url, const std::s request->AddHeader(hdr.first, hdr.second); } - session->SendRequest(*handler); + session->SendRequest(handler); session->FinishSession(); } diff --git a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h index 6ec66a6f16..1387c9c82b 100644 --- a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h @@ -41,7 +41,8 @@ class BatchLogProcessor : public LogProcessor std::unique_ptr &&exporter, const size_t max_queue_size = 2048, const std::chrono::milliseconds scheduled_delay_millis = std::chrono::milliseconds(5000), - const size_t max_export_batch_size = 512); + const size_t max_export_batch_size = 512, + const bool is_export_async = false); /** Makes a new recordable **/ std::unique_ptr MakeRecordable() noexcept override; @@ -98,6 +99,19 @@ class BatchLogProcessor : public LogProcessor */ void DrainQueue(); + /** + * Should be called from Export to notify the main thread on Force Flush Completion + * @param was_force_flush_called - A flag to check if the current export is the result + * of a call to ForceFlush method. If true, then we have to + * notify the main thread to wake it up in the ForceFlush + * method. + */ + void NotifyForceFlushCompletion(const bool was_for_flush_called); + + /* In case of async export, wait and notify for shutdown to be completed.*/ + void WaitForShutdownCompletion(); + void NotifyShutdownCompletion(); + /* The configured backend log exporter */ std::unique_ptr exporter_; @@ -105,10 +119,11 @@ class BatchLogProcessor : public LogProcessor const size_t max_queue_size_; const std::chrono::milliseconds scheduled_delay_millis_; const size_t max_export_batch_size_; + const bool is_export_async_; /* Synchronization primitives */ - std::condition_variable cv_, force_flush_cv_; - std::mutex cv_m_, force_flush_cv_m_, shutdown_m_; + std::condition_variable cv_, force_flush_cv_, async_shutdown_cv_; + std::mutex cv_m_, force_flush_cv_m_, shutdown_m_, async_shutdown_m_; /* The buffer/queue to which the ended logs are added */ common::CircularBuffer buffer_; @@ -117,6 +132,7 @@ class BatchLogProcessor : public LogProcessor std::atomic is_shutdown_; std::atomic is_force_flush_; std::atomic is_force_flush_notified_; + std::atomic is_async_shutdown_notified_; /* The background worker thread */ std::thread worker_thread_; @@ -125,4 +141,4 @@ class BatchLogProcessor : public LogProcessor } // namespace logs } // namespace sdk OPENTELEMETRY_END_NAMESPACE -#endif +#endif \ No newline at end of file diff --git a/sdk/include/opentelemetry/sdk/logs/exporter.h b/sdk/include/opentelemetry/sdk/logs/exporter.h index 85c58e9f12..05990471d0 100644 --- a/sdk/include/opentelemetry/sdk/logs/exporter.h +++ b/sdk/include/opentelemetry/sdk/logs/exporter.h @@ -46,6 +46,15 @@ class LogExporter virtual sdk::common::ExportResult Export( const nostd::span> &records) noexcept = 0; + /** + * Exports asynchronously the batch of log records to their export destination + * @param records a span of unique pointers to log records + * @param result_callback callback function accepting ExportResult as argument + */ + virtual void Export( + const nostd::span> &records, + nostd::function_ref result_callback) noexcept = 0; + /** * Marks the exporter as ShutDown and cleans up any resources as required. * Shutdown should be called only once for each Exporter instance. diff --git a/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h b/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h index cc3aec47b2..28fcca78a6 100644 --- a/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h @@ -28,7 +28,8 @@ class SimpleLogProcessor : public LogProcessor { public: - explicit SimpleLogProcessor(std::unique_ptr &&exporter); + explicit SimpleLogProcessor(std::unique_ptr &&exporter, + bool is_export_async = false); virtual ~SimpleLogProcessor() = default; std::unique_ptr MakeRecordable() noexcept override; @@ -48,6 +49,7 @@ class SimpleLogProcessor : public LogProcessor opentelemetry::common::SpinLockMutex lock_; // The atomic boolean flag to ensure the ShutDown() function is only called once std::atomic_flag shutdown_latch_ = ATOMIC_FLAG_INIT; + bool is_export_async_ = false; }; } // namespace logs } // namespace sdk diff --git a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h index d9486ac58f..ba923dd7e7 100644 --- a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h +++ b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h @@ -37,6 +37,12 @@ struct BatchSpanProcessorOptions * equal to max_queue_size. */ size_t max_export_batch_size = 512; + + /** + * Determines whether the export happens asynchronously. + * Default implementation is synchronous. + */ + bool is_export_async = false; }; /** @@ -129,6 +135,19 @@ class BatchSpanProcessor : public SpanProcessor */ void DrainQueue(); + /** + * Should be called from Export to notify the main thread on Force Flush Completion + * @param was_force_flush_called - A flag to check if the current export is the result + * of a call to ForceFlush method. If true, then we have to + * notify the main thread to wake it up in the ForceFlush + * method. + */ + void NotifyForceFlushCompletion(const bool was_for_flush_called); + + /* In case of async export, wait and notify for shutdown to be completed.*/ + void WaitForShutdownCompletion(); + void NotifyShutdownCompletion(); + /* The configured backend exporter */ std::unique_ptr exporter_; @@ -136,10 +155,11 @@ class BatchSpanProcessor : public SpanProcessor const size_t max_queue_size_; const std::chrono::milliseconds schedule_delay_millis_; const size_t max_export_batch_size_; + const bool is_export_async_; /* Synchronization primitives */ - std::condition_variable cv_, force_flush_cv_; - std::mutex cv_m_, force_flush_cv_m_, shutdown_m_; + std::condition_variable cv_, force_flush_cv_, async_shutdown_cv_; + std::mutex cv_m_, force_flush_cv_m_, shutdown_m_, async_shutdown_m_; /* The buffer/queue to which the ended spans are added */ common::CircularBuffer buffer_; @@ -148,6 +168,7 @@ class BatchSpanProcessor : public SpanProcessor std::atomic is_shutdown_; std::atomic is_force_flush_; std::atomic is_force_flush_notified_; + std::atomic is_async_shutdown_notified_; /* The background worker thread */ std::thread worker_thread_; diff --git a/sdk/include/opentelemetry/sdk/trace/exporter.h b/sdk/include/opentelemetry/sdk/trace/exporter.h index 5826b5f454..8078b23e48 100644 --- a/sdk/include/opentelemetry/sdk/trace/exporter.h +++ b/sdk/include/opentelemetry/sdk/trace/exporter.h @@ -42,6 +42,15 @@ class SpanExporter const nostd::span> &spans) noexcept = 0; + /** + * Exports a batch of span recordables asynchronously. + * @param spans a span of unique pointers to span recordables + * @param result_callback callback function accepting ExportResult as argument + */ + virtual void Export( + const nostd::span> &spans, + nostd::function_ref result_callback) noexcept = 0; + /** * Shut down the exporter. * @param timeout an optional timeout. diff --git a/sdk/include/opentelemetry/sdk/trace/simple_processor.h b/sdk/include/opentelemetry/sdk/trace/simple_processor.h index accc685965..982a432e0c 100644 --- a/sdk/include/opentelemetry/sdk/trace/simple_processor.h +++ b/sdk/include/opentelemetry/sdk/trace/simple_processor.h @@ -31,8 +31,9 @@ class SimpleSpanProcessor : public SpanProcessor * Initialize a simple span processor. * @param exporter the exporter used by the span processor */ - explicit SimpleSpanProcessor(std::unique_ptr &&exporter) noexcept - : exporter_(std::move(exporter)) + explicit SimpleSpanProcessor(std::unique_ptr &&exporter, + bool is_export_async = false) noexcept + : exporter_(std::move(exporter)), is_export_async_(is_export_async) {} std::unique_ptr MakeRecordable() noexcept override @@ -48,10 +49,21 @@ class SimpleSpanProcessor : public SpanProcessor { nostd::span> batch(&span, 1); const std::lock_guard locked(lock_); - if (exporter_->Export(batch) == sdk::common::ExportResult::kFailure) + if (is_export_async_ == false) { - /* Once it is defined how the SDK does logging, an error should be - * logged in this case. */ + if (exporter_->Export(batch) == sdk::common::ExportResult::kFailure) + { + /* Once it is defined how the SDK does logging, an error should be + * logged in this case. */ + } + } + else + { + exporter_->Export(batch, [](sdk::common::ExportResult result) { + /* Log the result + */ + return true; + }); } } @@ -78,6 +90,7 @@ class SimpleSpanProcessor : public SpanProcessor std::unique_ptr exporter_; opentelemetry::common::SpinLockMutex lock_; std::atomic_flag shutdown_latch_ = ATOMIC_FLAG_INIT; + bool is_export_async_ = false; }; } // namespace trace } // namespace sdk diff --git a/sdk/src/logs/batch_log_processor.cc b/sdk/src/logs/batch_log_processor.cc index 59e5d29551..b8db44f861 100644 --- a/sdk/src/logs/batch_log_processor.cc +++ b/sdk/src/logs/batch_log_processor.cc @@ -16,17 +16,20 @@ namespace logs BatchLogProcessor::BatchLogProcessor(std::unique_ptr &&exporter, const size_t max_queue_size, const std::chrono::milliseconds scheduled_delay_millis, - const size_t max_export_batch_size) + const size_t max_export_batch_size, + const bool is_export_async) : exporter_(std::move(exporter)), max_queue_size_(max_queue_size), scheduled_delay_millis_(scheduled_delay_millis), max_export_batch_size_(max_export_batch_size), buffer_(max_queue_size_), + is_export_async_(is_export_async), worker_thread_(&BatchLogProcessor::DoBackgroundWork, this) { is_shutdown_.store(false); is_force_flush_.store(false); is_force_flush_notified_.store(false); + is_async_shutdown_notified_.store(false); } std::unique_ptr BatchLogProcessor::MakeRecordable() noexcept @@ -171,10 +174,34 @@ void BatchLogProcessor::Export(const bool was_force_flush_called) }); }); - exporter_->Export( - nostd::span>(records_arr.data(), records_arr.size())); + if (is_export_async_ == false) + { + exporter_->Export( + nostd::span>(records_arr.data(), records_arr.size())); + } + else + { + exporter_->Export( + nostd::span>(records_arr.data(), records_arr.size()), + [this, was_force_flush_called](sdk::common::ExportResult result) { + // TODO: Print result + NotifyForceFlushCompletion(was_force_flush_called); + + // Notify the thread which is waiting on shutdown to complete. + NotifyShutdownCompletion(); + return true; + }); + } } while (was_force_flush_called); + if (is_export_async_ == false) + { + NotifyForceFlushCompletion(was_force_flush_called); + } +} + +void BatchLogProcessor::NotifyForceFlushCompletion(const bool was_force_flush_called) +{ // Notify the main thread in case this export was the result of a force flush. if (was_force_flush_called == true) { @@ -185,11 +212,39 @@ void BatchLogProcessor::Export(const bool was_force_flush_called) } } +void BatchLogProcessor::WaitForShutdownCompletion() +{ + // Since async export is invoked due to shutdown, need to wait + // for async thread to complete. + if (is_export_async_) + { + std::unique_lock lk(async_shutdown_m_); + while (is_async_shutdown_notified_.load() == false) + { + async_shutdown_cv_.wait(lk); + } + } +} + +void BatchLogProcessor::NotifyShutdownCompletion() +{ + // Notify the thread which is waiting on shutdown to complete. + if (is_shutdown_.load() == true) + { + is_async_shutdown_notified_.store(true); + async_shutdown_cv_.notify_one(); + } +} + void BatchLogProcessor::DrainQueue() { while (buffer_.empty() == false) { Export(false); + + // Since async export is invoked due to shutdown, need to wait + // for async thread to complete. + WaitForShutdownCompletion(); } } diff --git a/sdk/src/logs/simple_log_processor.cc b/sdk/src/logs/simple_log_processor.cc index 6e2fde9f14..e16a5e631e 100644 --- a/sdk/src/logs/simple_log_processor.cc +++ b/sdk/src/logs/simple_log_processor.cc @@ -16,8 +16,9 @@ namespace logs * Initialize a simple log processor. * @param exporter the configured exporter where log records are sent */ -SimpleLogProcessor::SimpleLogProcessor(std::unique_ptr &&exporter) - : exporter_(std::move(exporter)) +SimpleLogProcessor::SimpleLogProcessor(std::unique_ptr &&exporter, + bool is_export_async) + : exporter_(std::move(exporter)), is_export_async_(is_export_async) {} std::unique_ptr SimpleLogProcessor::MakeRecordable() noexcept @@ -35,9 +36,20 @@ void SimpleLogProcessor::OnReceive(std::unique_ptr &&record) noexcep // Get lock to ensure Export() is never called concurrently const std::lock_guard locked(lock_); - if (exporter_->Export(batch) != sdk::common::ExportResult::kSuccess) + if (is_export_async_ == false) { - /* Alert user of the failed export */ + if (exporter_->Export(batch) != sdk::common::ExportResult::kSuccess) + { + /* Alert user of the failed export */ + } + } + else + { + exporter_->Export(batch, [](sdk::common::ExportResult result) { + /* Log the result + */ + return true; + }); } } /** diff --git a/sdk/src/trace/batch_span_processor.cc b/sdk/src/trace/batch_span_processor.cc index 387ad43ce3..a02e3fcaee 100644 --- a/sdk/src/trace/batch_span_processor.cc +++ b/sdk/src/trace/batch_span_processor.cc @@ -21,11 +21,13 @@ BatchSpanProcessor::BatchSpanProcessor(std::unique_ptr &&exporter, schedule_delay_millis_(options.schedule_delay_millis), max_export_batch_size_(options.max_export_batch_size), buffer_(max_queue_size_), + is_export_async_(options.is_export_async), worker_thread_(&BatchSpanProcessor::DoBackgroundWork, this) { is_shutdown_.store(false); is_force_flush_.store(false); is_force_flush_notified_.store(false); + is_async_shutdown_notified_.store(false); } std::unique_ptr BatchSpanProcessor::MakeRecordable() noexcept @@ -175,9 +177,36 @@ void BatchSpanProcessor::Export(const bool was_force_flush_called) }); }); - exporter_->Export(nostd::span>(spans_arr.data(), spans_arr.size())); + /* Call the sync Export when force flush was called, even if + is_export_async_ is true. + */ + if (is_export_async_ == false) + { + exporter_->Export( + nostd::span>(spans_arr.data(), spans_arr.size())); + } + else + { + exporter_->Export( + nostd::span>(spans_arr.data(), spans_arr.size()), + [this, was_force_flush_called](sdk::common::ExportResult result) { + // TODO: Print result + NotifyForceFlushCompletion(was_force_flush_called); + // If export was called due to shutdown, notify the worker thread + NotifyShutdownCompletion(); + return true; + }); + } } while (was_force_flush_called); + if (is_export_async_ == false) + { + NotifyForceFlushCompletion(was_force_flush_called); + } +} + +void BatchSpanProcessor::NotifyForceFlushCompletion(const bool was_force_flush_called) +{ // Notify the main thread in case this export was the result of a force flush. if (was_force_flush_called == true) { @@ -188,11 +217,36 @@ void BatchSpanProcessor::Export(const bool was_force_flush_called) } } +void BatchSpanProcessor::WaitForShutdownCompletion() +{ + // Since async export is invoked due to shutdown, need to wait + // for async thread to complete. + if (is_export_async_) + { + std::unique_lock lk(async_shutdown_m_); + while (is_async_shutdown_notified_.load() == false) + { + async_shutdown_cv_.wait(lk); + } + } +} + +void BatchSpanProcessor::NotifyShutdownCompletion() +{ + // Notify the thread which is waiting on shutdown to complete. + if (is_shutdown_.load() == true) + { + is_async_shutdown_notified_.store(true); + async_shutdown_cv_.notify_one(); + } +} + void BatchSpanProcessor::DrainQueue() { while (buffer_.empty() == false) { Export(false); + WaitForShutdownCompletion(); } } diff --git a/sdk/test/logs/batch_log_processor_test.cc b/sdk/test/logs/batch_log_processor_test.cc index df503cb2aa..55850bf414 100644 --- a/sdk/test/logs/batch_log_processor_test.cc +++ b/sdk/test/logs/batch_log_processor_test.cc @@ -55,6 +55,17 @@ class MockLogExporter final : public LogExporter return ExportResult::kSuccess; } + void Export( + const opentelemetry::nostd::span> &records, + opentelemetry::nostd::function_ref result_callback) noexcept override + { + auto th = std::thread([this, records, result_callback]() { + auto result = Export(records); + result_callback(result); + }); + th.join(); + } + // toggles the boolean flag marking this exporter as shut down bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override @@ -86,12 +97,13 @@ class BatchLogProcessorTest : public testing::Test // ::testing::Test const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0), const std::chrono::milliseconds scheduled_delay_millis = std::chrono::milliseconds(5000), const size_t max_queue_size = 2048, - const size_t max_export_batch_size = 512) + const size_t max_export_batch_size = 512, + const bool is_export_async = false) { - return std::shared_ptr( - new BatchLogProcessor(std::unique_ptr(new MockLogExporter( - logs_received, is_shutdown, is_export_completed, export_delay)), - max_queue_size, scheduled_delay_millis, max_export_batch_size)); + return std::shared_ptr(new BatchLogProcessor( + std::unique_ptr( + new MockLogExporter(logs_received, is_shutdown, is_export_completed, export_delay)), + max_queue_size, scheduled_delay_millis, max_export_batch_size, is_export_async)); } }; @@ -133,6 +145,53 @@ TEST_F(BatchLogProcessorTest, TestShutdown) EXPECT_TRUE(is_shutdown->load()); } +TEST_F(BatchLogProcessorTest, TestAsyncShutdown) +{ + // initialize a batch log processor with the test exporter + std::shared_ptr>> logs_received( + new std::vector>); + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr> is_export_completed(new std::atomic(false)); + + const std::chrono::milliseconds export_delay(0); + const std::chrono::milliseconds scheduled_delay_millis(5000); + const size_t max_export_batch_size = 512; + const size_t max_queue_size = 2048; + const bool is_export_async = true; + + auto batch_processor = GetMockProcessor(logs_received, is_shutdown, is_export_completed, + export_delay, scheduled_delay_millis, max_queue_size, + max_export_batch_size, is_export_async); + + // Create a few test log records and send them to the processor + const int num_logs = 3; + + for (int i = 0; i < num_logs; ++i) + { + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); + batch_processor->OnReceive(std::move(log)); + } + + // Test that shutting down the processor will first wait for the + // current batch of logs to be sent to the log exporter + // by checking the number of logs sent and the names of the logs sent + EXPECT_EQ(true, batch_processor->Shutdown()); + // It's safe to shutdown again + EXPECT_TRUE(batch_processor->Shutdown()); + + EXPECT_EQ(num_logs, logs_received->size()); + + // Assume logs are received by exporter in same order as sent by processor + for (int i = 0; i < num_logs; ++i) + { + EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetName()); + } + + // Also check that the processor is shut down at the end + EXPECT_TRUE(is_shutdown->load()); +} + TEST_F(BatchLogProcessorTest, TestForceFlush) { std::shared_ptr> is_shutdown(new std::atomic(false)); @@ -174,6 +233,57 @@ TEST_F(BatchLogProcessorTest, TestForceFlush) } } +TEST_F(BatchLogProcessorTest, TestAsyncForceFlush) +{ + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr>> logs_received( + new std::vector>); + std::shared_ptr> is_export_completed(new std::atomic(false)); + + const std::chrono::milliseconds export_delay(0); + const std::chrono::milliseconds scheduled_delay_millis(5000); + const size_t max_export_batch_size = 512; + const size_t max_queue_size = 2048; + const bool is_export_async = true; + + auto batch_processor = GetMockProcessor(logs_received, is_shutdown, is_export_completed, + export_delay, scheduled_delay_millis, max_queue_size, + max_export_batch_size, is_export_async); + + const int num_logs = 2048; + + for (int i = 0; i < num_logs; ++i) + { + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); + batch_processor->OnReceive(std::move(log)); + } + + EXPECT_TRUE(batch_processor->ForceFlush()); + + EXPECT_EQ(num_logs, logs_received->size()); + for (int i = 0; i < num_logs; ++i) + { + EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetName()); + } + + // Create some more logs to make sure that the processor still works + for (int i = 0; i < num_logs; ++i) + { + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); + batch_processor->OnReceive(std::move(log)); + } + + EXPECT_TRUE(batch_processor->ForceFlush()); + + EXPECT_EQ(num_logs * 2, logs_received->size()); + for (int i = 0; i < num_logs * 2; ++i) + { + EXPECT_EQ("Log" + std::to_string(i % num_logs), logs_received->at(i)->GetName()); + } +} + TEST_F(BatchLogProcessorTest, TestManyLogsLoss) { /* Test that when exporting more than max_queue_size logs, some are most likely lost*/ diff --git a/sdk/test/logs/simple_log_processor_test.cc b/sdk/test/logs/simple_log_processor_test.cc index 0bb6ba2667..8b8efa6636 100644 --- a/sdk/test/logs/simple_log_processor_test.cc +++ b/sdk/test/logs/simple_log_processor_test.cc @@ -53,6 +53,14 @@ class TestExporter final : public LogExporter return ExportResult::kSuccess; } + // Dummy Async Export implementation + void Export(const nostd::span> &records, + nostd::function_ref result_callback) noexcept override + { + auto result = Export(records); + result_callback(result); + } + // Increment the shutdown counter everytime this method is called bool Shutdown(std::chrono::microseconds timeout) noexcept override { @@ -137,6 +145,12 @@ class FailShutDownExporter final : public LogExporter return ExportResult::kSuccess; } + void Export(const nostd::span> &records, + nostd::function_ref result_callback) noexcept override + { + result_callback(ExportResult::kSuccess); + } + bool Shutdown(std::chrono::microseconds timeout) noexcept override { return false; } }; diff --git a/sdk/test/trace/batch_span_processor_test.cc b/sdk/test/trace/batch_span_processor_test.cc index 0e6f9c35aa..91561ac658 100644 --- a/sdk/test/trace/batch_span_processor_test.cc +++ b/sdk/test/trace/batch_span_processor_test.cc @@ -56,6 +56,17 @@ class MockSpanExporter final : public sdk::trace::SpanExporter return sdk::common::ExportResult::kSuccess; } + void Export( + const nostd::span> &spans, + nostd::function_ref result_callback) noexcept override + { + auto th = std::thread([this, spans, result_callback]() { + auto result = Export(spans); + result_callback(result); + }); + th.join(); + } + bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override { @@ -131,7 +142,95 @@ TEST_F(BatchSpanProcessorTestPeer, TestShutdown) EXPECT_TRUE(is_shutdown->load()); } +TEST_F(BatchSpanProcessorTestPeer, TestAsyncShutdown) +{ + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr>> spans_received( + new std::vector>); + + sdk::trace::BatchSpanProcessorOptions options{}; + options.is_export_async = true; + + auto batch_processor = + std::shared_ptr(new sdk::trace::BatchSpanProcessor( + std::unique_ptr(new MockSpanExporter(spans_received, is_shutdown)), + options)); + const int num_spans = 3; + + auto test_spans = GetTestSpans(batch_processor, num_spans); + + for (int i = 0; i < num_spans; ++i) + { + batch_processor->OnEnd(std::move(test_spans->at(i))); + } + + EXPECT_TRUE(batch_processor->Shutdown()); + // It's safe to shutdown again + EXPECT_TRUE(batch_processor->Shutdown()); + + EXPECT_EQ(num_spans, spans_received->size()); + for (int i = 0; i < num_spans; ++i) + { + EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName()); + } + + EXPECT_TRUE(is_shutdown->load()); +} + TEST_F(BatchSpanProcessorTestPeer, TestForceFlush) +{ + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr>> spans_received( + new std::vector>); + + sdk::trace::BatchSpanProcessorOptions options{}; + options.is_export_async = true; + + auto batch_processor = + std::shared_ptr(new sdk::trace::BatchSpanProcessor( + std::unique_ptr(new MockSpanExporter(spans_received, is_shutdown)), + options)); + const int num_spans = 2048; + + auto test_spans = GetTestSpans(batch_processor, num_spans); + + for (int i = 0; i < num_spans; ++i) + { + batch_processor->OnEnd(std::move(test_spans->at(i))); + } + + // Give some time to export + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + EXPECT_TRUE(batch_processor->ForceFlush()); + + EXPECT_EQ(num_spans, spans_received->size()); + for (int i = 0; i < num_spans; ++i) + { + EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName()); + } + + // Create some more spans to make sure that the processor still works + auto more_test_spans = GetTestSpans(batch_processor, num_spans); + for (int i = 0; i < num_spans; ++i) + { + batch_processor->OnEnd(std::move(more_test_spans->at(i))); + } + + // Give some time to export the spans + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + EXPECT_TRUE(batch_processor->ForceFlush()); + + EXPECT_EQ(num_spans * 2, spans_received->size()); + for (int i = 0; i < num_spans; ++i) + { + EXPECT_EQ("Span " + std::to_string(i % num_spans), + spans_received->at(num_spans + i)->GetName()); + } +} + +TEST_F(BatchSpanProcessorTestPeer, TestAsyncForceFlush) { std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr>> spans_received( diff --git a/sdk/test/trace/simple_processor_test.cc b/sdk/test/trace/simple_processor_test.cc index 9398b922a5..b8bad5962d 100644 --- a/sdk/test/trace/simple_processor_test.cc +++ b/sdk/test/trace/simple_processor_test.cc @@ -51,6 +51,13 @@ class RecordShutdownExporter final : public SpanExporter return ExportResult::kSuccess; } + void Export(const opentelemetry::nostd::span> &spans, + opentelemetry::nostd::function_ref + result_callback) noexcept override + { + result_callback(ExportResult::kSuccess); + } + bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override {