From ad3bdfe4053051bdeb5788e3fa4457cb22a27851 Mon Sep 17 00:00:00 2001 From: DEBAJIT DAS <85024550+DebajitDas@users.noreply.github.com> Date: Thu, 31 Mar 2022 09:56:24 +0530 Subject: [PATCH] Added feature flag for asynchronous export (#1295) --- CMakeLists.txt | 6 +++++ api/CMakeLists.txt | 4 ++++ .../exporters/elasticsearch/es_log_exporter.h | 2 ++ .../elasticsearch/src/es_log_exporter.cc | 4 ++++ .../exporters/jaeger/jaeger_exporter.h | 2 ++ exporters/jaeger/src/jaeger_exporter.cc | 2 ++ .../memory/in_memory_span_exporter.h | 2 ++ .../exporters/ostream/log_exporter.h | 2 ++ .../exporters/ostream/span_exporter.h | 2 ++ exporters/ostream/src/log_exporter.cc | 2 ++ exporters/ostream/src/span_exporter.cc | 2 ++ .../exporters/otlp/otlp_grpc_exporter.h | 2 ++ .../exporters/otlp/otlp_grpc_log_exporter.h | 2 ++ .../exporters/otlp/otlp_http_exporter.h | 4 ++++ .../exporters/otlp/otlp_http_log_exporter.h | 4 ++++ exporters/otlp/src/otlp_grpc_exporter.cc | 2 ++ exporters/otlp/src/otlp_grpc_log_exporter.cc | 2 ++ exporters/otlp/src/otlp_http_exporter.cc | 10 +++++++-- exporters/otlp/src/otlp_http_log_exporter.cc | 10 +++++++-- .../otlp/test/otlp_http_exporter_test.cc | 15 +++++++++---- .../otlp/test/otlp_http_log_exporter_test.cc | 8 ++++++- .../exporters/zipkin/zipkin_exporter.h | 2 ++ exporters/zipkin/src/zipkin_exporter.cc | 2 ++ .../sdk/logs/batch_log_processor.h | 10 ++++++--- sdk/include/opentelemetry/sdk/logs/exporter.h | 2 ++ .../sdk/logs/simple_log_processor.h | 13 ++++++++--- .../sdk/trace/batch_span_processor.h | 6 +++++ .../opentelemetry/sdk/trace/exporter.h | 2 ++ .../sdk/trace/simple_processor.h | 22 +++++++++++++++---- sdk/src/logs/batch_log_processor.cc | 12 ++++++++++ sdk/src/logs/simple_log_processor.cc | 18 ++++++++++++--- sdk/src/trace/batch_span_processor.cc | 13 ++++++++--- sdk/test/logs/batch_log_processor_test.cc | 6 +++++ sdk/test/logs/simple_log_processor_test.cc | 5 ++++- sdk/test/trace/batch_span_processor_test.cc | 4 ++++ sdk/test/trace/simple_processor_test.cc | 2 ++ 36 files changed, 182 insertions(+), 26 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 5573d156e8..efc63249e1 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -226,6 +226,12 @@ if(WITH_LOGS_PREVIEW) add_definitions(-DENABLE_LOGS_PREVIEW) endif() +option(WITH_ASYNC_EXPORT_PREVIEW "Whether enable async export" OFF) + +if(WITH_ASYNC_EXPORT_PREVIEW) + add_definitions(-DENABLE_ASYNC_EXPORT) +endif() + find_package(Threads) function(install_windows_deps) diff --git a/api/CMakeLists.txt b/api/CMakeLists.txt index adea77ca2d..17ad746fff 100644 --- a/api/CMakeLists.txt +++ b/api/CMakeLists.txt @@ -63,3 +63,7 @@ endif() if(CORE_RUNTIME_LIBS) target_link_libraries(opentelemetry_api INTERFACE ${CORE_RUNTIME_LIBS}) endif() + +if(WITH_ASYNC_EXPORT_PREVIEW) + target_compile_definitions(opentelemetry_api INTERFACE ENABLE_ASYNC_EXPORT) +endif() diff --git a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h index 88de284831..a5c6623228 100644 --- a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h +++ b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h @@ -89,6 +89,7 @@ class ElasticsearchLogExporter final : public opentelemetry::sdk::logs::LogExpor const opentelemetry::nostd::span> &records) noexcept override; +# ifdef ENABLE_ASYNC_EXPORT /** * Exports a vector of log records to the Elasticsearch instance asynchronously. * @param records A list of log records to send to Elasticsearch. @@ -99,6 +100,7 @@ class ElasticsearchLogExporter final : public opentelemetry::sdk::logs::LogExpor &records, std::function &&result_callback) noexcept override; +# endif /** * Shutdown this exporter. diff --git a/exporters/elasticsearch/src/es_log_exporter.cc b/exporters/elasticsearch/src/es_log_exporter.cc index 6a511d197f..1abe4ca00e 100644 --- a/exporters/elasticsearch/src/es_log_exporter.cc +++ b/exporters/elasticsearch/src/es_log_exporter.cc @@ -110,6 +110,7 @@ class ResponseHandler : public http_client::EventHandler bool console_debug_ = false; }; +# ifdef ENABLE_ASYNC_EXPORT /** * This class handles the async response message from the Elasticsearch request */ @@ -192,6 +193,7 @@ class AsyncResponseHandler : public http_client::EventHandler // Whether to print the results from the callback bool console_debug_ = false; }; +# endif ElasticsearchLogExporter::ElasticsearchLogExporter() : options_{ElasticsearchExporterOptions()}, @@ -281,6 +283,7 @@ sdk::common::ExportResult ElasticsearchLogExporter::Export( return sdk::common::ExportResult::kSuccess; } +# ifdef ENABLE_ASYNC_EXPORT void ElasticsearchLogExporter::Export( const opentelemetry::nostd::span> &records, @@ -325,6 +328,7 @@ void ElasticsearchLogExporter::Export( options_.console_debug_); session->SendRequest(handler); } +# endif bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept { diff --git a/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h b/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h index c6d057b4ba..85003689c5 100644 --- a/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h +++ b/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h @@ -61,6 +61,7 @@ class JaegerExporter final : public opentelemetry::sdk::trace::SpanExporter const nostd::span> &spans) noexcept override; +#ifdef ENABLE_ASYNC_EXPORT /** * Exports a batch of span recordables asynchronously. * @param spans a span of unique pointers to span recordables @@ -69,6 +70,7 @@ class JaegerExporter final : public opentelemetry::sdk::trace::SpanExporter void Export(const nostd::span> &spans, std::function &&result_callback) noexcept override; +#endif /** * Shutdown the exporter. diff --git a/exporters/jaeger/src/jaeger_exporter.cc b/exporters/jaeger/src/jaeger_exporter.cc index f35b5bccd7..b77b445100 100644 --- a/exporters/jaeger/src/jaeger_exporter.cc +++ b/exporters/jaeger/src/jaeger_exporter.cc @@ -70,6 +70,7 @@ sdk_common::ExportResult JaegerExporter::Export( return sdk_common::ExportResult::kSuccess; } +#ifdef ENABLE_ASYNC_EXPORT void JaegerExporter::Export( const nostd::span> &spans, std::function &&result_callback) noexcept @@ -78,6 +79,7 @@ void JaegerExporter::Export( auto status = Export(spans); result_callback(status); } +#endif void JaegerExporter::InitializeEndpoint() { diff --git a/exporters/memory/include/opentelemetry/exporters/memory/in_memory_span_exporter.h b/exporters/memory/include/opentelemetry/exporters/memory/in_memory_span_exporter.h index ee9d9e9876..8d620d7ee6 100644 --- a/exporters/memory/include/opentelemetry/exporters/memory/in_memory_span_exporter.h +++ b/exporters/memory/include/opentelemetry/exporters/memory/in_memory_span_exporter.h @@ -64,6 +64,7 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte return sdk::common::ExportResult::kSuccess; } +#ifdef ENABLE_ASYNC_EXPORT /** * Exports a batch of span recordables asynchronously. * @param spans a span of unique pointers to span recordables @@ -77,6 +78,7 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte auto status = Export(spans); result_callback(status); } +#endif /** * @param timeout an optional value containing the timeout of the exporter diff --git a/exporters/ostream/include/opentelemetry/exporters/ostream/log_exporter.h b/exporters/ostream/include/opentelemetry/exporters/ostream/log_exporter.h index deb8645926..65969a40a9 100644 --- a/exporters/ostream/include/opentelemetry/exporters/ostream/log_exporter.h +++ b/exporters/ostream/include/opentelemetry/exporters/ostream/log_exporter.h @@ -39,12 +39,14 @@ class OStreamLogExporter final : public opentelemetry::sdk::logs::LogExporter const opentelemetry::nostd::span> &records) noexcept override; +# ifdef ENABLE_ASYNC_EXPORT /** * Exports a span of logs sent from the processor asynchronously. */ void Export( const opentelemetry::nostd::span> &records, std::function &&result_callback) noexcept; +# endif /** * Marks the OStream Log Exporter as shut down. diff --git a/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h b/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h index 0044a29e47..334c4d6419 100644 --- a/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h +++ b/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h @@ -38,11 +38,13 @@ class OStreamSpanExporter final : public opentelemetry::sdk::trace::SpanExporter const opentelemetry::nostd::span> &spans) noexcept override; +#ifdef ENABLE_ASYNC_EXPORT void Export( const opentelemetry::nostd::span> &spans, std::function &&result_callback) noexcept override; +#endif bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; diff --git a/exporters/ostream/src/log_exporter.cc b/exporters/ostream/src/log_exporter.cc index 46f88b46d4..52d42cc5c4 100644 --- a/exporters/ostream/src/log_exporter.cc +++ b/exporters/ostream/src/log_exporter.cc @@ -180,6 +180,7 @@ sdk::common::ExportResult OStreamLogExporter::Export( return sdk::common::ExportResult::kSuccess; } +# ifdef ENABLE_ASYNC_EXPORT void OStreamLogExporter::Export( const opentelemetry::nostd::span> &records, std::function &&result_callback) noexcept @@ -188,6 +189,7 @@ void OStreamLogExporter::Export( auto result = Export(records); result_callback(result); } +# endif bool OStreamLogExporter::Shutdown(std::chrono::microseconds) noexcept { diff --git a/exporters/ostream/src/span_exporter.cc b/exporters/ostream/src/span_exporter.cc index d566a66f93..67c1a51a4b 100644 --- a/exporters/ostream/src/span_exporter.cc +++ b/exporters/ostream/src/span_exporter.cc @@ -96,6 +96,7 @@ sdk::common::ExportResult OStreamSpanExporter::Export( return sdk::common::ExportResult::kSuccess; } +#ifdef ENABLE_ASYNC_EXPORT void OStreamSpanExporter::Export( const opentelemetry::nostd::span> &spans, std::function &&result_callback) noexcept @@ -103,6 +104,7 @@ void OStreamSpanExporter::Export( auto result = Export(spans); result_callback(result); } +#endif bool OStreamSpanExporter::Shutdown(std::chrono::microseconds timeout) noexcept { diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h index df02f2e215..4660903595 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h @@ -52,6 +52,7 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter sdk::common::ExportResult Export( const nostd::span> &spans) noexcept override; +#ifdef ENABLE_ASYNC_EXPORT /** * Exports a batch of span recordables asynchronously. * @param spans a span of unique pointers to span recordables @@ -60,6 +61,7 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter virtual void Export(const nostd::span> &spans, std::function &&result_callback) noexcept override; +#endif /** * Shut down the exporter. diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_exporter.h index 13aba881bc..5652a21f72 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_exporter.h @@ -55,6 +55,7 @@ class OtlpGrpcLogExporter : public opentelemetry::sdk::logs::LogExporter const nostd::span> &records) noexcept override; +# ifdef ENABLE_ASYNC_EXPORT /** * Exports a vector of log records asynchronously. * @param records A list of log records. @@ -64,6 +65,7 @@ class OtlpGrpcLogExporter : public opentelemetry::sdk::logs::LogExporter const nostd::span> &records, std::function &&result_callback) noexcept override; +# endif /** * Shutdown this exporter. diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h index 8884c6c307..a2f9eb3176 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h @@ -52,9 +52,11 @@ struct OtlpHttpExporterOptions // Additional HTTP headers OtlpHeaders http_headers = GetOtlpDefaultHeaders(); +#ifdef ENABLE_ASYNC_EXPORT // Concurrent requests // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#otlpgrpc-concurrent-requests std::size_t max_concurrent_requests = 8; +#endif }; /** @@ -87,6 +89,7 @@ class OtlpHttpExporter final : public opentelemetry::sdk::trace::SpanExporter const nostd::span> &spans) noexcept override; +#ifdef ENABLE_ASYNC_EXPORT /** * Exports a batch of span recordables asynchronously. * @param spans a span of unique pointers to span recordables @@ -96,6 +99,7 @@ class OtlpHttpExporter final : public opentelemetry::sdk::trace::SpanExporter const nostd::span> &spans, std::function &&result_callback) noexcept override; +#endif /** * Shut down the exporter. diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h index aecacb2686..2bd8697076 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h @@ -52,9 +52,11 @@ struct OtlpHttpLogExporterOptions // Additional HTTP headers OtlpHeaders http_headers = GetOtlpDefaultLogHeaders(); +# ifdef ENABLE_ASYNC_EXPORT // Concurrent requests // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#otlpgrpc-concurrent-requests std::size_t max_concurrent_requests = 8; +# endif }; /** @@ -88,6 +90,7 @@ class OtlpHttpLogExporter final : public opentelemetry::sdk::logs::LogExporter const nostd::span> &records) noexcept override; +# ifdef ENABLE_ASYNC_EXPORT /** * Exports a vector of log records asynchronously. * @param records A list of log records. @@ -97,6 +100,7 @@ class OtlpHttpLogExporter final : public opentelemetry::sdk::logs::LogExporter const nostd::span> &records, std::function &&result_callback) noexcept override; +# endif /** * Shutdown this exporter. diff --git a/exporters/otlp/src/otlp_grpc_exporter.cc b/exporters/otlp/src/otlp_grpc_exporter.cc index 2192533f70..c7734ad3e2 100644 --- a/exporters/otlp/src/otlp_grpc_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_exporter.cc @@ -143,6 +143,7 @@ sdk::common::ExportResult OtlpGrpcExporter::Export( return sdk::common::ExportResult::kSuccess; } +#ifdef ENABLE_ASYNC_EXPORT void OtlpGrpcExporter::Export( const nostd::span> &spans, std::function &&result_callback) noexcept @@ -152,6 +153,7 @@ void OtlpGrpcExporter::Export( auto status = Export(spans); result_callback(status); } +#endif bool OtlpGrpcExporter::Shutdown(std::chrono::microseconds timeout) noexcept { diff --git a/exporters/otlp/src/otlp_grpc_log_exporter.cc b/exporters/otlp/src/otlp_grpc_log_exporter.cc index 06ee5de7ba..e506cde0c5 100644 --- a/exporters/otlp/src/otlp_grpc_log_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_log_exporter.cc @@ -161,6 +161,7 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcLogExporter::Export( return sdk::common::ExportResult::kSuccess; } +# ifdef ENABLE_ASYNC_EXPORT void OtlpGrpcLogExporter::Export( const nostd::span> &logs, std::function &&result_callback) noexcept @@ -170,6 +171,7 @@ void OtlpGrpcLogExporter::Export( auto status = Export(logs); result_callback(status); } +# endif bool OtlpGrpcLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept { diff --git a/exporters/otlp/src/otlp_http_exporter.cc b/exporters/otlp/src/otlp_http_exporter.cc index a3c3354997..8f51165cc8 100644 --- a/exporters/otlp/src/otlp_http_exporter.cc +++ b/exporters/otlp/src/otlp_http_exporter.cc @@ -31,8 +31,12 @@ OtlpHttpExporter::OtlpHttpExporter(const OtlpHttpExporterOptions &options) options.use_json_name, options.console_debug, options.timeout, - options.http_headers, - options.max_concurrent_requests))) + options.http_headers +#ifdef ENABLE_ASYNC_EXPORT + , + options.max_concurrent_requests +#endif + ))) {} OtlpHttpExporter::OtlpHttpExporter(std::unique_ptr http_client) @@ -59,6 +63,7 @@ opentelemetry::sdk::common::ExportResult OtlpHttpExporter::Export( return http_client_->Export(service_request); } +#ifdef ENABLE_ASYNC_EXPORT void OtlpHttpExporter::Export( const nostd::span> &spans, std::function &&result_callback) noexcept @@ -72,6 +77,7 @@ void OtlpHttpExporter::Export( OtlpRecordableUtils::PopulateRequest(spans, &service_request); http_client_->Export(service_request, std::move(result_callback)); } +#endif bool OtlpHttpExporter::Shutdown(std::chrono::microseconds timeout) noexcept { diff --git a/exporters/otlp/src/otlp_http_log_exporter.cc b/exporters/otlp/src/otlp_http_log_exporter.cc index c66ab228f2..2e377699ed 100644 --- a/exporters/otlp/src/otlp_http_log_exporter.cc +++ b/exporters/otlp/src/otlp_http_log_exporter.cc @@ -33,8 +33,12 @@ OtlpHttpLogExporter::OtlpHttpLogExporter(const OtlpHttpLogExporterOptions &optio options.use_json_name, options.console_debug, options.timeout, - options.http_headers, - options.max_concurrent_requests))) + options.http_headers +# ifdef ENABLE_ASYNC_EXPORT + , + options.max_concurrent_requests +# endif + ))) {} OtlpHttpLogExporter::OtlpHttpLogExporter(std::unique_ptr http_client) @@ -60,6 +64,7 @@ opentelemetry::sdk::common::ExportResult OtlpHttpLogExporter::Export( return http_client_->Export(service_request); } +# ifdef ENABLE_ASYNC_EXPORT void OtlpHttpLogExporter::Export( const nostd::span> &logs, std::function &&result_callback) noexcept @@ -72,6 +77,7 @@ void OtlpHttpLogExporter::Export( OtlpRecordableUtils::PopulateRequest(logs, &service_request); http_client_->Export(service_request, std::move(result_callback)); } +# endif bool OtlpHttpLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept { diff --git a/exporters/otlp/test/otlp_http_exporter_test.cc b/exporters/otlp/test/otlp_http_exporter_test.cc index 8f38685963..9d7cd2a5ce 100644 --- a/exporters/otlp/test/otlp_http_exporter_test.cc +++ b/exporters/otlp/test/otlp_http_exporter_test.cc @@ -114,8 +114,10 @@ class OtlpHttpExporterTestPeer : public ::testing::Test processor_opts.max_export_batch_size = 5; processor_opts.max_queue_size = 5; processor_opts.schedule_delay_millis = std::chrono::milliseconds(256); - processor_opts.is_export_async = is_async; - auto processor = std::unique_ptr( +# ifdef ENABLE_ASYNC_EXPORT + processor_opts.is_export_async = is_async; +# endif + auto processor = std::unique_ptr( new sdk::trace::BatchSpanProcessor(std::move(exporter), processor_opts)); auto provider = nostd::shared_ptr( new sdk::trace::TracerProvider(std::move(processor), resource)); @@ -210,8 +212,9 @@ class OtlpHttpExporterTestPeer : public ::testing::Test processor_opts.max_export_batch_size = 5; processor_opts.max_queue_size = 5; processor_opts.schedule_delay_millis = std::chrono::milliseconds(256); - processor_opts.is_export_async = is_async; - +# ifdef ENABLE_ASYNC_EXPORT + processor_opts.is_export_async = is_async; +# endif auto processor = std::unique_ptr( new sdk::trace::BatchSpanProcessor(std::move(exporter), processor_opts)); auto provider = nostd::shared_ptr( @@ -282,10 +285,12 @@ TEST_F(OtlpHttpExporterTestPeer, ExportJsonIntegrationTestSync) ExportJsonIntegrationTest(false); } +# ifdef ENABLE_ASYNC_EXPORT TEST_F(OtlpHttpExporterTestPeer, ExportJsonIntegrationTestAsync) { ExportJsonIntegrationTest(true); } +# endif // Create spans, let processor call Export() TEST_F(OtlpHttpExporterTestPeer, ExportBinaryIntegrationTestSync) @@ -293,10 +298,12 @@ TEST_F(OtlpHttpExporterTestPeer, ExportBinaryIntegrationTestSync) ExportBinaryIntegrationTest(false); } +# ifdef ENABLE_ASYNC_EXPORT TEST_F(OtlpHttpExporterTestPeer, ExportBinaryIntegrationTestAsync) { ExportBinaryIntegrationTest(true); } +# endif // Test exporter configuration options TEST_F(OtlpHttpExporterTestPeer, ConfigTest) diff --git a/exporters/otlp/test/otlp_http_log_exporter_test.cc b/exporters/otlp/test/otlp_http_log_exporter_test.cc index 50034ed12d..17723f2ac1 100644 --- a/exporters/otlp/test/otlp_http_log_exporter_test.cc +++ b/exporters/otlp/test/otlp_http_log_exporter_test.cc @@ -213,7 +213,9 @@ class OtlpHttpLogExporterTestPeer : public ::testing::Test processor_options.max_export_batch_size = 5; processor_options.max_queue_size = 5; processor_options.schedule_delay_millis = std::chrono::milliseconds(256); - processor_options.is_export_async = is_async; +# ifdef ENABLE_ASYNC_EXPORT + processor_options.is_export_async = is_async; +# endif auto provider = nostd::shared_ptr(new sdk::logs::LoggerProvider()); provider->AddProcessor(std::unique_ptr( new sdk::logs::BatchLogProcessor(std::move(exporter), processor_options))); @@ -306,10 +308,12 @@ TEST_F(OtlpHttpLogExporterTestPeer, ExportJsonIntegrationTestSync) ExportJsonIntegrationTest(false); } +# ifdef ENABLE_ASYNC_EXPORT TEST_F(OtlpHttpLogExporterTestPeer, ExportJsonIntegrationTestAsync) { ExportJsonIntegrationTest(true); } +# endif // Create log records, let processor call Export() TEST_F(OtlpHttpLogExporterTestPeer, ExportBinaryIntegrationTestSync) @@ -317,10 +321,12 @@ TEST_F(OtlpHttpLogExporterTestPeer, ExportBinaryIntegrationTestSync) ExportBinaryIntegrationTest(false); } +# ifdef ENABLE_ASYNC_EXPORT TEST_F(OtlpHttpLogExporterTestPeer, ExportBinaryIntegrationTestAsync) { ExportBinaryIntegrationTest(true); } +# endif // Test exporter configuration options TEST_F(OtlpHttpLogExporterTestPeer, ConfigTest) diff --git a/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h b/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h index 28f09deaba..f730f1e111 100644 --- a/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h +++ b/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h @@ -78,6 +78,7 @@ class ZipkinExporter final : public opentelemetry::sdk::trace::SpanExporter const nostd::span> &spans) noexcept override; +#ifdef ENABLE_ASYNC_EXPORT /** * Export asynchronosly a batch of span recordables in JSON format * @param spans a span of unique pointers to span recordables @@ -86,6 +87,7 @@ class ZipkinExporter final : public opentelemetry::sdk::trace::SpanExporter void Export(const nostd::span> &spans, std::function &&result_callback) noexcept override; +#endif /** * Shut down the exporter. diff --git a/exporters/zipkin/src/zipkin_exporter.cc b/exporters/zipkin/src/zipkin_exporter.cc index 202c048cca..92a6bfc37f 100644 --- a/exporters/zipkin/src/zipkin_exporter.cc +++ b/exporters/zipkin/src/zipkin_exporter.cc @@ -93,6 +93,7 @@ sdk::common::ExportResult ZipkinExporter::Export( return sdk::common::ExportResult::kSuccess; } +#ifdef ENABLE_ASYNC_EXPORT void ZipkinExporter::Export( const nostd::span> &spans, std::function &&result_callback) noexcept @@ -101,6 +102,7 @@ void ZipkinExporter::Export( auto status = Export(spans); result_callback(status); } +#endif void ZipkinExporter::InitializeLocalEndpoint() { diff --git a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h index 9a8448a119..d93a9d3294 100644 --- a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h @@ -41,11 +41,13 @@ struct BatchLogProcessorOptions */ size_t max_export_batch_size = 512; +# ifdef ENABLE_ASYNC_EXPORT /** * Determines whether the export happens asynchronously. * Default implementation is synchronous. */ bool is_export_async = false; +# endif }; /** @@ -134,9 +136,10 @@ class BatchLogProcessor : public LogProcessor */ void DrainQueue(); +# ifdef ENABLE_ASYNC_EXPORT /* In case of async export, wait and notify for shutdown to be completed.*/ void WaitForShutdownCompletion(); - +# endif struct SynchronizationData { /* Synchronization primitives */ @@ -168,8 +171,9 @@ class BatchLogProcessor : public LogProcessor const size_t max_queue_size_; const std::chrono::milliseconds scheduled_delay_millis_; const size_t max_export_batch_size_; +# ifdef ENABLE_ASYNC_EXPORT const bool is_export_async_; - +# endif /* The buffer/queue to which the ended logs are added */ common::CircularBuffer buffer_; @@ -182,4 +186,4 @@ class BatchLogProcessor : public LogProcessor } // namespace logs } // namespace sdk OPENTELEMETRY_END_NAMESPACE -#endif \ No newline at end of file +#endif diff --git a/sdk/include/opentelemetry/sdk/logs/exporter.h b/sdk/include/opentelemetry/sdk/logs/exporter.h index ee3bac92b4..5c28d73538 100644 --- a/sdk/include/opentelemetry/sdk/logs/exporter.h +++ b/sdk/include/opentelemetry/sdk/logs/exporter.h @@ -46,6 +46,7 @@ class LogExporter virtual sdk::common::ExportResult Export( const nostd::span> &records) noexcept = 0; +# ifdef ENABLE_ASYNC_EXPORT /** * Exports asynchronously the batch of log records to their export destination * @param records a span of unique pointers to log records @@ -54,6 +55,7 @@ class LogExporter virtual void Export( const nostd::span> &records, std::function &&result_callback) noexcept = 0; +# endif /** * Marks the exporter as ShutDown and cleans up any resources as required. diff --git a/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h b/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h index 28fcca78a6..a92380cd01 100644 --- a/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h @@ -28,8 +28,12 @@ class SimpleLogProcessor : public LogProcessor { public: - explicit SimpleLogProcessor(std::unique_ptr &&exporter, - bool is_export_async = false); + explicit SimpleLogProcessor(std::unique_ptr &&exporter +# ifdef ENABLE_ASYNC_EXPORT + , + bool is_export_async = false +# endif + ); virtual ~SimpleLogProcessor() = default; std::unique_ptr MakeRecordable() noexcept override; @@ -49,7 +53,10 @@ class SimpleLogProcessor : public LogProcessor opentelemetry::common::SpinLockMutex lock_; // The atomic boolean flag to ensure the ShutDown() function is only called once std::atomic_flag shutdown_latch_ = ATOMIC_FLAG_INIT; - bool is_export_async_ = false; + +# ifdef ENABLE_ASYNC_EXPORT + bool is_export_async_ = false; +# endif }; } // namespace logs } // namespace sdk diff --git a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h index 4233653d93..cbd0cddfbb 100644 --- a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h +++ b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h @@ -38,11 +38,13 @@ struct BatchSpanProcessorOptions */ size_t max_export_batch_size = 512; +#ifdef ENABLE_ASYNC_EXPORT /** * Determines whether the export happens asynchronously. * Default implementation is synchronous. */ bool is_export_async = false; +#endif }; /** @@ -131,8 +133,10 @@ class BatchSpanProcessor : public SpanProcessor */ void DrainQueue(); +#ifdef ENABLE_ASYNC_EXPORT /* In case of async export, wait and notify for shutdown to be completed.*/ void WaitForShutdownCompletion(); +#endif struct SynchronizationData { @@ -165,7 +169,9 @@ class BatchSpanProcessor : public SpanProcessor const size_t max_queue_size_; const std::chrono::milliseconds schedule_delay_millis_; const size_t max_export_batch_size_; +#ifdef ENABLE_ASYNC_EXPORT const bool is_export_async_; +#endif /* The buffer/queue to which the ended spans are added */ common::CircularBuffer buffer_; diff --git a/sdk/include/opentelemetry/sdk/trace/exporter.h b/sdk/include/opentelemetry/sdk/trace/exporter.h index 749fd897fb..b58bbc8717 100644 --- a/sdk/include/opentelemetry/sdk/trace/exporter.h +++ b/sdk/include/opentelemetry/sdk/trace/exporter.h @@ -42,6 +42,7 @@ class SpanExporter const nostd::span> &spans) noexcept = 0; +#ifdef ENABLE_ASYNC_EXPORT /** * Exports a batch of span recordables asynchronously. * @param spans a span of unique pointers to span recordables @@ -50,6 +51,7 @@ class SpanExporter virtual void Export( const nostd::span> &spans, std::function &&result_callback) noexcept = 0; +#endif /** * Shut down the exporter. diff --git a/sdk/include/opentelemetry/sdk/trace/simple_processor.h b/sdk/include/opentelemetry/sdk/trace/simple_processor.h index 982a432e0c..576f9ebe9f 100644 --- a/sdk/include/opentelemetry/sdk/trace/simple_processor.h +++ b/sdk/include/opentelemetry/sdk/trace/simple_processor.h @@ -31,9 +31,17 @@ class SimpleSpanProcessor : public SpanProcessor * Initialize a simple span processor. * @param exporter the exporter used by the span processor */ - explicit SimpleSpanProcessor(std::unique_ptr &&exporter, - bool is_export_async = false) noexcept - : exporter_(std::move(exporter)), is_export_async_(is_export_async) + explicit SimpleSpanProcessor(std::unique_ptr &&exporter +#ifdef ENABLE_ASYNC_EXPORT + , + bool is_export_async = false +#endif + ) noexcept + : exporter_(std::move(exporter)) +#ifdef ENABLE_ASYNC_EXPORT + , + is_export_async_(is_export_async) +#endif {} std::unique_ptr MakeRecordable() noexcept override @@ -49,13 +57,16 @@ class SimpleSpanProcessor : public SpanProcessor { nostd::span> batch(&span, 1); const std::lock_guard locked(lock_); +#ifdef ENABLE_ASYNC_EXPORT if (is_export_async_ == false) { +#endif if (exporter_->Export(batch) == sdk::common::ExportResult::kFailure) { /* Once it is defined how the SDK does logging, an error should be * logged in this case. */ } +#ifdef ENABLE_ASYNC_EXPORT } else { @@ -65,6 +76,7 @@ class SimpleSpanProcessor : public SpanProcessor return true; }); } +#endif } bool ForceFlush( @@ -90,7 +102,9 @@ class SimpleSpanProcessor : public SpanProcessor std::unique_ptr exporter_; opentelemetry::common::SpinLockMutex lock_; std::atomic_flag shutdown_latch_ = ATOMIC_FLAG_INIT; - bool is_export_async_ = false; +#ifdef ENABLE_ASYNC_EXPORT + bool is_export_async_ = false; +#endif }; } // namespace trace } // namespace sdk diff --git a/sdk/src/logs/batch_log_processor.cc b/sdk/src/logs/batch_log_processor.cc index 6e6c7319b0..ee80d21ce7 100644 --- a/sdk/src/logs/batch_log_processor.cc +++ b/sdk/src/logs/batch_log_processor.cc @@ -23,7 +23,9 @@ BatchLogProcessor::BatchLogProcessor(std::unique_ptr &&exporter, max_queue_size_(max_queue_size), scheduled_delay_millis_(scheduled_delay_millis), max_export_batch_size_(max_export_batch_size), +# ifdef ENABLE_ASYNC_EXPORT is_export_async_(is_export_async), +# endif buffer_(max_queue_size_), synchronization_data_(std::make_shared()), worker_thread_(&BatchLogProcessor::DoBackgroundWork, this) @@ -41,7 +43,9 @@ BatchLogProcessor::BatchLogProcessor(std::unique_ptr &&exporter, max_queue_size_(options.max_queue_size), scheduled_delay_millis_(options.schedule_delay_millis), max_export_batch_size_(options.max_export_batch_size), +# ifdef ENABLE_ASYNC_EXPORT is_export_async_(options.is_export_async), +# endif buffer_(options.max_queue_size), synchronization_data_(std::make_shared()), worker_thread_(&BatchLogProcessor::DoBackgroundWork, this) @@ -221,11 +225,14 @@ void BatchLogProcessor::Export() }); }); +# ifdef ENABLE_ASYNC_EXPORT if (is_export_async_ == false) { +# endif exporter_->Export( nostd::span>(records_arr.data(), records_arr.size())); NotifyCompletion(notify_force_flush, synchronization_data_); +# ifdef ENABLE_ASYNC_EXPORT } else { @@ -243,9 +250,11 @@ void BatchLogProcessor::Export() return true; }); } +# endif } while (true); } +# ifdef ENABLE_ASYNC_EXPORT void BatchLogProcessor::WaitForShutdownCompletion() { // Since async export is invoked due to shutdown, need to wait @@ -267,6 +276,7 @@ void BatchLogProcessor::WaitForShutdownCompletion() } } } +# endif void BatchLogProcessor::NotifyCompletion( bool notify_force_flush, @@ -303,9 +313,11 @@ void BatchLogProcessor::DrainQueue() Export(); +# ifdef ENABLE_ASYNC_EXPORT // Since async export is invoked due to shutdown, need to wait // for async thread to complete. WaitForShutdownCompletion(); +# endif } } diff --git a/sdk/src/logs/simple_log_processor.cc b/sdk/src/logs/simple_log_processor.cc index e16a5e631e..844a84c6fb 100644 --- a/sdk/src/logs/simple_log_processor.cc +++ b/sdk/src/logs/simple_log_processor.cc @@ -16,9 +16,17 @@ namespace logs * Initialize a simple log processor. * @param exporter the configured exporter where log records are sent */ -SimpleLogProcessor::SimpleLogProcessor(std::unique_ptr &&exporter, - bool is_export_async) - : exporter_(std::move(exporter)), is_export_async_(is_export_async) +SimpleLogProcessor::SimpleLogProcessor(std::unique_ptr &&exporter +# ifdef ENABLE_ASYNC_EXPORT + , + bool is_export_async +# endif + ) + : exporter_(std::move(exporter)) +# ifdef ENABLE_ASYNC_EXPORT + , + is_export_async_(is_export_async) +# endif {} std::unique_ptr SimpleLogProcessor::MakeRecordable() noexcept @@ -36,12 +44,15 @@ void SimpleLogProcessor::OnReceive(std::unique_ptr &&record) noexcep // Get lock to ensure Export() is never called concurrently const std::lock_guard locked(lock_); +# ifdef ENABLE_ASYNC_EXPORT if (is_export_async_ == false) { +# endif if (exporter_->Export(batch) != sdk::common::ExportResult::kSuccess) { /* Alert user of the failed export */ } +# ifdef ENABLE_ASYNC_EXPORT } else { @@ -51,6 +62,7 @@ void SimpleLogProcessor::OnReceive(std::unique_ptr &&record) noexcep return true; }); } +# endif } /** * The simple processor does not have any log records to flush so this method is not used diff --git a/sdk/src/trace/batch_span_processor.cc b/sdk/src/trace/batch_span_processor.cc index 8aaa6d0095..6a356c17a0 100644 --- a/sdk/src/trace/batch_span_processor.cc +++ b/sdk/src/trace/batch_span_processor.cc @@ -22,7 +22,9 @@ BatchSpanProcessor::BatchSpanProcessor(std::unique_ptr &&exporter, max_queue_size_(options.max_queue_size), schedule_delay_millis_(options.schedule_delay_millis), max_export_batch_size_(options.max_export_batch_size), +#ifdef ENABLE_ASYNC_EXPORT is_export_async_(options.is_export_async), +#endif buffer_(max_queue_size_), synchronization_data_(std::make_shared()), worker_thread_(&BatchSpanProcessor::DoBackgroundWork, this) @@ -205,14 +207,14 @@ void BatchSpanProcessor::Export() }); }); - /* Call the sync Export when force flush was called, even if - is_export_async_ is true. - */ +#ifdef ENABLE_ASYNC_EXPORT if (is_export_async_ == false) { +#endif exporter_->Export( nostd::span>(spans_arr.data(), spans_arr.size())); NotifyCompletion(notify_force_flush, synchronization_data_); +#ifdef ENABLE_ASYNC_EXPORT } else { @@ -230,9 +232,11 @@ void BatchSpanProcessor::Export() return true; }); } +#endif } while (true); } +#ifdef ENABLE_ASYNC_EXPORT void BatchSpanProcessor::WaitForShutdownCompletion() { // Since async export is invoked due to shutdown, need to wait @@ -254,6 +258,7 @@ void BatchSpanProcessor::WaitForShutdownCompletion() } } } +#endif void BatchSpanProcessor::NotifyCompletion( bool notify_force_flush, @@ -289,7 +294,9 @@ void BatchSpanProcessor::DrainQueue() } Export(); +#ifdef ENABLE_ASYNC_EXPORT WaitForShutdownCompletion(); +#endif } } diff --git a/sdk/test/logs/batch_log_processor_test.cc b/sdk/test/logs/batch_log_processor_test.cc index 8902f23d15..2ad4eaec67 100644 --- a/sdk/test/logs/batch_log_processor_test.cc +++ b/sdk/test/logs/batch_log_processor_test.cc @@ -57,6 +57,7 @@ class MockLogExporter final : public LogExporter return ExportResult::kSuccess; } +# ifdef ENABLE_ASYNC_EXPORT void Export(const opentelemetry::nostd::span> &records, std::function &&result_callback) noexcept override @@ -69,6 +70,7 @@ class MockLogExporter final : public LogExporter }, std::move(result_callback))); } +# endif // toggles the boolean flag marking this exporter as shut down bool Shutdown( @@ -162,6 +164,7 @@ TEST_F(BatchLogProcessorTest, TestShutdown) EXPECT_TRUE(is_shutdown->load()); } +# ifdef ENABLE_ASYNC_EXPORT TEST_F(BatchLogProcessorTest, TestAsyncShutdown) { // initialize a batch log processor with the test exporter @@ -208,6 +211,7 @@ TEST_F(BatchLogProcessorTest, TestAsyncShutdown) // Also check that the processor is shut down at the end EXPECT_TRUE(is_shutdown->load()); } +# endif TEST_F(BatchLogProcessorTest, TestForceFlush) { @@ -250,6 +254,7 @@ TEST_F(BatchLogProcessorTest, TestForceFlush) } } +# ifdef ENABLE_ASYNC_EXPORT TEST_F(BatchLogProcessorTest, TestAsyncForceFlush) { std::shared_ptr> is_shutdown(new std::atomic(false)); @@ -300,6 +305,7 @@ TEST_F(BatchLogProcessorTest, TestAsyncForceFlush) EXPECT_EQ("Log" + std::to_string(i % num_logs), logs_received->at(i)->GetName()); } } +# endif TEST_F(BatchLogProcessorTest, TestManyLogsLoss) { diff --git a/sdk/test/logs/simple_log_processor_test.cc b/sdk/test/logs/simple_log_processor_test.cc index e0c02203fa..888ce51c7a 100644 --- a/sdk/test/logs/simple_log_processor_test.cc +++ b/sdk/test/logs/simple_log_processor_test.cc @@ -53,6 +53,7 @@ class TestExporter final : public LogExporter return ExportResult::kSuccess; } +# ifdef ENABLE_ASYNC_EXPORT // Dummy Async Export implementation void Export(const nostd::span> &records, std::function @@ -61,6 +62,7 @@ class TestExporter final : public LogExporter auto result = Export(records); result_callback(result); } +# endif // Increment the shutdown counter everytime this method is called bool Shutdown(std::chrono::microseconds timeout) noexcept override @@ -146,13 +148,14 @@ class FailShutDownExporter final : public LogExporter return ExportResult::kSuccess; } +# ifdef ENABLE_ASYNC_EXPORT void Export(const nostd::span> &records, std::function &&result_callback) noexcept override { result_callback(ExportResult::kSuccess); } - +# endif bool Shutdown(std::chrono::microseconds timeout) noexcept override { return false; } }; diff --git a/sdk/test/trace/batch_span_processor_test.cc b/sdk/test/trace/batch_span_processor_test.cc index 0b7e4c2e39..122cbbedfe 100644 --- a/sdk/test/trace/batch_span_processor_test.cc +++ b/sdk/test/trace/batch_span_processor_test.cc @@ -58,6 +58,7 @@ class MockSpanExporter final : public sdk::trace::SpanExporter return sdk::common::ExportResult::kSuccess; } +#ifdef ENABLE_ASYNC_EXPORT void Export(const nostd::span> &records, std::function &&result_callback) noexcept override @@ -70,6 +71,7 @@ class MockSpanExporter final : public sdk::trace::SpanExporter }, std::move(result_callback))); } +#endif bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override @@ -159,6 +161,7 @@ TEST_F(BatchSpanProcessorTestPeer, TestShutdown) EXPECT_TRUE(is_shutdown->load()); } +#ifdef ENABLE_ASYNC_EXPORT TEST_F(BatchSpanProcessorTestPeer, TestAsyncShutdown) { std::shared_ptr> is_shutdown(new std::atomic(false)); @@ -246,6 +249,7 @@ TEST_F(BatchSpanProcessorTestPeer, TestAsyncForceFlush) spans_received->at(num_spans + i)->GetName()); } } +#endif TEST_F(BatchSpanProcessorTestPeer, TestForceFlush) { diff --git a/sdk/test/trace/simple_processor_test.cc b/sdk/test/trace/simple_processor_test.cc index 6c8a00ead2..aa59fa850c 100644 --- a/sdk/test/trace/simple_processor_test.cc +++ b/sdk/test/trace/simple_processor_test.cc @@ -51,12 +51,14 @@ class RecordShutdownExporter final : public SpanExporter return ExportResult::kSuccess; } +#ifdef ENABLE_ASYNC_EXPORT void Export(const opentelemetry::nostd::span> &spans, std::function &&result_callback) noexcept override { result_callback(ExportResult::kSuccess); } +#endif bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override