diff --git a/CHANGELOG.md b/CHANGELOG.md index 934320ee2e..eb66255c9e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,7 +15,6 @@ Increment the: ## [Unreleased] -* [SDK] Async Batch Span/Log processor with max async support ([#1306](https://github.com/open-telemetry/opentelemetry-cpp/pull/1306)) * [EXPORTER] OTLP http exporter allow concurrency session ([#1209](https://github.com/open-telemetry/opentelemetry-cpp/pull/1209)) * [EXT] `curl::HttpClient` use `curl_multi_handle` instead of creating a thread for every request and it's able to reuse connections now. ([#1317](https://github.com/open-telemetry/opentelemetry-cpp/pull/1317)) diff --git a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h index a5c6623228..ea58807e96 100644 --- a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h +++ b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h @@ -89,19 +89,6 @@ class ElasticsearchLogExporter final : public opentelemetry::sdk::logs::LogExpor const opentelemetry::nostd::span> &records) noexcept override; -# ifdef ENABLE_ASYNC_EXPORT - /** - * Exports a vector of log records to the Elasticsearch instance asynchronously. - * @param records A list of log records to send to Elasticsearch. - * @param result_callback callback function accepting ExportResult as argument - */ - void Export( - const opentelemetry::nostd::span> - &records, - std::function &&result_callback) noexcept - override; -# endif - /** * Shutdown this exporter. * @param timeout The maximum time to wait for the shutdown method to return diff --git a/exporters/elasticsearch/src/es_log_exporter.cc b/exporters/elasticsearch/src/es_log_exporter.cc index 1abe4ca00e..46b7dfcd88 100644 --- a/exporters/elasticsearch/src/es_log_exporter.cc +++ b/exporters/elasticsearch/src/es_log_exporter.cc @@ -246,6 +246,29 @@ sdk::common::ExportResult ElasticsearchLogExporter::Export( std::vector body_vec(body.begin(), body.end()); request->SetBody(body_vec); +# ifdef ENABLE_ASYNC_EXPORT + // Send the request + std::size_t span_count = records.size(); + auto handler = std::make_shared( + session, + [span_count](opentelemetry::sdk::common::ExportResult result) { + if (result != opentelemetry::sdk::common::ExportResult::kSuccess) + { + OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] ERROR: Export " + << span_count + << " trace span(s) error: " << static_cast(result)); + } + else + { + OTEL_INTERNAL_LOG_DEBUG("[ES Trace Exporter] DEBUG: Export " << span_count + << " trace span(s) success"); + } + return true; + }, + options_.console_debug_); + session->SendRequest(handler); + return sdk::common::ExportResult::kSuccess; +# else // Send the request auto handler = std::make_shared(options_.console_debug_); session->SendRequest(handler); @@ -281,54 +304,8 @@ sdk::common::ExportResult ElasticsearchLogExporter::Export( } return sdk::common::ExportResult::kSuccess; -} - -# ifdef ENABLE_ASYNC_EXPORT -void ElasticsearchLogExporter::Export( - const opentelemetry::nostd::span> - &records, - std::function &&result_callback) noexcept -{ - // Return failure if this exporter has been shutdown - if (isShutdown()) - { - OTEL_INTERNAL_LOG_ERROR("[ES Log Exporter] Exporting " - << records.size() << " log(s) failed, exporter is shutdown"); - return; - } - - // Create a connection to the ElasticSearch instance - auto session = http_client_->CreateSession(options_.host_ + std::to_string(options_.port_)); - auto request = session->CreateRequest(); - - // Populate the request with headers and methods - request->SetUri(options_.index_ + "/_bulk?pretty"); - request->SetMethod(http_client::Method::Post); - request->AddHeader("Content-Type", "application/json"); - request->SetTimeoutMs(std::chrono::milliseconds(1000 * options_.response_timeout_)); - - // Create the request body - std::string body = ""; - for (auto &record : records) - { - // Append {"index":{}} before JSON body, which tells Elasticsearch to write to index specified - // in URI - body += "{\"index\" : {}}\n"; - - // Add the context of the Recordable - auto json_record = std::unique_ptr( - static_cast(record.release())); - body += json_record->GetJSON().dump() + "\n"; - } - std::vector body_vec(body.begin(), body.end()); - request->SetBody(body_vec); - - // Send the request - auto handler = std::make_shared(session, std::move(result_callback), - options_.console_debug_); - session->SendRequest(handler); -} # endif +} bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept { diff --git a/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h b/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h index 85003689c5..284bab2cab 100644 --- a/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h +++ b/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h @@ -61,17 +61,6 @@ class JaegerExporter final : public opentelemetry::sdk::trace::SpanExporter const nostd::span> &spans) noexcept override; -#ifdef ENABLE_ASYNC_EXPORT - /** - * Exports a batch of span recordables asynchronously. - * @param spans a span of unique pointers to span recordables - * @param result_callback callback function accepting ExportResult as argument - */ - void Export(const nostd::span> &spans, - std::function - &&result_callback) noexcept override; -#endif - /** * Shutdown the exporter. * @param timeout an option timeout, default to max. diff --git a/exporters/jaeger/src/jaeger_exporter.cc b/exporters/jaeger/src/jaeger_exporter.cc index b77b445100..c07f2f0100 100644 --- a/exporters/jaeger/src/jaeger_exporter.cc +++ b/exporters/jaeger/src/jaeger_exporter.cc @@ -70,17 +70,6 @@ sdk_common::ExportResult JaegerExporter::Export( return sdk_common::ExportResult::kSuccess; } -#ifdef ENABLE_ASYNC_EXPORT -void JaegerExporter::Export( - const nostd::span> &spans, - std::function &&result_callback) noexcept -{ - OTEL_INTERNAL_LOG_WARN(" async not supported. Making sync interface call"); - auto status = Export(spans); - result_callback(status); -} -#endif - void JaegerExporter::InitializeEndpoint() { if (options_.transport_format == TransportFormat::kThriftUdpCompact) diff --git a/exporters/memory/include/opentelemetry/exporters/memory/in_memory_span_exporter.h b/exporters/memory/include/opentelemetry/exporters/memory/in_memory_span_exporter.h index 2eae4fc3c0..28b7bc34e8 100644 --- a/exporters/memory/include/opentelemetry/exporters/memory/in_memory_span_exporter.h +++ b/exporters/memory/include/opentelemetry/exporters/memory/in_memory_span_exporter.h @@ -64,22 +64,6 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte return sdk::common::ExportResult::kSuccess; } -#ifdef ENABLE_ASYNC_EXPORT - /** - * Exports a batch of span recordables asynchronously. - * @param spans a span of unique pointers to span recordables - * @param result_callback callback function accepting ExportResult as argument - */ - void Export(const nostd::span> &spans, - std::function - &&result_callback) noexcept override - { - OTEL_INTERNAL_LOG_WARN(" async not supported. Making sync interface call"); - auto status = Export(spans); - result_callback(status); - } -#endif - /** * @param timeout an optional value containing the timeout of the exporter * note: passing custom timeout values is not currently supported for this exporter diff --git a/exporters/ostream/include/opentelemetry/exporters/ostream/log_exporter.h b/exporters/ostream/include/opentelemetry/exporters/ostream/log_exporter.h index c263ac63ae..2f6acbb486 100644 --- a/exporters/ostream/include/opentelemetry/exporters/ostream/log_exporter.h +++ b/exporters/ostream/include/opentelemetry/exporters/ostream/log_exporter.h @@ -39,15 +39,6 @@ class OStreamLogExporter final : public opentelemetry::sdk::logs::LogExporter const opentelemetry::nostd::span> &records) noexcept override; -# ifdef ENABLE_ASYNC_EXPORT - /** - * Exports a span of logs sent from the processor asynchronously. - */ - void Export( - const opentelemetry::nostd::span> &records, - std::function &&result_callback) noexcept; -# endif - /** * Marks the OStream Log Exporter as shut down. */ diff --git a/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h b/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h index 2bc7a9da97..c8603db027 100644 --- a/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h +++ b/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h @@ -38,14 +38,6 @@ class OStreamSpanExporter final : public opentelemetry::sdk::trace::SpanExporter const opentelemetry::nostd::span> &spans) noexcept override; -#ifdef ENABLE_ASYNC_EXPORT - void Export( - const opentelemetry::nostd::span> - &spans, - std::function &&result_callback) noexcept - override; -#endif - bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; diff --git a/exporters/ostream/src/log_exporter.cc b/exporters/ostream/src/log_exporter.cc index 2396101930..ecbccf8c6e 100644 --- a/exporters/ostream/src/log_exporter.cc +++ b/exporters/ostream/src/log_exporter.cc @@ -102,17 +102,6 @@ sdk::common::ExportResult OStreamLogExporter::Export( return sdk::common::ExportResult::kSuccess; } -# ifdef ENABLE_ASYNC_EXPORT -void OStreamLogExporter::Export( - const opentelemetry::nostd::span> &records, - std::function &&result_callback) noexcept -{ - // Do not have async support - auto result = Export(records); - result_callback(result); -} -# endif - bool OStreamLogExporter::Shutdown(std::chrono::microseconds) noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/ostream/src/span_exporter.cc b/exporters/ostream/src/span_exporter.cc index 00a36c2144..226f987378 100644 --- a/exporters/ostream/src/span_exporter.cc +++ b/exporters/ostream/src/span_exporter.cc @@ -98,16 +98,6 @@ sdk::common::ExportResult OStreamSpanExporter::Export( return sdk::common::ExportResult::kSuccess; } -#ifdef ENABLE_ASYNC_EXPORT -void OStreamSpanExporter::Export( - const opentelemetry::nostd::span> &spans, - std::function &&result_callback) noexcept -{ - auto result = Export(spans); - result_callback(result); -} -#endif - bool OStreamSpanExporter::Shutdown(std::chrono::microseconds timeout) noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h index 4660903595..a28e6fca85 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h @@ -52,17 +52,6 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter sdk::common::ExportResult Export( const nostd::span> &spans) noexcept override; -#ifdef ENABLE_ASYNC_EXPORT - /** - * Exports a batch of span recordables asynchronously. - * @param spans a span of unique pointers to span recordables - * @param result_callback callback function accepting ExportResult as argument - */ - virtual void Export(const nostd::span> &spans, - std::function - &&result_callback) noexcept override; -#endif - /** * Shut down the exporter. * @param timeout an optional timeout, the default timeout of 0 means that no diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_exporter.h index 5652a21f72..a8aeda85b8 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_exporter.h @@ -55,18 +55,6 @@ class OtlpGrpcLogExporter : public opentelemetry::sdk::logs::LogExporter const nostd::span> &records) noexcept override; -# ifdef ENABLE_ASYNC_EXPORT - /** - * Exports a vector of log records asynchronously. - * @param records A list of log records. - * @param result_callback callback function accepting ExportResult as argument - */ - virtual void Export( - const nostd::span> &records, - std::function &&result_callback) noexcept - override; -# endif - /** * Shutdown this exporter. * @param timeout The maximum time to wait for the shutdown method to return. diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h index edb0f59afc..0eddc85524 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h @@ -122,6 +122,7 @@ class OtlpHttpClient * Sync export * @param message message to export, it should be ExportTraceServiceRequest, * ExportMetricsServiceRequest or ExportLogsServiceRequest + * @return return the status of this operation */ sdk::common::ExportResult Export(const google::protobuf::Message &message) noexcept; @@ -130,11 +131,25 @@ class OtlpHttpClient * @param message message to export, it should be ExportTraceServiceRequest, * ExportMetricsServiceRequest or ExportLogsServiceRequest * @param result_callback callback to call when the exporting is done + * @return return the status of this operation */ - void Export( + sdk::common::ExportResult Export( const google::protobuf::Message &message, std::function &&result_callback) noexcept; + /** + * Async export + * @param message message to export, it should be ExportTraceServiceRequest, + * ExportMetricsServiceRequest or ExportLogsServiceRequest + * @param result_callback callback to call when the exporting is done + * @param max_running_requests wait for at most max_running_requests running requests + * @return return the status of this operation + */ + sdk::common::ExportResult Export( + const google::protobuf::Message &message, + std::function &&result_callback, + std::size_t max_running_requests) noexcept; + /** * Shut down the HTTP client. * @param timeout an optional timeout, the default timeout of 0 means that no @@ -150,6 +165,12 @@ class OtlpHttpClient */ void ReleaseSession(const opentelemetry::ext::http::client::Session &session) noexcept; + /** + * Get options of current OTLP http client. + * @return options of current OTLP http client. + */ + inline const OtlpHttpClientOptions &GetOptions() const noexcept { return options_; } + private: struct HttpSessionData { diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h index 6dfe8f1f34..de8fcd1381 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h @@ -92,18 +92,6 @@ class OtlpHttpExporter final : public opentelemetry::sdk::trace::SpanExporter const nostd::span> &spans) noexcept override; -#ifdef ENABLE_ASYNC_EXPORT - /** - * Exports a batch of span recordables asynchronously. - * @param spans a span of unique pointers to span recordables - * @param result_callback callback function accepting ExportResult as argument - */ - virtual void Export( - const nostd::span> &spans, - std::function &&result_callback) noexcept - override; -#endif - /** * Shut down the exporter. * @param timeout an optional timeout, the default timeout of 0 means that no diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h index 91e4ccf714..3bd5c4b7cb 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h @@ -93,18 +93,6 @@ class OtlpHttpLogExporter final : public opentelemetry::sdk::logs::LogExporter const nostd::span> &records) noexcept override; -# ifdef ENABLE_ASYNC_EXPORT - /** - * Exports a vector of log records asynchronously. - * @param records A list of log records. - * @param result_callback callback function accepting ExportResult as argument - */ - virtual void Export( - const nostd::span> &records, - std::function &&result_callback) noexcept - override; -# endif - /** * Shutdown this exporter. * @param timeout The maximum time to wait for the shutdown method to return diff --git a/exporters/otlp/src/otlp_grpc_exporter.cc b/exporters/otlp/src/otlp_grpc_exporter.cc index c7734ad3e2..32f4a60a52 100644 --- a/exporters/otlp/src/otlp_grpc_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_exporter.cc @@ -143,18 +143,6 @@ sdk::common::ExportResult OtlpGrpcExporter::Export( return sdk::common::ExportResult::kSuccess; } -#ifdef ENABLE_ASYNC_EXPORT -void OtlpGrpcExporter::Export( - const nostd::span> &spans, - std::function &&result_callback) noexcept -{ - OTEL_INTERNAL_LOG_WARN( - "[OTLP TRACE GRPC Exporter] async not supported. Making sync interface call"); - auto status = Export(spans); - result_callback(status); -} -#endif - bool OtlpGrpcExporter::Shutdown(std::chrono::microseconds timeout) noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/otlp/src/otlp_grpc_log_exporter.cc b/exporters/otlp/src/otlp_grpc_log_exporter.cc index e506cde0c5..38bfb0a5bb 100644 --- a/exporters/otlp/src/otlp_grpc_log_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_log_exporter.cc @@ -161,18 +161,6 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcLogExporter::Export( return sdk::common::ExportResult::kSuccess; } -# ifdef ENABLE_ASYNC_EXPORT -void OtlpGrpcLogExporter::Export( - const nostd::span> &logs, - std::function &&result_callback) noexcept -{ - OTEL_INTERNAL_LOG_WARN( - "[OTLP LOG GRPC Exporter] async not supported. Making sync interface call"); - auto status = Export(logs); - result_callback(status); -} -# endif - bool OtlpGrpcLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/otlp/src/otlp_http_client.cc b/exporters/otlp/src/otlp_http_client.cc index b2fc75838d..a8b35574f9 100644 --- a/exporters/otlp/src/otlp_http_client.cc +++ b/exporters/otlp/src/otlp_http_client.cc @@ -692,61 +692,46 @@ OtlpHttpClient::OtlpHttpClient(OtlpHttpClientOptions &&options, opentelemetry::sdk::common::ExportResult OtlpHttpClient::Export( const google::protobuf::Message &message) noexcept { - opentelemetry::sdk::common::ExportResult result = + opentelemetry::sdk::common::ExportResult session_result = opentelemetry::sdk::common::ExportResult::kSuccess; - auto session = - createSession(message, [&result](opentelemetry::sdk::common::ExportResult export_result) { - result = export_result; - return export_result == opentelemetry::sdk::common::ExportResult::kSuccess; - }); - - if (opentelemetry::nostd::holds_alternative(session)) - { - return opentelemetry::nostd::get(session); - } + opentelemetry::sdk::common::ExportResult export_result = Export( + message, + [&session_result](opentelemetry::sdk::common::ExportResult result) { + session_result = result; + return result == opentelemetry::sdk::common::ExportResult::kSuccess; + }, + 0); - // Wait for the response to be received - if (options_.console_debug) + if (opentelemetry::sdk::common::ExportResult::kSuccess != export_result) { - OTEL_INTERNAL_LOG_DEBUG( - "[OTLP HTTP Client] DEBUG: Waiting for response from " - << options_.url << " (timeout = " - << std::chrono::duration_cast(options_.timeout).count() - << " milliseconds)"); + return export_result; } - addSession(std::move(opentelemetry::nostd::get(session))); - - // Wait for any session to finish if there are to many sessions - std::unique_lock lock(session_waker_lock_); - bool wait_successful = session_waker_.wait_for(lock, options_.timeout, [this] { - std::lock_guard guard{session_manager_lock_}; - return running_sessions_.empty(); - }); - - cleanupGCSessions(); - - // If an error occurred with the HTTP request - if (!wait_successful) - { - return opentelemetry::sdk::common::ExportResult::kFailure; - } - - return result; + return session_result; } -void OtlpHttpClient::Export( +sdk::common::ExportResult OtlpHttpClient::Export( const google::protobuf::Message &message, std::function &&result_callback) noexcept +{ + return Export(message, std::move(result_callback), options_.max_concurrent_requests); +} + +sdk::common::ExportResult OtlpHttpClient::Export( + const google::protobuf::Message &message, + std::function &&result_callback, + std::size_t max_running_requests) noexcept { auto session = createSession(message, std::move(result_callback)); if (opentelemetry::nostd::holds_alternative(session)) { + sdk::common::ExportResult result = + opentelemetry::nostd::get(session); if (result_callback) { - result_callback(opentelemetry::nostd::get(session)); + result_callback(result); } - return; + return result; } addSession(std::move(opentelemetry::nostd::get(session))); @@ -763,12 +748,20 @@ void OtlpHttpClient::Export( // Wait for any session to finish if there are to many sessions std::unique_lock lock(session_waker_lock_); - session_waker_.wait_for(lock, options_.timeout, [this] { - std::lock_guard guard{session_manager_lock_}; - return running_sessions_.size() <= options_.max_concurrent_requests; - }); + bool wait_successful = + session_waker_.wait_for(lock, options_.timeout, [this, max_running_requests] { + std::lock_guard guard{session_manager_lock_}; + return running_sessions_.size() <= max_running_requests; + }); cleanupGCSessions(); + + if (!wait_successful) + { + return opentelemetry::sdk::common::ExportResult::kFailure; + } + + return opentelemetry::sdk::common::ExportResult::kSuccess; } bool OtlpHttpClient::Shutdown(std::chrono::microseconds timeout) noexcept diff --git a/exporters/otlp/src/otlp_http_exporter.cc b/exporters/otlp/src/otlp_http_exporter.cc index f5b50162f3..70bdd8c90d 100644 --- a/exporters/otlp/src/otlp_http_exporter.cc +++ b/exporters/otlp/src/otlp_http_exporter.cc @@ -42,7 +42,20 @@ OtlpHttpExporter::OtlpHttpExporter(const OtlpHttpExporterOptions &options) OtlpHttpExporter::OtlpHttpExporter(std::unique_ptr http_client) : options_(OtlpHttpExporterOptions()), http_client_(std::move(http_client)) -{} +{ + OtlpHttpExporterOptions &options = const_cast(options_); + options.url = http_client_->GetOptions().url; + options.content_type = http_client_->GetOptions().content_type; + options.json_bytes_mapping = http_client_->GetOptions().json_bytes_mapping; + options.use_json_name = http_client_->GetOptions().use_json_name; + options.console_debug = http_client_->GetOptions().console_debug; + options.timeout = http_client_->GetOptions().timeout; + options.http_headers = http_client_->GetOptions().http_headers; +#ifdef ENABLE_ASYNC_EXPORT + options.max_concurrent_requests = http_client_->GetOptions().max_concurrent_requests; + options.max_requests_per_connection = http_client_->GetOptions().max_requests_per_connection; +#endif +} // ----------------------------- Exporter methods ------------------------------ std::unique_ptr OtlpHttpExporter::MakeRecordable() noexcept @@ -61,24 +74,39 @@ opentelemetry::sdk::common::ExportResult OtlpHttpExporter::Export( proto::collector::trace::v1::ExportTraceServiceRequest service_request; OtlpRecordableUtils::PopulateRequest(spans, &service_request); - return http_client_->Export(service_request); -} - + std::size_t span_count = spans.size(); #ifdef ENABLE_ASYNC_EXPORT -void OtlpHttpExporter::Export( - const nostd::span> &spans, - std::function &&result_callback) noexcept -{ - if (spans.empty()) + http_client_->Export( + service_request, [span_count](opentelemetry::sdk::common::ExportResult result) { + if (result != opentelemetry::sdk::common::ExportResult::kSuccess) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + << span_count + << " trace span(s) error: " << static_cast(result)); + } + else + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] DEBUG: Export " << span_count + << " trace span(s) success"); + } + return true; + }); + return opentelemetry::sdk::common::ExportResult::kSuccess; +#else + opentelemetry::sdk::common::ExportResult result = http_client_->Export(service_request); + if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { - return; + OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + << span_count << " trace span(s) error: " << static_cast(result)); } - - proto::collector::trace::v1::ExportTraceServiceRequest service_request; - OtlpRecordableUtils::PopulateRequest(spans, &service_request); - http_client_->Export(service_request, std::move(result_callback)); -} + else + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] DEBUG: Export " << span_count + << " trace span(s) success"); + } + return opentelemetry::sdk::common::ExportResult::kSuccess; #endif +} bool OtlpHttpExporter::Shutdown(std::chrono::microseconds timeout) noexcept { diff --git a/exporters/otlp/src/otlp_http_log_exporter.cc b/exporters/otlp/src/otlp_http_log_exporter.cc index d2430bf924..4eb992a61b 100644 --- a/exporters/otlp/src/otlp_http_log_exporter.cc +++ b/exporters/otlp/src/otlp_http_log_exporter.cc @@ -44,7 +44,20 @@ OtlpHttpLogExporter::OtlpHttpLogExporter(const OtlpHttpLogExporterOptions &optio OtlpHttpLogExporter::OtlpHttpLogExporter(std::unique_ptr http_client) : options_(OtlpHttpLogExporterOptions()), http_client_(std::move(http_client)) -{} +{ + OtlpHttpLogExporterOptions &options = const_cast(options_); + options.url = http_client_->GetOptions().url; + options.content_type = http_client_->GetOptions().content_type; + options.json_bytes_mapping = http_client_->GetOptions().json_bytes_mapping; + options.use_json_name = http_client_->GetOptions().use_json_name; + options.console_debug = http_client_->GetOptions().console_debug; + options.timeout = http_client_->GetOptions().timeout; + options.http_headers = http_client_->GetOptions().http_headers; +# ifdef ENABLE_ASYNC_EXPORT + options.max_concurrent_requests = http_client_->GetOptions().max_concurrent_requests; + options.max_requests_per_connection = http_client_->GetOptions().max_requests_per_connection; +# endif +} // ----------------------------- Exporter methods ------------------------------ std::unique_ptr OtlpHttpLogExporter::MakeRecordable() noexcept @@ -62,23 +75,37 @@ opentelemetry::sdk::common::ExportResult OtlpHttpLogExporter::Export( } proto::collector::logs::v1::ExportLogsServiceRequest service_request; OtlpRecordableUtils::PopulateRequest(logs, &service_request); - return http_client_->Export(service_request); -} - + std::size_t log_count = logs.size(); # ifdef ENABLE_ASYNC_EXPORT -void OtlpHttpLogExporter::Export( - const nostd::span> &logs, - std::function &&result_callback) noexcept -{ - if (logs.empty()) + http_client_->Export( + service_request, [log_count](opentelemetry::sdk::common::ExportResult result) { + if (result != opentelemetry::sdk::common::ExportResult::kSuccess) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + << log_count << " log(s) error: " << static_cast(result)); + } + else + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] DEBUG: Export " << log_count + << " log(s) success"); + } + return true; + }); + return opentelemetry::sdk::common::ExportResult::kSuccess; +# else + opentelemetry::sdk::common::ExportResult result = http_client_->Export(service_request); + if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { - return; + OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + << log_count << " log(s) error: " << static_cast(result)); } - proto::collector::logs::v1::ExportLogsServiceRequest service_request; - OtlpRecordableUtils::PopulateRequest(logs, &service_request); - http_client_->Export(service_request, std::move(result_callback)); -} + else + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] DEBUG: Export " << log_count << " log(s) success"); + } + return opentelemetry::sdk::common::ExportResult::kSuccess; # endif +} bool OtlpHttpLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept { diff --git a/exporters/otlp/test/otlp_http_exporter_test.cc b/exporters/otlp/test/otlp_http_exporter_test.cc index c25c0111b2..6a388a40e4 100644 --- a/exporters/otlp/test/otlp_http_exporter_test.cc +++ b/exporters/otlp/test/otlp_http_exporter_test.cc @@ -17,7 +17,6 @@ # include "opentelemetry/ext/http/client/http_client_factory.h" # include "opentelemetry/ext/http/client/nosend/http_client_nosend.h" # include "opentelemetry/ext/http/server/http_server.h" -# include "opentelemetry/sdk/trace/async_batch_span_processor.h" # include "opentelemetry/sdk/trace/batch_span_processor.h" # include "opentelemetry/sdk/trace/tracer_provider.h" # include "opentelemetry/trace/provider.h" @@ -49,7 +48,8 @@ static nostd::span MakeSpan(T (&array)[N]) return nostd::span(array); } -OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType content_type) +OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType content_type, + bool async_mode) { OtlpHttpExporterOptions options; options.content_type = content_type; @@ -60,6 +60,10 @@ OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType content_t OtlpHttpClientOptions otlp_http_client_options( options.url, options.content_type, options.json_bytes_mapping, options.use_json_name, options.console_debug, options.timeout, options.http_headers); + if (!async_mode) + { + otlp_http_client_options.max_concurrent_requests = 0; + } return otlp_http_client_options; } @@ -80,10 +84,11 @@ class OtlpHttpExporterTestPeer : public ::testing::Test } static std::pair> - GetMockOtlpHttpClient(HttpRequestContentType content_type) + GetMockOtlpHttpClient(HttpRequestContentType content_type, bool async_mode = false) { auto http_client = http_client::HttpClientFactory::CreateNoSend(); - return {new OtlpHttpClient(MakeOtlpHttpClientOptions(content_type), http_client), http_client}; + return {new OtlpHttpClient(MakeOtlpHttpClientOptions(content_type, async_mode), http_client), + http_client}; } void ExportJsonIntegrationTest() @@ -174,7 +179,7 @@ class OtlpHttpExporterTestPeer : public ::testing::Test void ExportJsonIntegrationTestAsync() { auto mock_otlp_client = - OtlpHttpExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kJson); + OtlpHttpExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kJson, true); auto mock_otlp_http_client = mock_otlp_client.first; auto client = mock_otlp_client.second; auto exporter = GetExporter(std::unique_ptr{mock_otlp_http_client}); @@ -196,13 +201,13 @@ class OtlpHttpExporterTestPeer : public ::testing::Test resource_attributes["vec_string_value"] = std::vector{"vector", "string"}; auto resource = resource::Resource::Create(resource_attributes); - auto processor_opts = sdk::trace::AsyncBatchSpanProcessorOptions(); + auto processor_opts = sdk::trace::BatchSpanProcessorOptions(); processor_opts.max_export_batch_size = 5; processor_opts.max_queue_size = 5; processor_opts.schedule_delay_millis = std::chrono::milliseconds(256); auto processor = std::unique_ptr( - new sdk::trace::AsyncBatchSpanProcessor(std::move(exporter), processor_opts)); + new sdk::trace::BatchSpanProcessor(std::move(exporter), processor_opts)); auto provider = nostd::shared_ptr( new sdk::trace::TracerProvider(std::move(processor), resource)); @@ -344,7 +349,7 @@ class OtlpHttpExporterTestPeer : public ::testing::Test void ExportBinaryIntegrationTestAsync() { auto mock_otlp_client = - OtlpHttpExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kBinary); + OtlpHttpExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kBinary, true); auto mock_otlp_http_client = mock_otlp_client.first; auto client = mock_otlp_client.second; auto exporter = GetExporter(std::unique_ptr{mock_otlp_http_client}); @@ -366,13 +371,13 @@ class OtlpHttpExporterTestPeer : public ::testing::Test resource_attributes["vec_string_value"] = std::vector{"vector", "string"}; auto resource = resource::Resource::Create(resource_attributes); - auto processor_opts = sdk::trace::AsyncBatchSpanProcessorOptions(); + auto processor_opts = sdk::trace::BatchSpanProcessorOptions(); processor_opts.max_export_batch_size = 5; processor_opts.max_queue_size = 5; processor_opts.schedule_delay_millis = std::chrono::milliseconds(256); auto processor = std::unique_ptr( - new sdk::trace::AsyncBatchSpanProcessor(std::move(exporter), processor_opts)); + new sdk::trace::BatchSpanProcessor(std::move(exporter), processor_opts)); auto provider = nostd::shared_ptr( new sdk::trace::TracerProvider(std::move(processor), resource)); diff --git a/exporters/otlp/test/otlp_http_log_exporter_test.cc b/exporters/otlp/test/otlp_http_log_exporter_test.cc index 77106f3e47..498ae79111 100644 --- a/exporters/otlp/test/otlp_http_log_exporter_test.cc +++ b/exporters/otlp/test/otlp_http_log_exporter_test.cc @@ -20,7 +20,6 @@ # include "opentelemetry/ext/http/client/nosend/http_client_nosend.h" # include "opentelemetry/ext/http/server/http_server.h" # include "opentelemetry/logs/provider.h" -# include "opentelemetry/sdk/logs/async_batch_log_processor.h" # include "opentelemetry/sdk/logs/batch_log_processor.h" # include "opentelemetry/sdk/logs/exporter.h" # include "opentelemetry/sdk/logs/log_record.h" @@ -52,7 +51,8 @@ static nostd::span MakeSpan(T (&array)[N]) return nostd::span(array); } -OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType content_type) +OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType content_type, + bool async_mode) { OtlpHttpLogExporterOptions options; options.content_type = content_type; @@ -62,6 +62,10 @@ OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType content_t OtlpHttpClientOptions otlp_http_client_options( options.url, options.content_type, options.json_bytes_mapping, options.use_json_name, options.console_debug, options.timeout, options.http_headers); + if (!async_mode) + { + otlp_http_client_options.max_concurrent_requests = 0; + } return otlp_http_client_options; } @@ -81,10 +85,11 @@ class OtlpHttpLogExporterTestPeer : public ::testing::Test return exporter->options_; } static std::pair> - GetMockOtlpHttpClient(HttpRequestContentType content_type) + GetMockOtlpHttpClient(HttpRequestContentType content_type, bool async_mode = false) { auto http_client = http_client::HttpClientFactory::CreateNoSend(); - return {new OtlpHttpClient(MakeOtlpHttpClientOptions(content_type), http_client), http_client}; + return {new OtlpHttpClient(MakeOtlpHttpClientOptions(content_type, async_mode), http_client), + http_client}; } void ExportJsonIntegrationTest() @@ -184,7 +189,7 @@ class OtlpHttpLogExporterTestPeer : public ::testing::Test void ExportJsonIntegrationTestAsync() { auto mock_otlp_client = - OtlpHttpLogExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kJson); + OtlpHttpLogExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kJson, true); auto mock_otlp_http_client = mock_otlp_client.first; auto client = mock_otlp_client.second; auto exporter = GetExporter(std::unique_ptr{mock_otlp_http_client}); @@ -198,13 +203,13 @@ class OtlpHttpLogExporterTestPeer : public ::testing::Test std::string attribute_storage_string_value[] = {"vector", "string"}; auto provider = nostd::shared_ptr(new sdk::logs::LoggerProvider()); - sdk::logs::AsyncBatchLogProcessorOptions options; + sdk::logs::BatchLogProcessorOptions options; options.max_queue_size = 5; options.schedule_delay_millis = std::chrono::milliseconds(256); options.max_export_batch_size = 5; provider->AddProcessor(std::unique_ptr( - new sdk::logs::AsyncBatchLogProcessor(std::move(exporter), options))); + new sdk::logs::BatchLogProcessor(std::move(exporter), options))); std::string report_trace_id; std::string report_span_id; @@ -380,7 +385,7 @@ class OtlpHttpLogExporterTestPeer : public ::testing::Test void ExportBinaryIntegrationTestAsync() { auto mock_otlp_client = - OtlpHttpLogExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kBinary); + OtlpHttpLogExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kBinary, true); auto mock_otlp_http_client = mock_otlp_client.first; auto client = mock_otlp_client.second; auto exporter = GetExporter(std::unique_ptr{mock_otlp_http_client}); @@ -395,12 +400,12 @@ class OtlpHttpLogExporterTestPeer : public ::testing::Test auto provider = nostd::shared_ptr(new sdk::logs::LoggerProvider()); - sdk::logs::AsyncBatchLogProcessorOptions processor_options; + sdk::logs::BatchLogProcessorOptions processor_options; processor_options.max_export_batch_size = 5; processor_options.max_queue_size = 5; processor_options.schedule_delay_millis = std::chrono::milliseconds(256); provider->AddProcessor(std::unique_ptr( - new sdk::logs::AsyncBatchLogProcessor(std::move(exporter), processor_options))); + new sdk::logs::BatchLogProcessor(std::move(exporter), processor_options))); std::string report_trace_id; std::string report_span_id; diff --git a/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h b/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h index f730f1e111..ae0e8173f9 100644 --- a/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h +++ b/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h @@ -78,17 +78,6 @@ class ZipkinExporter final : public opentelemetry::sdk::trace::SpanExporter const nostd::span> &spans) noexcept override; -#ifdef ENABLE_ASYNC_EXPORT - /** - * Export asynchronosly a batch of span recordables in JSON format - * @param spans a span of unique pointers to span recordables - * @param result_callback callback function accepting ExportResult as argument - */ - void Export(const nostd::span> &spans, - std::function - &&result_callback) noexcept override; -#endif - /** * Shut down the exporter. * @param timeout an optional timeout, default to max. diff --git a/exporters/zipkin/src/zipkin_exporter.cc b/exporters/zipkin/src/zipkin_exporter.cc index 92a6bfc37f..240144599f 100644 --- a/exporters/zipkin/src/zipkin_exporter.cc +++ b/exporters/zipkin/src/zipkin_exporter.cc @@ -93,17 +93,6 @@ sdk::common::ExportResult ZipkinExporter::Export( return sdk::common::ExportResult::kSuccess; } -#ifdef ENABLE_ASYNC_EXPORT -void ZipkinExporter::Export( - const nostd::span> &spans, - std::function &&result_callback) noexcept -{ - OTEL_INTERNAL_LOG_WARN("[ZIPKIN EXPORTER] async not supported. Making sync interface call"); - auto status = Export(spans); - result_callback(status); -} -#endif - void ZipkinExporter::InitializeLocalEndpoint() { if (options_.service_name.length()) diff --git a/sdk/include/opentelemetry/sdk/logs/async_batch_log_processor.h b/sdk/include/opentelemetry/sdk/logs/async_batch_log_processor.h deleted file mode 100644 index 7cc6af8738..0000000000 --- a/sdk/include/opentelemetry/sdk/logs/async_batch_log_processor.h +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -#pragma once -#ifdef ENABLE_LOGS_PREVIEW -# ifdef ENABLE_ASYNC_EXPORT - -# include "opentelemetry/sdk/common/circular_buffer.h" -# include "opentelemetry/sdk/logs/batch_log_processor.h" -# include "opentelemetry/sdk/logs/exporter.h" - -# include -# include -# include -# include -# include -# include - -OPENTELEMETRY_BEGIN_NAMESPACE -namespace sdk -{ - -namespace logs -{ - -/** - * Struct to hold batch SpanProcessor options. - */ -struct AsyncBatchLogProcessorOptions : public BatchLogProcessorOptions -{ - /* Denotes the maximum number of async exports to continue - */ - size_t max_export_async = 8; -}; - -/** - * This is an implementation of the LogProcessor which creates batches of finished logs and passes - * the export-friendly log data representations to the configured LogExporter. - */ -class AsyncBatchLogProcessor : public BatchLogProcessor -{ -public: - /** - * Creates a batch log processor by configuring the specified exporter and other parameters - * as per the official, language-agnostic opentelemetry specs. - * - * @param exporter - The backend exporter to pass the logs to - * @param options - The batch SpanProcessor options. - */ - explicit AsyncBatchLogProcessor(std::unique_ptr &&exporter, - const AsyncBatchLogProcessorOptions &options); - - /** - * Shuts down the processor and does any cleanup required. Completely drains the buffer/queue of - * all its logs and passes them to the exporter. Any subsequent calls to - * ForceFlush or Shutdown will return immediately without doing anything. - * - * NOTE: Timeout functionality not supported yet. - */ - bool Shutdown( - std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; - - /** - * Class destructor which invokes the Shutdown() method. - */ - virtual ~AsyncBatchLogProcessor(); - -private: - /** - * Exports all logs to the configured exporter. - * - */ - void Export() override; - - struct ExportDataStorage - { - std::queue export_ids; - std::vector export_ids_flag; - - std::condition_variable async_export_waker; - std::mutex async_export_data_m; - }; - std::shared_ptr export_data_storage_; - - const size_t max_export_async_; - static constexpr size_t kInvalidExportId = static_cast(-1); - - /** - * @brief Notify completion of shutdown and force flush. This may be called from the any thread at - * any time - * - * @param notify_force_flush Flag to indicate whether to notify force flush completion. - * @param synchronization_data Synchronization data to be notified. - */ - static void NotifyCompletion(bool notify_force_flush, - const std::shared_ptr &synchronization_data, - const std::shared_ptr &export_data_storage); -}; - -} // namespace logs -} // namespace sdk -OPENTELEMETRY_END_NAMESPACE -# endif -#endif diff --git a/sdk/include/opentelemetry/sdk/logs/exporter.h b/sdk/include/opentelemetry/sdk/logs/exporter.h index 5c28d73538..85c58e9f12 100644 --- a/sdk/include/opentelemetry/sdk/logs/exporter.h +++ b/sdk/include/opentelemetry/sdk/logs/exporter.h @@ -46,17 +46,6 @@ class LogExporter virtual sdk::common::ExportResult Export( const nostd::span> &records) noexcept = 0; -# ifdef ENABLE_ASYNC_EXPORT - /** - * Exports asynchronously the batch of log records to their export destination - * @param records a span of unique pointers to log records - * @param result_callback callback function accepting ExportResult as argument - */ - virtual void Export( - const nostd::span> &records, - std::function &&result_callback) noexcept = 0; -# endif - /** * Marks the exporter as ShutDown and cleans up any resources as required. * Shutdown should be called only once for each Exporter instance. diff --git a/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h b/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h index a92380cd01..cc3aec47b2 100644 --- a/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h @@ -28,12 +28,7 @@ class SimpleLogProcessor : public LogProcessor { public: - explicit SimpleLogProcessor(std::unique_ptr &&exporter -# ifdef ENABLE_ASYNC_EXPORT - , - bool is_export_async = false -# endif - ); + explicit SimpleLogProcessor(std::unique_ptr &&exporter); virtual ~SimpleLogProcessor() = default; std::unique_ptr MakeRecordable() noexcept override; @@ -53,10 +48,6 @@ class SimpleLogProcessor : public LogProcessor opentelemetry::common::SpinLockMutex lock_; // The atomic boolean flag to ensure the ShutDown() function is only called once std::atomic_flag shutdown_latch_ = ATOMIC_FLAG_INIT; - -# ifdef ENABLE_ASYNC_EXPORT - bool is_export_async_ = false; -# endif }; } // namespace logs } // namespace sdk diff --git a/sdk/include/opentelemetry/sdk/trace/async_batch_span_processor.h b/sdk/include/opentelemetry/sdk/trace/async_batch_span_processor.h deleted file mode 100644 index 4cbb234e71..0000000000 --- a/sdk/include/opentelemetry/sdk/trace/async_batch_span_processor.h +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -#pragma once -#ifdef ENABLE_ASYNC_EXPORT - -# include "opentelemetry/sdk/common/circular_buffer.h" -# include "opentelemetry/sdk/trace/batch_span_processor.h" -# include "opentelemetry/sdk/trace/exporter.h" - -# include -# include -# include -# include - -OPENTELEMETRY_BEGIN_NAMESPACE -namespace sdk -{ - -namespace trace -{ - -/** - * Struct to hold batch SpanProcessor options. - */ -struct AsyncBatchSpanProcessorOptions : public BatchSpanProcessorOptions -{ - /* Denotes the maximum number of async exports to continue - */ - size_t max_export_async = 8; -}; - -/** - * This is an implementation of the SpanProcessor which creates batches of finished spans and passes - * the export-friendly span data representations to the configured SpanExporter. - */ -class AsyncBatchSpanProcessor : public BatchSpanProcessor -{ -public: - /** - * Creates a batch span processor by configuring the specified exporter and other parameters - * as per the official, language-agnostic opentelemetry specs. - * - * @param exporter - The backend exporter to pass the ended spans to. - * @param options - The batch SpanProcessor options. - */ - AsyncBatchSpanProcessor(std::unique_ptr &&exporter, - const AsyncBatchSpanProcessorOptions &options); - - /** - * Shuts down the processor and does any cleanup required. Completely drains the buffer/queue of - * all its ended spans and passes them to the exporter. Any subsequent calls to OnStart, OnEnd, - * ForceFlush or Shutdown will return immediately without doing anything. - * - * NOTE: Timeout functionality not supported yet. - */ - bool Shutdown( - std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; - - /** - * Class destructor which invokes the Shutdown() method. The Shutdown() method is supposed to be - * invoked when the Tracer is shutdown (as per other languages), but the C++ Tracer only takes - * shared ownership of the processor, and thus doesn't call Shutdown (as the processor might be - * shared with other Tracers). - */ - virtual ~AsyncBatchSpanProcessor(); - -private: - /** - * Exports all ended spans to the configured exporter. - * - */ - void Export() override; - - struct ExportDataStorage - { - std::queue export_ids; - std::vector export_ids_flag; - - std::condition_variable async_export_waker; - std::mutex async_export_data_m; - }; - std::shared_ptr export_data_storage_; - - const size_t max_export_async_; - static constexpr size_t kInvalidExportId = static_cast(-1); - - /** - * @brief Notify completion of shutdown and force flush. This may be called from the any thread at - * any time - * - * @param notify_force_flush Flag to indicate whether to notify force flush completion. - * @param synchronization_data Synchronization data to be notified. - */ - static void NotifyCompletion(bool notify_force_flush, - const std::shared_ptr &synchronization_data, - const std::shared_ptr &export_data_storage); -}; - -} // namespace trace -} // namespace sdk -OPENTELEMETRY_END_NAMESPACE -#endif \ No newline at end of file diff --git a/sdk/include/opentelemetry/sdk/trace/exporter.h b/sdk/include/opentelemetry/sdk/trace/exporter.h index b58bbc8717..5826b5f454 100644 --- a/sdk/include/opentelemetry/sdk/trace/exporter.h +++ b/sdk/include/opentelemetry/sdk/trace/exporter.h @@ -42,17 +42,6 @@ class SpanExporter const nostd::span> &spans) noexcept = 0; -#ifdef ENABLE_ASYNC_EXPORT - /** - * Exports a batch of span recordables asynchronously. - * @param spans a span of unique pointers to span recordables - * @param result_callback callback function accepting ExportResult as argument - */ - virtual void Export( - const nostd::span> &spans, - std::function &&result_callback) noexcept = 0; -#endif - /** * Shut down the exporter. * @param timeout an optional timeout. diff --git a/sdk/include/opentelemetry/sdk/trace/simple_processor.h b/sdk/include/opentelemetry/sdk/trace/simple_processor.h index 576f9ebe9f..accc685965 100644 --- a/sdk/include/opentelemetry/sdk/trace/simple_processor.h +++ b/sdk/include/opentelemetry/sdk/trace/simple_processor.h @@ -31,17 +31,8 @@ class SimpleSpanProcessor : public SpanProcessor * Initialize a simple span processor. * @param exporter the exporter used by the span processor */ - explicit SimpleSpanProcessor(std::unique_ptr &&exporter -#ifdef ENABLE_ASYNC_EXPORT - , - bool is_export_async = false -#endif - ) noexcept + explicit SimpleSpanProcessor(std::unique_ptr &&exporter) noexcept : exporter_(std::move(exporter)) -#ifdef ENABLE_ASYNC_EXPORT - , - is_export_async_(is_export_async) -#endif {} std::unique_ptr MakeRecordable() noexcept override @@ -57,26 +48,11 @@ class SimpleSpanProcessor : public SpanProcessor { nostd::span> batch(&span, 1); const std::lock_guard locked(lock_); -#ifdef ENABLE_ASYNC_EXPORT - if (is_export_async_ == false) + if (exporter_->Export(batch) == sdk::common::ExportResult::kFailure) { -#endif - if (exporter_->Export(batch) == sdk::common::ExportResult::kFailure) - { - /* Once it is defined how the SDK does logging, an error should be - * logged in this case. */ - } -#ifdef ENABLE_ASYNC_EXPORT + /* Once it is defined how the SDK does logging, an error should be + * logged in this case. */ } - else - { - exporter_->Export(batch, [](sdk::common::ExportResult result) { - /* Log the result - */ - return true; - }); - } -#endif } bool ForceFlush( @@ -102,9 +78,6 @@ class SimpleSpanProcessor : public SpanProcessor std::unique_ptr exporter_; opentelemetry::common::SpinLockMutex lock_; std::atomic_flag shutdown_latch_ = ATOMIC_FLAG_INIT; -#ifdef ENABLE_ASYNC_EXPORT - bool is_export_async_ = false; -#endif }; } // namespace trace } // namespace sdk diff --git a/sdk/src/logs/CMakeLists.txt b/sdk/src/logs/CMakeLists.txt index 1b1db9e778..20f13324e7 100644 --- a/sdk/src/logs/CMakeLists.txt +++ b/sdk/src/logs/CMakeLists.txt @@ -4,7 +4,6 @@ add_library( logger.cc simple_log_processor.cc batch_log_processor.cc - async_batch_log_processor.cc logger_context.cc multi_log_processor.cc multi_recordable.cc) diff --git a/sdk/src/logs/async_batch_log_processor.cc b/sdk/src/logs/async_batch_log_processor.cc deleted file mode 100644 index 5d99754c12..0000000000 --- a/sdk/src/logs/async_batch_log_processor.cc +++ /dev/null @@ -1,194 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -#ifdef ENABLE_LOGS_PREVIEW -# ifdef ENABLE_ASYNC_EXPORT -# include "opentelemetry/sdk/logs/async_batch_log_processor.h" -# include "opentelemetry/common/spin_lock_mutex.h" - -# include - -using opentelemetry::sdk::common::AtomicUniquePtr; -using opentelemetry::sdk::common::CircularBufferRange; - -OPENTELEMETRY_BEGIN_NAMESPACE -namespace sdk -{ -namespace logs -{ -AsyncBatchLogProcessor::AsyncBatchLogProcessor(std::unique_ptr &&exporter, - const AsyncBatchLogProcessorOptions &options) - : BatchLogProcessor(std::move(exporter), options), - max_export_async_(options.max_export_async), - export_data_storage_(std::make_shared()) -{ - export_data_storage_->export_ids_flag.resize(max_export_async_, true); - for (int i = 1; i <= max_export_async_; i++) - { - export_data_storage_->export_ids.push(i); - } -} - -void AsyncBatchLogProcessor::Export() -{ - do - { - std::vector> records_arr; - size_t num_records_to_export; - bool notify_force_flush = - synchronization_data_->is_force_flush_pending.exchange(false, std::memory_order_acq_rel); - if (notify_force_flush) - { - num_records_to_export = buffer_.size(); - } - else - { - num_records_to_export = - buffer_.size() >= max_export_batch_size_ ? max_export_batch_size_ : buffer_.size(); - } - - if (num_records_to_export == 0) - { - NotifyCompletion(notify_force_flush, synchronization_data_, export_data_storage_); - break; - } - - buffer_.Consume(num_records_to_export, - [&](CircularBufferRange> range) noexcept { - range.ForEach([&](AtomicUniquePtr &ptr) { - std::unique_ptr swap_ptr = std::unique_ptr(nullptr); - ptr.Swap(swap_ptr); - records_arr.push_back(std::unique_ptr(swap_ptr.release())); - return true; - }); - }); - - size_t id = kInvalidExportId; - { - std::unique_lock lock(export_data_storage_->async_export_data_m); - export_data_storage_->async_export_waker.wait_for(lock, scheduled_delay_millis_, [this] { - return export_data_storage_->export_ids.size() > 0; - }); - if (export_data_storage_->export_ids.size() > 0) - { - id = export_data_storage_->export_ids.front(); - export_data_storage_->export_ids.pop(); - export_data_storage_->export_ids_flag[id - 1] = false; - } - } - if (id != kInvalidExportId) - { - std::weak_ptr export_data_watcher = export_data_storage_; - std::weak_ptr synchronization_data_watcher = synchronization_data_; - exporter_->Export( - nostd::span>(records_arr.data(), records_arr.size()), - [notify_force_flush, synchronization_data_watcher, export_data_watcher, - id](sdk::common::ExportResult result) { - // TODO: Print result - if (synchronization_data_watcher.expired()) - { - return true; - } - if (export_data_watcher.expired()) - { - return true; - } - bool is_already_notified = false; - auto synchronization_data = synchronization_data_watcher.lock(); - auto export_data = export_data_watcher.lock(); - { - std::unique_lock lk(export_data->async_export_data_m); - // In case callback is called more than once due to some bug in exporter - // we need to ensure export_ids do not contain duplicate. - if (export_data->export_ids_flag[id - 1] == false) - { - export_data->export_ids.push(id); - export_data->export_ids_flag[id - 1] = true; - } - else - { - is_already_notified = true; - } - } - if (is_already_notified == false) - { - NotifyCompletion(notify_force_flush, synchronization_data, export_data); - } - return true; - }); - } - } while (true); -} - -void AsyncBatchLogProcessor::NotifyCompletion( - bool notify_force_flush, - const std::shared_ptr &synchronization_data, - const std::shared_ptr &export_data_storage) -{ - BatchLogProcessor::NotifyCompletion(notify_force_flush, synchronization_data); - export_data_storage->async_export_waker.notify_all(); -} - -bool AsyncBatchLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept -{ - if (synchronization_data_->is_shutdown.load() == true) - { - return true; - } - auto start_time = std::chrono::system_clock::now(); - - std::lock_guard shutdown_guard{synchronization_data_->shutdown_m}; - bool already_shutdown = synchronization_data_->is_shutdown.exchange(true); - if (worker_thread_.joinable()) - { - synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release); - synchronization_data_->cv.notify_one(); - worker_thread_.join(); - } - - timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( - timeout, std::chrono::microseconds::zero()); - // wait for all async exports to complete and return if timeout reached. - { - std::unique_lock lock(export_data_storage_->async_export_data_m); - if (timeout <= std::chrono::microseconds::zero()) - { - auto is_wait = false; - while (!is_wait) - { - is_wait = export_data_storage_->async_export_waker.wait_for( - lock, scheduled_delay_millis_, - [this] { return export_data_storage_->export_ids.size() == max_export_async_; }); - } - } - else - { - export_data_storage_->async_export_waker.wait_for(lock, timeout, [this] { - return export_data_storage_->export_ids.size() == max_export_async_; - }); - } - } - - GetWaitAdjustedTime(timeout, start_time); - // Should only shutdown exporter ONCE. - if (!already_shutdown && exporter_ != nullptr) - { - return exporter_->Shutdown(timeout); - } - - return true; -} - -AsyncBatchLogProcessor::~AsyncBatchLogProcessor() -{ - if (synchronization_data_->is_shutdown.load() == false) - { - Shutdown(); - } -} - -} // namespace logs -} // namespace sdk -OPENTELEMETRY_END_NAMESPACE -# endif -#endif diff --git a/sdk/src/logs/simple_log_processor.cc b/sdk/src/logs/simple_log_processor.cc index 844a84c6fb..6e2fde9f14 100644 --- a/sdk/src/logs/simple_log_processor.cc +++ b/sdk/src/logs/simple_log_processor.cc @@ -16,17 +16,8 @@ namespace logs * Initialize a simple log processor. * @param exporter the configured exporter where log records are sent */ -SimpleLogProcessor::SimpleLogProcessor(std::unique_ptr &&exporter -# ifdef ENABLE_ASYNC_EXPORT - , - bool is_export_async -# endif - ) +SimpleLogProcessor::SimpleLogProcessor(std::unique_ptr &&exporter) : exporter_(std::move(exporter)) -# ifdef ENABLE_ASYNC_EXPORT - , - is_export_async_(is_export_async) -# endif {} std::unique_ptr SimpleLogProcessor::MakeRecordable() noexcept @@ -44,25 +35,10 @@ void SimpleLogProcessor::OnReceive(std::unique_ptr &&record) noexcep // Get lock to ensure Export() is never called concurrently const std::lock_guard locked(lock_); -# ifdef ENABLE_ASYNC_EXPORT - if (is_export_async_ == false) + if (exporter_->Export(batch) != sdk::common::ExportResult::kSuccess) { -# endif - if (exporter_->Export(batch) != sdk::common::ExportResult::kSuccess) - { - /* Alert user of the failed export */ - } -# ifdef ENABLE_ASYNC_EXPORT + /* Alert user of the failed export */ } - else - { - exporter_->Export(batch, [](sdk::common::ExportResult result) { - /* Log the result - */ - return true; - }); - } -# endif } /** * The simple processor does not have any log records to flush so this method is not used diff --git a/sdk/src/metrics/metric_reader.cc b/sdk/src/metrics/metric_reader.cc index eafcc60e7e..c8d6de8967 100644 --- a/sdk/src/metrics/metric_reader.cc +++ b/sdk/src/metrics/metric_reader.cc @@ -14,7 +14,7 @@ namespace metrics { MetricReader::MetricReader(AggregationTemporality aggregation_temporality) - : aggregation_temporality_(aggregation_temporality) + : aggregation_temporality_(aggregation_temporality), shutdown_(false), metric_producer_(nullptr) {} void MetricReader::SetMetricProducer(MetricProducer *metric_producer) diff --git a/sdk/src/trace/CMakeLists.txt b/sdk/src/trace/CMakeLists.txt index bd906b1fab..ddef00fb42 100644 --- a/sdk/src/trace/CMakeLists.txt +++ b/sdk/src/trace/CMakeLists.txt @@ -5,7 +5,6 @@ add_library( tracer.cc span.cc batch_span_processor.cc - async_batch_span_processor.cc samplers/parent.cc samplers/trace_id_ratio.cc random_id_generator.cc) diff --git a/sdk/src/trace/async_batch_span_processor.cc b/sdk/src/trace/async_batch_span_processor.cc deleted file mode 100644 index e28fe34e70..0000000000 --- a/sdk/src/trace/async_batch_span_processor.cc +++ /dev/null @@ -1,187 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 -#ifdef ENABLE_ASYNC_EXPORT - -# include "opentelemetry/sdk/trace/async_batch_span_processor.h" -# include "opentelemetry/common/spin_lock_mutex.h" - -# include -using opentelemetry::sdk::common::AtomicUniquePtr; -using opentelemetry::sdk::common::CircularBuffer; -using opentelemetry::sdk::common::CircularBufferRange; -using opentelemetry::trace::SpanContext; - -OPENTELEMETRY_BEGIN_NAMESPACE -namespace sdk -{ -namespace trace -{ - -AsyncBatchSpanProcessor::AsyncBatchSpanProcessor(std::unique_ptr &&exporter, - const AsyncBatchSpanProcessorOptions &options) - : BatchSpanProcessor(std::move(exporter), options), - export_data_storage_(std::make_shared()), - max_export_async_(options.max_export_async) -{ - export_data_storage_->export_ids_flag.resize(max_export_async_, true); - for (size_t i = 1; i <= max_export_async_; i++) - { - export_data_storage_->export_ids.push(i); - } -} - -void AsyncBatchSpanProcessor::Export() -{ - do - { - std::vector> spans_arr; - size_t num_records_to_export; - bool notify_force_flush = - synchronization_data_->is_force_flush_pending.exchange(false, std::memory_order_acq_rel); - if (notify_force_flush) - { - num_records_to_export = buffer_.size(); - } - else - { - num_records_to_export = - buffer_.size() >= max_export_batch_size_ ? max_export_batch_size_ : buffer_.size(); - } - - if (num_records_to_export == 0) - { - NotifyCompletion(notify_force_flush, synchronization_data_, export_data_storage_); - break; - } - buffer_.Consume(num_records_to_export, - [&](CircularBufferRange> range) noexcept { - range.ForEach([&](AtomicUniquePtr &ptr) { - std::unique_ptr swap_ptr = std::unique_ptr(nullptr); - ptr.Swap(swap_ptr); - spans_arr.push_back(std::unique_ptr(swap_ptr.release())); - return true; - }); - }); - - size_t id = kInvalidExportId; - { - std::unique_lock lock(export_data_storage_->async_export_data_m); - export_data_storage_->async_export_waker.wait_for(lock, schedule_delay_millis_, [this] { - return export_data_storage_->export_ids.size() > 0; - }); - if (export_data_storage_->export_ids.size() > 0) - { - id = export_data_storage_->export_ids.front(); - export_data_storage_->export_ids.pop(); - export_data_storage_->export_ids_flag[id - 1] = false; - } - } - if (id != kInvalidExportId) - { - std::weak_ptr export_data_watcher = export_data_storage_; - - std::weak_ptr synchronization_data_watcher = synchronization_data_; - exporter_->Export( - nostd::span>(spans_arr.data(), spans_arr.size()), - [notify_force_flush, synchronization_data_watcher, export_data_watcher, - id](sdk::common::ExportResult result) { - // TODO: Print result - if (synchronization_data_watcher.expired()) - { - return true; - } - - if (export_data_watcher.expired()) - { - return true; - } - - auto synchronization_data = synchronization_data_watcher.lock(); - auto export_data = export_data_watcher.lock(); - { - std::unique_lock lk(export_data->async_export_data_m); - if (export_data->export_ids_flag[id - 1] == false) - { - export_data->export_ids.push(id); - export_data->export_ids_flag[id - 1] = true; - } - } - NotifyCompletion(notify_force_flush, synchronization_data, export_data); - return true; - }); - } - } while (true); -} - -void AsyncBatchSpanProcessor::NotifyCompletion( - bool notify_force_flush, - const std::shared_ptr &synchronization_data, - const std::shared_ptr &export_data_storage) -{ - BatchSpanProcessor::NotifyCompletion(notify_force_flush, synchronization_data); - export_data_storage->async_export_waker.notify_all(); -} - -bool AsyncBatchSpanProcessor::Shutdown(std::chrono::microseconds timeout) noexcept -{ - if (synchronization_data_->is_shutdown.load() == true) - { - return true; - } - - auto start_time = std::chrono::system_clock::now(); - std::lock_guard shutdown_guard{synchronization_data_->shutdown_m}; - bool already_shutdown = synchronization_data_->is_shutdown.exchange(true); - - if (worker_thread_.joinable()) - { - synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release); - synchronization_data_->cv.notify_one(); - worker_thread_.join(); - } - - timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( - timeout, std::chrono::microseconds::zero()); - // wait for all async exports to complete and return if timeout reached. - { - std::unique_lock lock(export_data_storage_->async_export_data_m); - if (timeout <= std::chrono::microseconds::zero()) - { - auto is_wait = false; - while (!is_wait) - { - is_wait = export_data_storage_->async_export_waker.wait_for( - lock, schedule_delay_millis_, - [this] { return export_data_storage_->export_ids.size() == max_export_async_; }); - } - } - else - { - export_data_storage_->async_export_waker.wait_for(lock, timeout, [this] { - return export_data_storage_->export_ids.size() == max_export_async_; - }); - } - } - - GetWaitAdjustedTime(timeout, start_time); - // Should only shutdown exporter ONCE. - if (!already_shutdown && exporter_ != nullptr) - { - return exporter_->Shutdown(timeout); - } - - return true; -} - -AsyncBatchSpanProcessor::~AsyncBatchSpanProcessor() -{ - if (synchronization_data_->is_shutdown.load() == false) - { - Shutdown(); - } -} - -} // namespace trace -} // namespace sdk -OPENTELEMETRY_END_NAMESPACE -#endif diff --git a/sdk/test/logs/BUILD b/sdk/test/logs/BUILD index f620eaf613..c8f051070f 100644 --- a/sdk/test/logs/BUILD +++ b/sdk/test/logs/BUILD @@ -73,18 +73,3 @@ cc_test( "@com_google_googletest//:gtest_main", ], ) - -cc_test( - name = "async_batch_log_processor_test", - srcs = [ - "async_batch_log_processor_test.cc", - ], - tags = [ - "logs", - "test", - ], - deps = [ - "//sdk/src/logs", - "@com_google_googletest//:gtest_main", - ], -) diff --git a/sdk/test/logs/CMakeLists.txt b/sdk/test/logs/CMakeLists.txt index 550b48edd8..84b865d226 100644 --- a/sdk/test/logs/CMakeLists.txt +++ b/sdk/test/logs/CMakeLists.txt @@ -1,8 +1,5 @@ -foreach( - testname - logger_provider_sdk_test logger_sdk_test log_record_test - simple_log_processor_test batch_log_processor_test - async_batch_log_processor_test) +foreach(testname logger_provider_sdk_test logger_sdk_test log_record_test + simple_log_processor_test batch_log_processor_test) add_executable(${testname} "${testname}.cc") target_link_libraries(${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} opentelemetry_logs) diff --git a/sdk/test/logs/async_batch_log_processor_test.cc b/sdk/test/logs/async_batch_log_processor_test.cc deleted file mode 100644 index 3c71ce743e..0000000000 --- a/sdk/test/logs/async_batch_log_processor_test.cc +++ /dev/null @@ -1,374 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 -#ifndef ENABLE_ASYNC_EXPORT -# include -TEST(AsyncBatchLogProcessor, DummyTest) -{ - // For linking -} -#endif - -#ifdef ENABLE_LOGS_PREVIEW -# ifdef ENABLE_ASYNC_EXPORT - -# include "opentelemetry/sdk/logs/async_batch_log_processor.h" -# include "opentelemetry/sdk/logs/exporter.h" -# include "opentelemetry/sdk/logs/log_record.h" - -# include -# include -# include -# include -# include - -using namespace opentelemetry::sdk::logs; -using namespace opentelemetry::sdk::common; - -/** - * A sample log exporter - * for testing the batch log processor - */ -class MockLogExporter final : public LogExporter -{ -public: - MockLogExporter(std::shared_ptr>> logs_received, - std::shared_ptr> is_shutdown, - std::shared_ptr> is_export_completed, - const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0), - int callback_count = 1) - : logs_received_(logs_received), - is_shutdown_(is_shutdown), - is_export_completed_(is_export_completed), - export_delay_(export_delay), - callback_count_(callback_count) - {} - - std::unique_ptr MakeRecordable() noexcept - { - return std::unique_ptr(new LogRecord()); - } - - // Export method stores the logs received into a shared list of record names - ExportResult Export( - const opentelemetry::nostd::span> &records) noexcept override - { - *is_export_completed_ = false; // Meant exclusively to test scheduled_delay_millis - - for (auto &record : records) - { - auto log = std::unique_ptr(static_cast(record.release())); - if (log != nullptr) - { - logs_received_->push_back(std::move(log)); - } - } - - *is_export_completed_ = true; - return ExportResult::kSuccess; - } - - void Export(const opentelemetry::nostd::span> &records, - std::function - &&result_callback) noexcept override - { - // We should keep the order of test records - auto result = Export(records); - async_threads_.emplace_back(std::make_shared( - [this, - result](std::function &&result_callback) { - for (int i = 0; i < callback_count_; i++) - { - result_callback(result); - } - }, - std::move(result_callback))); - } - - // toggles the boolean flag marking this exporter as shut down - bool Shutdown( - std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override - { - while (!async_threads_.empty()) - { - std::list> async_threads; - async_threads.swap(async_threads_); - for (auto &async_thread : async_threads) - { - if (async_thread && async_thread->joinable()) - { - async_thread->join(); - } - } - } - *is_shutdown_ = true; - return true; - } - -private: - std::shared_ptr>> logs_received_; - std::shared_ptr> is_shutdown_; - std::shared_ptr> is_export_completed_; - const std::chrono::milliseconds export_delay_; - std::list> async_threads_; - int callback_count_; -}; - -/** - * A fixture class for testing the BatchLogProcessor class that uses the TestExporter defined above. - */ -class AsyncBatchLogProcessorTest : public testing::Test // ::testing::Test -{ -public: - // returns a batch log processor that received a batch of log records, a shared pointer to a - // is_shutdown flag, and the processor configuration options (default if unspecified) - std::shared_ptr GetMockProcessor( - std::shared_ptr>> logs_received, - std::shared_ptr> is_shutdown, - std::shared_ptr> is_export_completed = - std::shared_ptr>(new std::atomic(false)), - const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0), - const std::chrono::milliseconds scheduled_delay_millis = std::chrono::milliseconds(5000), - const size_t max_queue_size = 2048, - const size_t max_export_batch_size = 512, - const size_t max_export_async = 8, - int callback_count = 1) - { - AsyncBatchLogProcessorOptions options; - options.max_queue_size = max_queue_size; - options.schedule_delay_millis = scheduled_delay_millis; - options.max_export_batch_size = max_export_batch_size; - options.max_export_async = max_export_async; - return std::shared_ptr(new AsyncBatchLogProcessor( - std::unique_ptr(new MockLogExporter( - logs_received, is_shutdown, is_export_completed, export_delay, callback_count)), - options)); - } -}; - -TEST_F(AsyncBatchLogProcessorTest, TestAsyncShutdown) -{ - // initialize a batch log processor with the test exporter - std::shared_ptr>> logs_received( - new std::vector>); - std::shared_ptr> is_shutdown(new std::atomic(false)); - std::shared_ptr> is_export_completed(new std::atomic(false)); - - const std::chrono::milliseconds export_delay(0); - const std::chrono::milliseconds scheduled_delay_millis(5000); - const size_t max_export_batch_size = 512; - const size_t max_queue_size = 2048; - const size_t max_export_async = 5; - - auto batch_processor = GetMockProcessor(logs_received, is_shutdown, is_export_completed, - export_delay, scheduled_delay_millis, max_queue_size, - max_export_batch_size, max_export_async); - - // Create a few test log records and send them to the processor - const int num_logs = 2048; - - for (int i = 0; i < num_logs; ++i) - { - auto log = batch_processor->MakeRecordable(); - log->SetBody("Log" + std::to_string(i)); - batch_processor->OnReceive(std::move(log)); - } - - // Test that shutting down the processor will first wait for the - // current batch of logs to be sent to the log exporter - // by checking the number of logs sent and the names of the logs sent - EXPECT_EQ(true, batch_processor->Shutdown()); - // It's safe to shutdown again - EXPECT_TRUE(batch_processor->Shutdown()); - - EXPECT_EQ(num_logs, logs_received->size()); - - // Assume logs are received by exporter in same order as sent by processor - for (int i = 0; i < num_logs; ++i) - { - EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetBody()); - } - - // Also check that the processor is shut down at the end - EXPECT_TRUE(is_shutdown->load()); -} - -TEST_F(AsyncBatchLogProcessorTest, TestAsyncShutdownNoCallback) -{ - // initialize a batch log processor with the test exporter - std::shared_ptr>> logs_received( - new std::vector>); - std::shared_ptr> is_shutdown(new std::atomic(false)); - std::shared_ptr> is_export_completed(new std::atomic(false)); - - const std::chrono::milliseconds export_delay(0); - const std::chrono::milliseconds scheduled_delay_millis(5000); - const size_t max_export_batch_size = 512; - const size_t max_queue_size = 2048; - const size_t max_export_async = 5; - - auto batch_processor = GetMockProcessor(logs_received, is_shutdown, is_export_completed, - export_delay, scheduled_delay_millis, max_queue_size, - max_export_batch_size, max_export_async, 0); - - // Create a few test log records and send them to the processor - const int num_logs = 2048; - - for (int i = 0; i < num_logs; ++i) - { - auto log = batch_processor->MakeRecordable(); - log->SetBody("Log" + std::to_string(i)); - batch_processor->OnReceive(std::move(log)); - } - - EXPECT_EQ(true, batch_processor->Shutdown(std::chrono::milliseconds(5000))); - // It's safe to shutdown again - EXPECT_TRUE(batch_processor->Shutdown()); - - // Also check that the processor is shut down at the end - EXPECT_TRUE(is_shutdown->load()); -} - -TEST_F(AsyncBatchLogProcessorTest, TestAsyncForceFlush) -{ - std::shared_ptr> is_shutdown(new std::atomic(false)); - std::shared_ptr>> logs_received( - new std::vector>); - std::shared_ptr> is_export_completed(new std::atomic(false)); - - const std::chrono::milliseconds export_delay(0); - const std::chrono::milliseconds scheduled_delay_millis(5000); - const size_t max_export_batch_size = 512; - const size_t max_queue_size = 2048; - - auto batch_processor = - GetMockProcessor(logs_received, is_shutdown, is_export_completed, export_delay, - scheduled_delay_millis, max_queue_size, max_export_batch_size); - - const int num_logs = 2048; - - for (int i = 0; i < num_logs; ++i) - { - auto log = batch_processor->MakeRecordable(); - log->SetBody("Log" + std::to_string(i)); - batch_processor->OnReceive(std::move(log)); - } - - EXPECT_TRUE(batch_processor->ForceFlush()); - - EXPECT_EQ(num_logs, logs_received->size()); - for (int i = 0; i < num_logs; ++i) - { - EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetBody()); - } - - // Create some more logs to make sure that the processor still works - for (int i = 0; i < num_logs; ++i) - { - auto log = batch_processor->MakeRecordable(); - log->SetBody("Log" + std::to_string(i)); - batch_processor->OnReceive(std::move(log)); - } - - EXPECT_TRUE(batch_processor->ForceFlush()); - - EXPECT_EQ(num_logs * 2, logs_received->size()); - for (int i = 0; i < num_logs * 2; ++i) - { - EXPECT_EQ("Log" + std::to_string(i % num_logs), logs_received->at(i)->GetBody()); - } -} - -TEST_F(AsyncBatchLogProcessorTest, TestManyLogsLoss) -{ - /* Test that when exporting more than max_queue_size logs, some are most likely lost*/ - - std::shared_ptr> is_shutdown(new std::atomic(false)); - std::shared_ptr>> logs_received( - new std::vector>); - - const int max_queue_size = 4096; - - auto batch_processor = GetMockProcessor(logs_received, is_shutdown); - - // Create max_queue_size log records - for (int i = 0; i < max_queue_size; ++i) - { - auto log = batch_processor->MakeRecordable(); - log->SetBody("Log" + std::to_string(i)); - batch_processor->OnReceive(std::move(log)); - } - - EXPECT_TRUE(batch_processor->ForceFlush()); - - // Log should be exported by now - EXPECT_GE(max_queue_size, logs_received->size()); -} - -TEST_F(AsyncBatchLogProcessorTest, TestManyLogsLossLess) -{ - /* Test that no logs are lost when sending max_queue_size logs */ - - std::shared_ptr> is_shutdown(new std::atomic(false)); - std::shared_ptr>> logs_received( - new std::vector>); - auto batch_processor = GetMockProcessor(logs_received, is_shutdown); - - const int num_logs = 2048; - - for (int i = 0; i < num_logs; ++i) - { - auto log = batch_processor->MakeRecordable(); - log->SetBody("Log" + std::to_string(i)); - batch_processor->OnReceive(std::move(log)); - } - - EXPECT_TRUE(batch_processor->ForceFlush()); - - EXPECT_EQ(num_logs, logs_received->size()); - for (int i = 0; i < num_logs; ++i) - { - EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetBody()); - } -} - -TEST_F(AsyncBatchLogProcessorTest, TestScheduledDelayMillis) -{ - /* Test that max_export_batch_size logs are exported every scheduled_delay_millis - seconds */ - - std::shared_ptr> is_shutdown(new std::atomic(false)); - std::shared_ptr> is_export_completed(new std::atomic(false)); - std::shared_ptr>> logs_received( - new std::vector>); - - const std::chrono::milliseconds export_delay(0); - const std::chrono::milliseconds scheduled_delay_millis(2000); - const size_t max_export_batch_size = 512; - - auto batch_processor = GetMockProcessor(logs_received, is_shutdown, is_export_completed, - export_delay, scheduled_delay_millis); - - for (std::size_t i = 0; i < max_export_batch_size; ++i) - { - auto log = batch_processor->MakeRecordable(); - log->SetBody("Log" + std::to_string(i)); - batch_processor->OnReceive(std::move(log)); - } - // Sleep for scheduled_delay_millis milliseconds - std::this_thread::sleep_for(scheduled_delay_millis); - - // small delay to give time to export, which is being performed - // asynchronously by the worker thread (this thread will not - // forcibly join() the main thread unless processor's shutdown() is called). - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - - // Logs should be exported by now - EXPECT_TRUE(is_export_completed->load()); - EXPECT_EQ(max_export_batch_size, logs_received->size()); - for (size_t i = 0; i < max_export_batch_size; ++i) - { - EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetBody()); - } -} -# endif -#endif diff --git a/sdk/test/logs/batch_log_processor_test.cc b/sdk/test/logs/batch_log_processor_test.cc index 2379c11c0c..e84cf4eb99 100644 --- a/sdk/test/logs/batch_log_processor_test.cc +++ b/sdk/test/logs/batch_log_processor_test.cc @@ -57,17 +57,6 @@ class MockLogExporter final : public LogExporter return ExportResult::kSuccess; } -# ifdef ENABLE_ASYNC_EXPORT - void Export(const opentelemetry::nostd::span> &records, - std::function - &&result_callback) noexcept override - { - // We should keep the order of test records - auto result = Export(records); - result_callback(result); - } -# endif - // toggles the boolean flag marking this exporter as shut down bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override diff --git a/sdk/test/logs/simple_log_processor_test.cc b/sdk/test/logs/simple_log_processor_test.cc index 824bf62696..32c62a5083 100644 --- a/sdk/test/logs/simple_log_processor_test.cc +++ b/sdk/test/logs/simple_log_processor_test.cc @@ -53,17 +53,6 @@ class TestExporter final : public LogExporter return ExportResult::kSuccess; } -# ifdef ENABLE_ASYNC_EXPORT - // Dummy Async Export implementation - void Export(const nostd::span> &records, - std::function - &&result_callback) noexcept override - { - auto result = Export(records); - result_callback(result); - } -# endif - // Increment the shutdown counter everytime this method is called bool Shutdown(std::chrono::microseconds timeout) noexcept override { @@ -148,14 +137,6 @@ class FailShutDownExporter final : public LogExporter return ExportResult::kSuccess; } -# ifdef ENABLE_ASYNC_EXPORT - void Export(const nostd::span> &records, - std::function - &&result_callback) noexcept override - { - result_callback(ExportResult::kSuccess); - } -# endif bool Shutdown(std::chrono::microseconds timeout) noexcept override { return false; } }; diff --git a/sdk/test/trace/CMakeLists.txt b/sdk/test/trace/CMakeLists.txt index 8e9402625b..b02ff705fa 100644 --- a/sdk/test/trace/CMakeLists.txt +++ b/sdk/test/trace/CMakeLists.txt @@ -8,8 +8,7 @@ foreach( always_on_sampler_test parent_sampler_test trace_id_ratio_sampler_test - batch_span_processor_test - async_batch_span_processor_test) + batch_span_processor_test) add_executable(${testname} "${testname}.cc") target_link_libraries( ${testname} diff --git a/sdk/test/trace/async_batch_span_processor_test.cc b/sdk/test/trace/async_batch_span_processor_test.cc deleted file mode 100644 index 1ac28c5bc9..0000000000 --- a/sdk/test/trace/async_batch_span_processor_test.cc +++ /dev/null @@ -1,375 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 -#ifndef ENABLE_ASYNC_EXPORT -# include -TEST(AsyncBatchSpanProcessor, DummyTest) -{ - // For linking -} -#endif - -#ifdef ENABLE_ASYNC_EXPORT - -# include "opentelemetry/sdk/trace/async_batch_span_processor.h" -# include "opentelemetry/sdk/trace/span_data.h" -# include "opentelemetry/sdk/trace/tracer.h" - -# include -# include -# include -# include -# include - -OPENTELEMETRY_BEGIN_NAMESPACE - -/** - * Returns a mock span exporter meant exclusively for testing only - */ -class MockSpanExporter final : public sdk::trace::SpanExporter -{ -public: - MockSpanExporter( - std::shared_ptr>> spans_received, - std::shared_ptr> is_shutdown, - std::shared_ptr> is_export_completed = - std::shared_ptr>(new std::atomic(false)), - const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0), - int callback_count = 1) noexcept - : spans_received_(spans_received), - is_shutdown_(is_shutdown), - is_export_completed_(is_export_completed), - export_delay_(export_delay), - callback_count_(callback_count) - {} - - std::unique_ptr MakeRecordable() noexcept override - { - return std::unique_ptr(new sdk::trace::SpanData); - } - - sdk::common::ExportResult Export( - const nostd::span> &recordables) noexcept override - { - *is_export_completed_ = false; - - std::this_thread::sleep_for(export_delay_); - - for (auto &recordable : recordables) - { - auto span = std::unique_ptr( - static_cast(recordable.release())); - - if (span != nullptr) - { - spans_received_->push_back(std::move(span)); - } - } - - *is_export_completed_ = true; - return sdk::common::ExportResult::kSuccess; - } - - void Export(const nostd::span> &records, - std::function - &&result_callback) noexcept override - { - // We should keep the order of test records - auto result = Export(records); - async_threads_.emplace_back(std::make_shared( - [this, - result](std::function &&result_callback) { - for (int i = 0; i < callback_count_; i++) - { - result_callback(result); - } - }, - std::move(result_callback))); - } - - bool Shutdown( - std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override - { - while (!async_threads_.empty()) - { - std::list> async_threads; - async_threads.swap(async_threads_); - for (auto &async_thread : async_threads) - { - if (async_thread && async_thread->joinable()) - { - async_thread->join(); - } - } - } - *is_shutdown_ = true; - return true; - } - - bool IsExportCompleted() { return is_export_completed_->load(); } - -private: - std::shared_ptr>> spans_received_; - std::shared_ptr> is_shutdown_; - std::shared_ptr> is_export_completed_; - // Meant exclusively to test force flush timeout - const std::chrono::milliseconds export_delay_; - std::list> async_threads_; - int callback_count_; -}; - -/** - * Fixture Class - */ -class AsyncBatchSpanProcessorTestPeer : public testing::Test -{ -public: - std::unique_ptr>> GetTestSpans( - std::shared_ptr processor, - const int num_spans) - { - std::unique_ptr>> test_spans( - new std::vector>); - - for (int i = 0; i < num_spans; ++i) - { - test_spans->push_back(processor->MakeRecordable()); - static_cast(test_spans->at(i).get()) - ->SetName("Span " + std::to_string(i)); - } - - return test_spans; - } -}; - -/* ################################## TESTS ############################################ */ - -TEST_F(AsyncBatchSpanProcessorTestPeer, TestAsyncShutdown) -{ - std::shared_ptr>> spans_received( - new std::vector>); - std::shared_ptr> is_shutdown(new std::atomic(false)); - - sdk::trace::AsyncBatchSpanProcessorOptions options{}; - options.max_export_async = 5; - - auto batch_processor = - std::shared_ptr(new sdk::trace::AsyncBatchSpanProcessor( - std::unique_ptr(new MockSpanExporter(spans_received, is_shutdown)), - options)); - const int num_spans = 2048; - - auto test_spans = GetTestSpans(batch_processor, num_spans); - - for (int i = 0; i < num_spans; ++i) - { - batch_processor->OnEnd(std::move(test_spans->at(i))); - } - - EXPECT_TRUE(batch_processor->Shutdown(std::chrono::milliseconds(5000))); - // It's safe to shutdown again - EXPECT_TRUE(batch_processor->Shutdown()); - - EXPECT_EQ(num_spans, spans_received->size()); - for (int i = 0; i < num_spans; ++i) - { - EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName()); - } - - EXPECT_TRUE(is_shutdown->load()); -} - -TEST_F(AsyncBatchSpanProcessorTestPeer, TestAsyncShutdownNoCallback) -{ - std::shared_ptr> is_export_completed(new std::atomic(false)); - std::shared_ptr>> spans_received( - new std::vector>); - const std::chrono::milliseconds export_delay(0); - std::shared_ptr> is_shutdown(new std::atomic(false)); - - sdk::trace::AsyncBatchSpanProcessorOptions options{}; - options.max_export_async = 8; - - auto batch_processor = - std::shared_ptr(new sdk::trace::AsyncBatchSpanProcessor( - std::unique_ptr(new MockSpanExporter( - spans_received, is_shutdown, is_export_completed, export_delay, 0)), - options)); - const int num_spans = 2048; - - auto test_spans = GetTestSpans(batch_processor, num_spans); - - for (int i = 0; i < num_spans; ++i) - { - batch_processor->OnEnd(std::move(test_spans->at(i))); - } - - // Shutdown should never block for ever and return on timeout - EXPECT_TRUE(batch_processor->Shutdown(std::chrono::milliseconds(5000))); - // It's safe to shutdown again - EXPECT_TRUE(batch_processor->Shutdown()); - - EXPECT_TRUE(is_shutdown->load()); -} - -TEST_F(AsyncBatchSpanProcessorTestPeer, TestAsyncForceFlush) -{ - std::shared_ptr> is_shutdown(new std::atomic(false)); - std::shared_ptr>> spans_received( - new std::vector>); - - sdk::trace::AsyncBatchSpanProcessorOptions options{}; - - auto batch_processor = - std::shared_ptr(new sdk::trace::AsyncBatchSpanProcessor( - std::unique_ptr(new MockSpanExporter(spans_received, is_shutdown)), - options)); - const int num_spans = 2048; - - auto test_spans = GetTestSpans(batch_processor, num_spans); - - for (int i = 0; i < num_spans; ++i) - { - batch_processor->OnEnd(std::move(test_spans->at(i))); - } - - // Give some time to export - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - - EXPECT_TRUE(batch_processor->ForceFlush()); - - EXPECT_EQ(num_spans, spans_received->size()); - for (int i = 0; i < num_spans; ++i) - { - EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName()); - } - - // Create some more spans to make sure that the processor still works - auto more_test_spans = GetTestSpans(batch_processor, num_spans); - for (int i = 0; i < num_spans; ++i) - { - batch_processor->OnEnd(std::move(more_test_spans->at(i))); - } - - // Give some time to export the spans - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - - EXPECT_TRUE(batch_processor->ForceFlush()); - - EXPECT_EQ(num_spans * 2, spans_received->size()); - for (int i = 0; i < num_spans; ++i) - { - EXPECT_EQ("Span " + std::to_string(i % num_spans), - spans_received->at(num_spans + i)->GetName()); - } -} - -TEST_F(AsyncBatchSpanProcessorTestPeer, TestManySpansLoss) -{ - /* Test that when exporting more than max_queue_size spans, some are most likely lost*/ - - std::shared_ptr> is_shutdown(new std::atomic(false)); - std::shared_ptr>> spans_received( - new std::vector>); - - const int max_queue_size = 4096; - - auto batch_processor = - std::shared_ptr(new sdk::trace::AsyncBatchSpanProcessor( - std::unique_ptr(new MockSpanExporter(spans_received, is_shutdown)), - sdk::trace::AsyncBatchSpanProcessorOptions())); - - auto test_spans = GetTestSpans(batch_processor, max_queue_size); - - for (int i = 0; i < max_queue_size; ++i) - { - batch_processor->OnEnd(std::move(test_spans->at(i))); - } - - // Give some time to export the spans - std::this_thread::sleep_for(std::chrono::milliseconds(700)); - - EXPECT_TRUE(batch_processor->ForceFlush()); - - // Span should be exported by now - EXPECT_GE(max_queue_size, spans_received->size()); -} - -TEST_F(AsyncBatchSpanProcessorTestPeer, TestManySpansLossLess) -{ - /* Test that no spans are lost when sending max_queue_size spans */ - - std::shared_ptr> is_shutdown(new std::atomic(false)); - std::shared_ptr>> spans_received( - new std::vector>); - - const int num_spans = 2048; - - auto batch_processor = - std::shared_ptr(new sdk::trace::AsyncBatchSpanProcessor( - std::unique_ptr(new MockSpanExporter(spans_received, is_shutdown)), - sdk::trace::AsyncBatchSpanProcessorOptions())); - - auto test_spans = GetTestSpans(batch_processor, num_spans); - - for (int i = 0; i < num_spans; ++i) - { - batch_processor->OnEnd(std::move(test_spans->at(i))); - } - - // Give some time to export the spans - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - - EXPECT_TRUE(batch_processor->ForceFlush()); - - EXPECT_EQ(num_spans, spans_received->size()); - for (int i = 0; i < num_spans; ++i) - { - EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName()); - } -} - -TEST_F(AsyncBatchSpanProcessorTestPeer, TestScheduleDelayMillis) -{ - /* Test that max_export_batch_size spans are exported every schedule_delay_millis - seconds */ - - std::shared_ptr> is_shutdown(new std::atomic(false)); - std::shared_ptr> is_export_completed(new std::atomic(false)); - std::shared_ptr>> spans_received( - new std::vector>); - const std::chrono::milliseconds export_delay(0); - const size_t max_export_batch_size = 512; - sdk::trace::AsyncBatchSpanProcessorOptions options{}; - options.schedule_delay_millis = std::chrono::milliseconds(2000); - - auto batch_processor = - std::shared_ptr(new sdk::trace::AsyncBatchSpanProcessor( - std::unique_ptr( - new MockSpanExporter(spans_received, is_shutdown, is_export_completed, export_delay)), - options)); - - auto test_spans = GetTestSpans(batch_processor, max_export_batch_size); - - for (size_t i = 0; i < max_export_batch_size; ++i) - { - batch_processor->OnEnd(std::move(test_spans->at(i))); - } - - // Sleep for schedule_delay_millis milliseconds - std::this_thread::sleep_for(options.schedule_delay_millis); - - // small delay to give time to export - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - - // Spans should be exported by now - EXPECT_TRUE(is_export_completed->load()); - EXPECT_EQ(max_export_batch_size, spans_received->size()); - for (size_t i = 0; i < max_export_batch_size; ++i) - { - EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName()); - } -} - -OPENTELEMETRY_END_NAMESPACE - -#endif diff --git a/sdk/test/trace/batch_span_processor_test.cc b/sdk/test/trace/batch_span_processor_test.cc index 6f270b9766..b438abfff7 100644 --- a/sdk/test/trace/batch_span_processor_test.cc +++ b/sdk/test/trace/batch_span_processor_test.cc @@ -58,17 +58,6 @@ class MockSpanExporter final : public sdk::trace::SpanExporter return sdk::common::ExportResult::kSuccess; } -#ifdef ENABLE_ASYNC_EXPORT - void Export(const nostd::span> &records, - std::function - &&result_callback) noexcept override - { - // This is just dummy implementation. - auto result = Export(records); - result_callback(result); - } -#endif - bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override { diff --git a/sdk/test/trace/simple_processor_test.cc b/sdk/test/trace/simple_processor_test.cc index aa59fa850c..9398b922a5 100644 --- a/sdk/test/trace/simple_processor_test.cc +++ b/sdk/test/trace/simple_processor_test.cc @@ -51,15 +51,6 @@ class RecordShutdownExporter final : public SpanExporter return ExportResult::kSuccess; } -#ifdef ENABLE_ASYNC_EXPORT - void Export(const opentelemetry::nostd::span> &spans, - std::function - &&result_callback) noexcept override - { - result_callback(ExportResult::kSuccess); - } -#endif - bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override {