From c614258e5633b2bf21e16fc8d3a479fcadfceea0 Mon Sep 17 00:00:00 2001 From: DEBAJIT DAS <85024550+DebajitDas@users.noreply.github.com> Date: Wed, 4 May 2022 22:55:20 +0530 Subject: [PATCH] Added max async export support using separate AsyncBatchSpan/LogProcessor (#1306) --- .github/workflows/ci.yml | 22 + CHANGELOG.md | 1 + ci/do_ci.ps1 | 6 +- ci/do_ci.sh | 39 +- .../otlp/test/otlp_http_exporter_test.cc | 219 ++++++++-- .../otlp/test/otlp_http_log_exporter_test.cc | 249 ++++++++++-- .../sdk/logs/async_batch_log_processor.h | 104 +++++ .../sdk/logs/batch_log_processor.h | 32 +- .../sdk/trace/async_batch_span_processor.h | 103 +++++ .../sdk/trace/batch_span_processor.h | 29 +- sdk/src/logs/CMakeLists.txt | 1 + sdk/src/logs/async_batch_log_processor.cc | 194 +++++++++ sdk/src/logs/batch_log_processor.cc | 112 ++---- sdk/src/trace/CMakeLists.txt | 1 + sdk/src/trace/async_batch_span_processor.cc | 187 +++++++++ sdk/src/trace/batch_span_processor.cc | 103 ++--- sdk/test/logs/CMakeLists.txt | 7 +- .../logs/async_batch_log_processor_test.cc | 374 +++++++++++++++++ sdk/test/logs/batch_log_processor_test.cc | 132 +----- sdk/test/trace/CMakeLists.txt | 3 +- .../trace/async_batch_span_processor_test.cc | 375 ++++++++++++++++++ sdk/test/trace/batch_span_processor_test.cc | 111 +----- 22 files changed, 1871 insertions(+), 533 deletions(-) create mode 100644 sdk/include/opentelemetry/sdk/logs/async_batch_log_processor.h create mode 100644 sdk/include/opentelemetry/sdk/trace/async_batch_span_processor.h create mode 100644 sdk/src/logs/async_batch_log_processor.cc create mode 100644 sdk/src/trace/async_batch_span_processor.cc create mode 100644 sdk/test/logs/async_batch_log_processor_test.cc create mode 100644 sdk/test/trace/async_batch_span_processor_test.cc diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7030c719d5..dea6b14c92 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -150,6 +150,28 @@ jobs: - name: run tests run: ./ci/do_ci.sh bazel.test + bazel_test_async: + name: Bazel with async export + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + with: + submodules: 'recursive' + - name: Mount Bazel Cache + uses: actions/cache@v3 + env: + cache-name: bazel_cache + with: + path: /home/runner/.cache/bazel + key: bazel_test + - name: setup + run: | + sudo ./ci/setup_thrift.sh dependencies_only + sudo ./ci/setup_ci_environment.sh + sudo ./ci/install_bazelisk.sh + - name: run tests + run: ./ci/do_ci.sh bazel.with_async_export + bazel_with_abseil: name: Bazel with external abseil runs-on: ubuntu-latest diff --git a/CHANGELOG.md b/CHANGELOG.md index ffc68d6a1f..b7d0597672 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ Increment the: ## [Unreleased] +* [SDK] Async Batch Span/Log processor with max async support ([#1306](https://github.com/open-telemetry/opentelemetry-cpp/pull/1306)) * [EXPORTER] OTLP http exporter allow concurrency session ([#1209](https://github.com/open-telemetry/opentelemetry-cpp/pull/1209)) ## [1.3.0] 2022-04-11 diff --git a/ci/do_ci.ps1 b/ci/do_ci.ps1 index ca7f8abf83..7b60632422 100644 --- a/ci/do_ci.ps1 +++ b/ci/do_ci.ps1 @@ -5,7 +5,7 @@ $action = $args[0] $SRC_DIR=(Get-Item -Path ".\").FullName -$BAZEL_OPTIONS="--copt=-DENABLE_METRICS_PREVIEW --copt=-DENABLE_LOGS_PREVIEW" +$BAZEL_OPTIONS="--copt=-DENABLE_METRICS_PREVIEW --copt=-DENABLE_LOGS_PREVIEW --copt=-DENABLE_ASYNC_EXPORT" $BAZEL_TEST_OPTIONS="$BAZEL_OPTIONS --test_output=errors" if (!(test-path build)) { @@ -32,6 +32,7 @@ switch ($action) { cd "$BUILD_DIR" cmake $SRC_DIR ` -DVCPKG_TARGET_TRIPLET=x64-windows ` + -DWITH_ASYNC_EXPORT_PREVIEW=ON ` "-DCMAKE_TOOLCHAIN_FILE=$VCPKG_DIR\scripts\buildsystems\vcpkg.cmake" $exit = $LASTEXITCODE if ($exit -ne 0) { @@ -52,6 +53,7 @@ switch ($action) { cd "$BUILD_DIR" cmake $SRC_DIR ` -DVCPKG_TARGET_TRIPLET=x64-windows ` + -DWITH_ASYNC_EXPORT_PREVIEW=ON ` -DWITH_OTPROTCOL=ON ` "-DCMAKE_TOOLCHAIN_FILE=$VCPKG_DIR\scripts\buildsystems\vcpkg.cmake" $exit = $LASTEXITCODE @@ -73,6 +75,7 @@ switch ($action) { cd "$BUILD_DIR" cmake $SRC_DIR ` -DVCPKG_TARGET_TRIPLET=x64-windows ` + -DWITH_ASYNC_EXPORT_PREVIEW=ON ` "-DCMAKE_TOOLCHAIN_FILE=$VCPKG_DIR\scripts\buildsystems\vcpkg.cmake" $exit = $LASTEXITCODE if ($exit -ne 0) { @@ -89,6 +92,7 @@ switch ($action) { cd "$BUILD_DIR" cmake $SRC_DIR ` -DVCPKG_TARGET_TRIPLET=x64-windows ` + -DWITH_ASYNC_EXPORT_PREVIEW=ON ` "-DCMAKE_TOOLCHAIN_FILE=$VCPKG_DIR\scripts\buildsystems\vcpkg.cmake" $exit = $LASTEXITCODE if ($exit -ne 0) { diff --git a/ci/do_ci.sh b/ci/do_ci.sh index 008e983842..2fa202af89 100755 --- a/ci/do_ci.sh +++ b/ci/do_ci.sh @@ -25,7 +25,7 @@ function run_benchmarks [ -z "${BENCHMARK_DIR}" ] && export BENCHMARK_DIR=$HOME/benchmark mkdir -p $BENCHMARK_DIR - bazel $BAZEL_STARTUP_OPTIONS build $BAZEL_OPTIONS -c opt -- \ + bazel $BAZEL_STARTUP_OPTIONS build $BAZEL_OPTIONS_ASYNC -c opt -- \ $(bazel query 'attr("tags", "benchmark_result", ...)') echo "" echo "Benchmark results in $BENCHMARK_DIR:" @@ -66,8 +66,11 @@ if [[ "$1" != "bazel.nortti" ]]; then fi BAZEL_TEST_OPTIONS="$BAZEL_OPTIONS --test_output=errors" +BAZEL_OPTIONS_ASYNC="--copt=-DENABLE_METRICS_PREVIEW --copt=-DENABLE_LOGS_PREVIEW --copt=-DENABLE_TEST --copt=-DENABLE_ASYNC_EXPORT" +BAZEL_TEST_OPTIONS_ASYNC="$BAZEL_OPTIONS_ASYNC --test_output=errors" + # https://github.com/bazelbuild/bazel/issues/4341 -BAZEL_MACOS_OPTIONS="$BAZEL_OPTIONS --features=-supports_dynamic_linker --build_tag_filters=-jaeger" +BAZEL_MACOS_OPTIONS="$BAZEL_OPTIONS_ASYNC --features=-supports_dynamic_linker --build_tag_filters=-jaeger" BAZEL_MACOS_TEST_OPTIONS="$BAZEL_MACOS_OPTIONS --test_output=errors" BAZEL_STARTUP_OPTIONS="--output_user_root=$HOME/.cache/bazel" @@ -85,6 +88,7 @@ if [[ "$1" == "cmake.test" ]]; then -DWITH_METRICS_PREVIEW=ON \ -DWITH_LOGS_PREVIEW=ON \ -DCMAKE_CXX_FLAGS="-Werror" \ + -DWITH_ASYNC_EXPORT_PREVIEW=ON \ "${SRC_DIR}" make make test @@ -96,6 +100,7 @@ elif [[ "$1" == "cmake.abseil.test" ]]; then -DWITH_METRICS_PREVIEW=ON \ -DWITH_LOGS_PREVIEW=ON \ -DCMAKE_CXX_FLAGS="-Werror" \ + -DWITH_ASYNC_EXPORT_PREVIEW=ON \ -DWITH_ABSEIL=ON \ "${SRC_DIR}" make @@ -106,6 +111,7 @@ elif [[ "$1" == "cmake.c++20.test" ]]; then rm -rf * cmake -DCMAKE_BUILD_TYPE=Debug \ -DCMAKE_CXX_FLAGS="-Werror" \ + -DWITH_ASYNC_EXPORT_PREVIEW=ON \ -DCMAKE_CXX_STANDARD=20 \ "${SRC_DIR}" make @@ -118,6 +124,7 @@ elif [[ "$1" == "cmake.c++20.stl.test" ]]; then -DWITH_METRICS_PREVIEW=ON \ -DWITH_LOGS_PREVIEW=ON \ -DCMAKE_CXX_FLAGS="-Werror" \ + -DWITH_ASYNC_EXPORT_PREVIEW=ON \ -DWITH_STL=ON \ "${SRC_DIR}" make @@ -145,6 +152,7 @@ elif [[ "$1" == "cmake.legacy.exporter.otprotocol.test" ]]; then cmake -DCMAKE_BUILD_TYPE=Debug \ -DCMAKE_CXX_STANDARD=11 \ -DWITH_OTLP=ON \ + -DWITH_ASYNC_EXPORT_PREVIEW=ON \ "${SRC_DIR}" grpc_cpp_plugin=`which grpc_cpp_plugin` proto_make_file="CMakeFiles/opentelemetry_proto.dir/build.make" @@ -157,6 +165,7 @@ elif [[ "$1" == "cmake.exporter.otprotocol.test" ]]; then rm -rf * cmake -DCMAKE_BUILD_TYPE=Debug \ -DWITH_OTLP=ON \ + -DWITH_ASYNC_EXPORT_PREVIEW=ON \ "${SRC_DIR}" grpc_cpp_plugin=`which grpc_cpp_plugin` proto_make_file="CMakeFiles/opentelemetry_proto.dir/build.make" @@ -165,8 +174,8 @@ elif [[ "$1" == "cmake.exporter.otprotocol.test" ]]; then cd exporters/otlp && make test exit 0 elif [[ "$1" == "bazel.with_abseil" ]]; then - bazel $BAZEL_STARTUP_OPTIONS build $BAZEL_OPTIONS --//api:with_abseil=true //... - bazel $BAZEL_STARTUP_OPTIONS test $BAZEL_TEST_OPTIONS --//api:with_abseil=true //... + bazel $BAZEL_STARTUP_OPTIONS build $BAZEL_OPTIONS_ASYNC --//api:with_abseil=true //... + bazel $BAZEL_STARTUP_OPTIONS test $BAZEL_TEST_OPTIONS_ASYNC --//api:with_abseil=true //... exit 0 elif [[ "$1" == "cmake.test_example_plugin" ]]; then # Build the plugin @@ -206,6 +215,10 @@ elif [[ "$1" == "bazel.test" ]]; then bazel $BAZEL_STARTUP_OPTIONS build $BAZEL_OPTIONS //... bazel $BAZEL_STARTUP_OPTIONS test $BAZEL_TEST_OPTIONS //... exit 0 +elif [[ "$1" == "bazel.with_async_export" ]]; then + bazel $BAZEL_STARTUP_OPTIONS build $BAZEL_OPTIONS_ASYNC //... + bazel $BAZEL_STARTUP_OPTIONS test $BAZEL_TEST_OPTIONS_ASYNC //... + exit 0 elif [[ "$1" == "bazel.benchmark" ]]; then run_benchmarks exit 0 @@ -216,14 +229,14 @@ elif [[ "$1" == "bazel.macos.test" ]]; then elif [[ "$1" == "bazel.legacy.test" ]]; then # we uses C++ future and async() function to test the Prometheus Exporter functionality, # that make this test always fail. ignore Prometheus exporter here. - bazel $BAZEL_STARTUP_OPTIONS build $BAZEL_OPTIONS -- //... -//exporters/otlp/... -//exporters/prometheus/... - bazel $BAZEL_STARTUP_OPTIONS test $BAZEL_TEST_OPTIONS -- //... -//exporters/otlp/... -//exporters/prometheus/... + bazel $BAZEL_STARTUP_OPTIONS build $BAZEL_OPTIONS_ASYNC -- //... -//exporters/otlp/... -//exporters/prometheus/... + bazel $BAZEL_STARTUP_OPTIONS test $BAZEL_TEST_OPTIONS_ASYNC -- //... -//exporters/otlp/... -//exporters/prometheus/... exit 0 elif [[ "$1" == "bazel.noexcept" ]]; then # there are some exceptions and error handling code from the Prometheus and Jaeger Clients # that make this test always fail. ignore Prometheus and Jaeger exporters in the noexcept here. - bazel $BAZEL_STARTUP_OPTIONS build --copt=-fno-exceptions --build_tag_filters=-jaeger $BAZEL_OPTIONS -- //... -//exporters/prometheus/... -//exporters/jaeger/... - bazel $BAZEL_STARTUP_OPTIONS test --copt=-fno-exceptions --build_tag_filters=-jaeger $BAZEL_TEST_OPTIONS -- //... -//exporters/prometheus/... -//exporters/jaeger/... + bazel $BAZEL_STARTUP_OPTIONS build --copt=-fno-exceptions --build_tag_filters=-jaeger $BAZEL_OPTIONS_ASYNC -- //... -//exporters/prometheus/... -//exporters/jaeger/... + bazel $BAZEL_STARTUP_OPTIONS test --copt=-fno-exceptions --build_tag_filters=-jaeger $BAZEL_TEST_OPTIONS_ASYNC -- //... -//exporters/prometheus/... -//exporters/jaeger/... exit 0 elif [[ "$1" == "bazel.nortti" ]]; then # there are some exceptions and error handling code from the Prometheus and Jaeger Clients @@ -232,18 +245,18 @@ elif [[ "$1" == "bazel.nortti" ]]; then bazel $BAZEL_STARTUP_OPTIONS test --cxxopt=-fno-rtti --build_tag_filters=-jaeger $BAZEL_TEST_OPTIONS -- //... -//exporters/prometheus/... -//exporters/jaeger/... exit 0 elif [[ "$1" == "bazel.asan" ]]; then - bazel $BAZEL_STARTUP_OPTIONS test --config=asan $BAZEL_TEST_OPTIONS //... + bazel $BAZEL_STARTUP_OPTIONS test --config=asan $BAZEL_TEST_OPTIONS_ASYNC //... exit 0 elif [[ "$1" == "bazel.tsan" ]]; then - bazel $BAZEL_STARTUP_OPTIONS test --config=tsan $BAZEL_TEST_OPTIONS //... + bazel $BAZEL_STARTUP_OPTIONS test --config=tsan $BAZEL_TEST_OPTIONS_ASYNC //... exit 0 elif [[ "$1" == "bazel.valgrind" ]]; then - bazel $BAZEL_STARTUP_OPTIONS build $BAZEL_OPTIONS //... - bazel $BAZEL_STARTUP_OPTIONS test --run_under="/usr/bin/valgrind --leak-check=full --error-exitcode=1 --suppressions=\"${SRC_DIR}/ci/valgrind-suppressions\"" $BAZEL_TEST_OPTIONS //... + bazel $BAZEL_STARTUP_OPTIONS build $BAZEL_OPTIONS_ASYNC //... + bazel $BAZEL_STARTUP_OPTIONS test --run_under="/usr/bin/valgrind --leak-check=full --error-exitcode=1 --suppressions=\"${SRC_DIR}/ci/valgrind-suppressions\"" $BAZEL_TEST_OPTIONS_ASYNC //... exit 0 elif [[ "$1" == "benchmark" ]]; then [ -z "${BENCHMARK_DIR}" ] && export BENCHMARK_DIR=$HOME/benchmark - bazel $BAZEL_STARTUP_OPTIONS build $BAZEL_OPTIONS -c opt -- \ + bazel $BAZEL_STARTUP_OPTIONS build $BAZEL_OPTIONS_ASYNC -c opt -- \ $(bazel query 'attr("tags", "benchmark_result", ...)') echo "" echo "Benchmark results in $BENCHMARK_DIR:" diff --git a/exporters/otlp/test/otlp_http_exporter_test.cc b/exporters/otlp/test/otlp_http_exporter_test.cc index 9d7cd2a5ce..c25c0111b2 100644 --- a/exporters/otlp/test/otlp_http_exporter_test.cc +++ b/exporters/otlp/test/otlp_http_exporter_test.cc @@ -17,6 +17,7 @@ # include "opentelemetry/ext/http/client/http_client_factory.h" # include "opentelemetry/ext/http/client/nosend/http_client_nosend.h" # include "opentelemetry/ext/http/server/http_server.h" +# include "opentelemetry/sdk/trace/async_batch_span_processor.h" # include "opentelemetry/sdk/trace/batch_span_processor.h" # include "opentelemetry/sdk/trace/tracer_provider.h" # include "opentelemetry/trace/provider.h" @@ -85,7 +86,7 @@ class OtlpHttpExporterTestPeer : public ::testing::Test return {new OtlpHttpClient(MakeOtlpHttpClientOptions(content_type), http_client), http_client}; } - void ExportJsonIntegrationTest(bool is_async) + void ExportJsonIntegrationTest() { auto mock_otlp_client = OtlpHttpExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kJson); @@ -114,9 +115,7 @@ class OtlpHttpExporterTestPeer : public ::testing::Test processor_opts.max_export_batch_size = 5; processor_opts.max_queue_size = 5; processor_opts.schedule_delay_millis = std::chrono::milliseconds(256); -# ifdef ENABLE_ASYNC_EXPORT - processor_opts.is_export_async = is_async; -# endif + auto processor = std::unique_ptr( new sdk::trace::BatchSpanProcessor(std::move(exporter), processor_opts)); auto provider = nostd::shared_ptr( @@ -142,7 +141,7 @@ class OtlpHttpExporterTestPeer : public ::testing::Test auto mock_session = std::static_pointer_cast(no_send_client->session_); EXPECT_CALL(*mock_session, SendRequest) - .WillOnce([&mock_session, report_trace_id, is_async]( + .WillOnce([&mock_session, report_trace_id]( std::shared_ptr callback) { auto check_json = nlohmann::json::parse(mock_session->GetRequest()->body_, nullptr, false); @@ -161,20 +160,97 @@ class OtlpHttpExporterTestPeer : public ::testing::Test } // let the otlp_http_client to continue - if (is_async) + http_client::nosend::Response response; + response.Finish(*callback.get()); + }); + + child_span->End(); + parent_span->End(); + + static_cast(provider.get())->ForceFlush(); + } + +# ifdef ENABLE_ASYNC_EXPORT + void ExportJsonIntegrationTestAsync() + { + auto mock_otlp_client = + OtlpHttpExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kJson); + auto mock_otlp_http_client = mock_otlp_client.first; + auto client = mock_otlp_client.second; + auto exporter = GetExporter(std::unique_ptr{mock_otlp_http_client}); + + resource::ResourceAttributes resource_attributes = {{"service.name", "unit_test_service"}, + {"tenant.id", "test_user"}}; + resource_attributes["bool_value"] = true; + resource_attributes["int32_value"] = static_cast(1); + resource_attributes["uint32_value"] = static_cast(2); + resource_attributes["int64_value"] = static_cast(0x1100000000LL); + resource_attributes["uint64_value"] = static_cast(0x1200000000ULL); + resource_attributes["double_value"] = static_cast(3.1); + resource_attributes["vec_bool_value"] = std::vector{true, false, true}; + resource_attributes["vec_int32_value"] = std::vector{1, 2}; + resource_attributes["vec_uint32_value"] = std::vector{3, 4}; + resource_attributes["vec_int64_value"] = std::vector{5, 6}; + resource_attributes["vec_uint64_value"] = std::vector{7, 8}; + resource_attributes["vec_double_value"] = std::vector{3.2, 3.3}; + resource_attributes["vec_string_value"] = std::vector{"vector", "string"}; + auto resource = resource::Resource::Create(resource_attributes); + + auto processor_opts = sdk::trace::AsyncBatchSpanProcessorOptions(); + processor_opts.max_export_batch_size = 5; + processor_opts.max_queue_size = 5; + processor_opts.schedule_delay_millis = std::chrono::milliseconds(256); + + auto processor = std::unique_ptr( + new sdk::trace::AsyncBatchSpanProcessor(std::move(exporter), processor_opts)); + auto provider = nostd::shared_ptr( + new sdk::trace::TracerProvider(std::move(processor), resource)); + + std::string report_trace_id; + + char trace_id_hex[2 * trace_api::TraceId::kSize] = {0}; + auto tracer = provider->GetTracer("test"); + auto parent_span = tracer->StartSpan("Test parent span"); + + trace_api::StartSpanOptions child_span_opts = {}; + child_span_opts.parent = parent_span->GetContext(); + + auto child_span = tracer->StartSpan("Test child span", child_span_opts); + + nostd::get(child_span_opts.parent) + .trace_id() + .ToLowerBase16(MakeSpan(trace_id_hex)); + report_trace_id.assign(trace_id_hex, sizeof(trace_id_hex)); + + auto no_send_client = std::static_pointer_cast(client); + auto mock_session = + std::static_pointer_cast(no_send_client->session_); + EXPECT_CALL(*mock_session, SendRequest) + .WillOnce([&mock_session, report_trace_id]( + std::shared_ptr callback) { + auto check_json = + nlohmann::json::parse(mock_session->GetRequest()->body_, nullptr, false); + auto resource_span = *check_json["resource_spans"].begin(); + auto instrumentation_library_span = + *resource_span["instrumentation_library_spans"].begin(); + auto span = *instrumentation_library_span["spans"].begin(); + auto received_trace_id = span["trace_id"].get(); + EXPECT_EQ(received_trace_id, report_trace_id); + + auto custom_header = mock_session->GetRequest()->headers_.find("Custom-Header-Key"); + ASSERT_TRUE(custom_header != mock_session->GetRequest()->headers_.end()); + if (custom_header != mock_session->GetRequest()->headers_.end()) { - std::thread async_finish{[callback]() { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - http_client::nosend::Response response; - response.Finish(*callback.get()); - }}; - async_finish.detach(); + EXPECT_EQ("Custom-Header-Value", custom_header->second); } - else - { + + // let the otlp_http_client to continue + std::thread async_finish{[callback]() { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); http_client::nosend::Response response; response.Finish(*callback.get()); - } + }}; + async_finish.detach(); }); child_span->End(); @@ -182,8 +258,9 @@ class OtlpHttpExporterTestPeer : public ::testing::Test static_cast(provider.get())->ForceFlush(); } +# endif - void ExportBinaryIntegrationTest(bool is_async) + void ExportBinaryIntegrationTest() { auto mock_otlp_client = OtlpHttpExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kBinary); @@ -212,9 +289,7 @@ class OtlpHttpExporterTestPeer : public ::testing::Test processor_opts.max_export_batch_size = 5; processor_opts.max_queue_size = 5; processor_opts.schedule_delay_millis = std::chrono::milliseconds(256); -# ifdef ENABLE_ASYNC_EXPORT - processor_opts.is_export_async = is_async; -# endif + auto processor = std::unique_ptr( new sdk::trace::BatchSpanProcessor(std::move(exporter), processor_opts)); auto provider = nostd::shared_ptr( @@ -239,7 +314,7 @@ class OtlpHttpExporterTestPeer : public ::testing::Test auto mock_session = std::static_pointer_cast(no_send_client->session_); EXPECT_CALL(*mock_session, SendRequest) - .WillOnce([&mock_session, report_trace_id, is_async]( + .WillOnce([&mock_session, report_trace_id]( std::shared_ptr callback) { opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest request_body; request_body.ParseFromArray(&mock_session->GetRequest()->body_[0], @@ -255,21 +330,94 @@ class OtlpHttpExporterTestPeer : public ::testing::Test EXPECT_EQ("Custom-Header-Value", custom_header->second); } - // let the otlp_http_client to continue - if (is_async) + http_client::nosend::Response response; + response.Finish(*callback.get()); + }); + + child_span->End(); + parent_span->End(); + + static_cast(provider.get())->ForceFlush(); + } + +# ifdef ENABLE_ASYNC_EXPORT + void ExportBinaryIntegrationTestAsync() + { + auto mock_otlp_client = + OtlpHttpExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kBinary); + auto mock_otlp_http_client = mock_otlp_client.first; + auto client = mock_otlp_client.second; + auto exporter = GetExporter(std::unique_ptr{mock_otlp_http_client}); + + resource::ResourceAttributes resource_attributes = {{"service.name", "unit_test_service"}, + {"tenant.id", "test_user"}}; + resource_attributes["bool_value"] = true; + resource_attributes["int32_value"] = static_cast(1); + resource_attributes["uint32_value"] = static_cast(2); + resource_attributes["int64_value"] = static_cast(0x1100000000LL); + resource_attributes["uint64_value"] = static_cast(0x1200000000ULL); + resource_attributes["double_value"] = static_cast(3.1); + resource_attributes["vec_bool_value"] = std::vector{true, false, true}; + resource_attributes["vec_int32_value"] = std::vector{1, 2}; + resource_attributes["vec_uint32_value"] = std::vector{3, 4}; + resource_attributes["vec_int64_value"] = std::vector{5, 6}; + resource_attributes["vec_uint64_value"] = std::vector{7, 8}; + resource_attributes["vec_double_value"] = std::vector{3.2, 3.3}; + resource_attributes["vec_string_value"] = std::vector{"vector", "string"}; + auto resource = resource::Resource::Create(resource_attributes); + + auto processor_opts = sdk::trace::AsyncBatchSpanProcessorOptions(); + processor_opts.max_export_batch_size = 5; + processor_opts.max_queue_size = 5; + processor_opts.schedule_delay_millis = std::chrono::milliseconds(256); + + auto processor = std::unique_ptr( + new sdk::trace::AsyncBatchSpanProcessor(std::move(exporter), processor_opts)); + auto provider = nostd::shared_ptr( + new sdk::trace::TracerProvider(std::move(processor), resource)); + + std::string report_trace_id; + + uint8_t trace_id_binary[trace_api::TraceId::kSize] = {0}; + auto tracer = provider->GetTracer("test"); + auto parent_span = tracer->StartSpan("Test parent span"); + + trace_api::StartSpanOptions child_span_opts = {}; + child_span_opts.parent = parent_span->GetContext(); + + auto child_span = tracer->StartSpan("Test child span", child_span_opts); + nostd::get(child_span_opts.parent) + .trace_id() + .CopyBytesTo(MakeSpan(trace_id_binary)); + report_trace_id.assign(reinterpret_cast(trace_id_binary), sizeof(trace_id_binary)); + + auto no_send_client = std::static_pointer_cast(client); + auto mock_session = + std::static_pointer_cast(no_send_client->session_); + EXPECT_CALL(*mock_session, SendRequest) + .WillOnce([&mock_session, report_trace_id]( + std::shared_ptr callback) { + opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest request_body; + request_body.ParseFromArray(&mock_session->GetRequest()->body_[0], + static_cast(mock_session->GetRequest()->body_.size())); + auto received_trace_id = + request_body.resource_spans(0).instrumentation_library_spans(0).spans(0).trace_id(); + EXPECT_EQ(received_trace_id, report_trace_id); + + auto custom_header = mock_session->GetRequest()->headers_.find("Custom-Header-Key"); + ASSERT_TRUE(custom_header != mock_session->GetRequest()->headers_.end()); + if (custom_header != mock_session->GetRequest()->headers_.end()) { - std::thread async_finish{[callback]() { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - http_client::nosend::Response response; - response.Finish(*callback.get()); - }}; - async_finish.detach(); + EXPECT_EQ("Custom-Header-Value", custom_header->second); } - else - { + + // let the otlp_http_client to continue + std::thread async_finish{[callback]() { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); http_client::nosend::Response response; response.Finish(*callback.get()); - } + }}; + async_finish.detach(); }); child_span->End(); @@ -277,31 +425,32 @@ class OtlpHttpExporterTestPeer : public ::testing::Test static_cast(provider.get())->ForceFlush(); } +# endif }; // Create spans, let processor call Export() TEST_F(OtlpHttpExporterTestPeer, ExportJsonIntegrationTestSync) { - ExportJsonIntegrationTest(false); + ExportJsonIntegrationTest(); } # ifdef ENABLE_ASYNC_EXPORT TEST_F(OtlpHttpExporterTestPeer, ExportJsonIntegrationTestAsync) { - ExportJsonIntegrationTest(true); + ExportJsonIntegrationTestAsync(); } # endif // Create spans, let processor call Export() TEST_F(OtlpHttpExporterTestPeer, ExportBinaryIntegrationTestSync) { - ExportBinaryIntegrationTest(false); + ExportBinaryIntegrationTest(); } # ifdef ENABLE_ASYNC_EXPORT TEST_F(OtlpHttpExporterTestPeer, ExportBinaryIntegrationTestAsync) { - ExportBinaryIntegrationTest(true); + ExportBinaryIntegrationTestAsync(); } # endif diff --git a/exporters/otlp/test/otlp_http_log_exporter_test.cc b/exporters/otlp/test/otlp_http_log_exporter_test.cc index 17723f2ac1..bdeedf95e8 100644 --- a/exporters/otlp/test/otlp_http_log_exporter_test.cc +++ b/exporters/otlp/test/otlp_http_log_exporter_test.cc @@ -20,6 +20,7 @@ # include "opentelemetry/ext/http/client/nosend/http_client_nosend.h" # include "opentelemetry/ext/http/server/http_server.h" # include "opentelemetry/logs/provider.h" +# include "opentelemetry/sdk/logs/async_batch_log_processor.h" # include "opentelemetry/sdk/logs/batch_log_processor.h" # include "opentelemetry/sdk/logs/exporter.h" # include "opentelemetry/sdk/logs/log_record.h" @@ -86,7 +87,7 @@ class OtlpHttpLogExporterTestPeer : public ::testing::Test return {new OtlpHttpClient(MakeOtlpHttpClientOptions(content_type), http_client), http_client}; } - void ExportJsonIntegrationTest(bool is_async) + void ExportJsonIntegrationTest() { auto mock_otlp_client = OtlpHttpLogExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kJson); @@ -103,9 +104,10 @@ class OtlpHttpLogExporterTestPeer : public ::testing::Test std::string attribute_storage_string_value[] = {"vector", "string"}; auto provider = nostd::shared_ptr(new sdk::logs::LoggerProvider()); + provider->AddProcessor( std::unique_ptr(new sdk::logs::BatchLogProcessor( - std::move(exporter), 5, std::chrono::milliseconds(256), 5, is_async))); + std::move(exporter), 5, std::chrono::milliseconds(256), 5))); std::string report_trace_id; std::string report_span_id; @@ -131,7 +133,7 @@ class OtlpHttpLogExporterTestPeer : public ::testing::Test auto mock_session = std::static_pointer_cast(no_send_client->session_); EXPECT_CALL(*mock_session, SendRequest) - .WillOnce([&mock_session, report_trace_id, report_span_id, is_async]( + .WillOnce([&mock_session, report_trace_id, report_span_id]( std::shared_ptr callback) { auto check_json = nlohmann::json::parse(mock_session->GetRequest()->body_, nullptr, false); @@ -153,21 +155,112 @@ class OtlpHttpLogExporterTestPeer : public ::testing::Test EXPECT_EQ("Custom-Header-Value", custom_header->second); } - // let the otlp_http_client to continue - if (is_async) + http_client::nosend::Response response; + response.Finish(*callback.get()); + }); + + logger->Log(opentelemetry::logs::Severity::kInfo, "Log name", "Log message", + {{"service.name", "unit_test_service"}, + {"tenant.id", "test_user"}, + {"bool_value", true}, + {"int32_value", static_cast(1)}, + {"uint32_value", static_cast(2)}, + {"int64_value", static_cast(0x1100000000LL)}, + {"uint64_value", static_cast(0x1200000000ULL)}, + {"double_value", static_cast(3.1)}, + {"vec_bool_value", attribute_storage_bool_value}, + {"vec_int32_value", attribute_storage_int32_value}, + {"vec_uint32_value", attribute_storage_uint32_value}, + {"vec_int64_value", attribute_storage_int64_value}, + {"vec_uint64_value", attribute_storage_uint64_value}, + {"vec_double_value", attribute_storage_double_value}, + {"vec_string_value", attribute_storage_string_value}}, + trace_id, span_id, + opentelemetry::trace::TraceFlags{opentelemetry::trace::TraceFlags::kIsSampled}, + std::chrono::system_clock::now()); + + provider->ForceFlush(); + } + +# ifdef ENABLE_ASYNC_EXPORT + void ExportJsonIntegrationTestAsync() + { + auto mock_otlp_client = + OtlpHttpLogExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kJson); + auto mock_otlp_http_client = mock_otlp_client.first; + auto client = mock_otlp_client.second; + auto exporter = GetExporter(std::unique_ptr{mock_otlp_http_client}); + + bool attribute_storage_bool_value[] = {true, false, true}; + int32_t attribute_storage_int32_value[] = {1, 2}; + uint32_t attribute_storage_uint32_value[] = {3, 4}; + int64_t attribute_storage_int64_value[] = {5, 6}; + uint64_t attribute_storage_uint64_value[] = {7, 8}; + double attribute_storage_double_value[] = {3.2, 3.3}; + std::string attribute_storage_string_value[] = {"vector", "string"}; + + auto provider = nostd::shared_ptr(new sdk::logs::LoggerProvider()); + sdk::logs::AsyncBatchLogProcessorOptions options; + options.max_queue_size = 5; + options.schedule_delay_millis = std::chrono::milliseconds(256); + options.max_export_batch_size = 5; + + provider->AddProcessor(std::unique_ptr( + new sdk::logs::AsyncBatchLogProcessor(std::move(exporter), options))); + + std::string report_trace_id; + std::string report_span_id; + uint8_t trace_id_bin[opentelemetry::trace::TraceId::kSize] = { + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; + char trace_id_hex[2 * opentelemetry::trace::TraceId::kSize] = {0}; + opentelemetry::trace::TraceId trace_id{trace_id_bin}; + uint8_t span_id_bin[opentelemetry::trace::SpanId::kSize] = {'7', '6', '5', '4', + '3', '2', '1', '0'}; + char span_id_hex[2 * opentelemetry::trace::SpanId::kSize] = {0}; + opentelemetry::trace::SpanId span_id{span_id_bin}; + + const std::string schema_url{"https://opentelemetry.io/schemas/1.2.0"}; + auto logger = provider->GetLogger("test", "", "opentelelemtry_library", "", schema_url); + + trace_id.ToLowerBase16(MakeSpan(trace_id_hex)); + report_trace_id.assign(trace_id_hex, sizeof(trace_id_hex)); + + span_id.ToLowerBase16(MakeSpan(span_id_hex)); + report_span_id.assign(span_id_hex, sizeof(span_id_hex)); + + auto no_send_client = std::static_pointer_cast(client); + auto mock_session = + std::static_pointer_cast(no_send_client->session_); + EXPECT_CALL(*mock_session, SendRequest) + .WillOnce([&mock_session, report_trace_id, report_span_id]( + std::shared_ptr callback) { + auto check_json = + nlohmann::json::parse(mock_session->GetRequest()->body_, nullptr, false); + auto resource_logs = *check_json["resource_logs"].begin(); + auto instrumentation_library_span = + *resource_logs["instrumentation_library_logs"].begin(); + auto log = *instrumentation_library_span["logs"].begin(); + auto received_trace_id = log["trace_id"].get(); + auto received_span_id = log["span_id"].get(); + EXPECT_EQ(received_trace_id, report_trace_id); + EXPECT_EQ(received_span_id, report_span_id); + EXPECT_EQ("Log name", log["name"].get()); + EXPECT_EQ("Log message", log["body"]["string_value"].get()); + EXPECT_LE(15, log["attributes"].size()); + auto custom_header = mock_session->GetRequest()->headers_.find("Custom-Header-Key"); + ASSERT_TRUE(custom_header != mock_session->GetRequest()->headers_.end()); + if (custom_header != mock_session->GetRequest()->headers_.end()) { - std::thread async_finish{[callback]() { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - http_client::nosend::Response response; - response.Finish(*callback.get()); - }}; - async_finish.detach(); + EXPECT_EQ("Custom-Header-Value", custom_header->second); } - else - { + + // let the otlp_http_client to continue + std::thread async_finish{[callback]() { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); http_client::nosend::Response response; response.Finish(*callback.get()); - } + }}; + async_finish.detach(); }); logger->Log(opentelemetry::logs::Severity::kInfo, "Log name", "Log message", @@ -192,8 +285,9 @@ class OtlpHttpLogExporterTestPeer : public ::testing::Test provider->ForceFlush(); } +# endif - void ExportBinaryIntegrationTest(bool is_async) + void ExportBinaryIntegrationTest() { auto mock_otlp_client = OtlpHttpLogExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kBinary); @@ -209,14 +303,11 @@ class OtlpHttpLogExporterTestPeer : public ::testing::Test double attribute_storage_double_value[] = {3.2, 3.3}; std::string attribute_storage_string_value[] = {"vector", "string"}; + auto provider = nostd::shared_ptr(new sdk::logs::LoggerProvider()); sdk::logs::BatchLogProcessorOptions processor_options; processor_options.max_export_batch_size = 5; processor_options.max_queue_size = 5; processor_options.schedule_delay_millis = std::chrono::milliseconds(256); -# ifdef ENABLE_ASYNC_EXPORT - processor_options.is_export_async = is_async; -# endif - auto provider = nostd::shared_ptr(new sdk::logs::LoggerProvider()); provider->AddProcessor(std::unique_ptr( new sdk::logs::BatchLogProcessor(std::move(exporter), processor_options))); @@ -239,7 +330,7 @@ class OtlpHttpLogExporterTestPeer : public ::testing::Test auto mock_session = std::static_pointer_cast(no_send_client->session_); EXPECT_CALL(*mock_session, SendRequest) - .WillOnce([&mock_session, report_trace_id, report_span_id, is_async]( + .WillOnce([&mock_session, report_trace_id, report_span_id]( std::shared_ptr callback) { opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest request_body; request_body.ParseFromArray(&mock_session->GetRequest()->body_[0], @@ -262,20 +353,109 @@ class OtlpHttpLogExporterTestPeer : public ::testing::Test ASSERT_TRUE(check_service_name); // let the otlp_http_client to continue - if (is_async) + + http_client::nosend::Response response; + response.Finish(*callback.get()); + }); + + logger->Log(opentelemetry::logs::Severity::kInfo, "Log name", "Log message", + {{"service.name", "unit_test_service"}, + {"tenant.id", "test_user"}, + {"bool_value", true}, + {"int32_value", static_cast(1)}, + {"uint32_value", static_cast(2)}, + {"int64_value", static_cast(0x1100000000LL)}, + {"uint64_value", static_cast(0x1200000000ULL)}, + {"double_value", static_cast(3.1)}, + {"vec_bool_value", attribute_storage_bool_value}, + {"vec_int32_value", attribute_storage_int32_value}, + {"vec_uint32_value", attribute_storage_uint32_value}, + {"vec_int64_value", attribute_storage_int64_value}, + {"vec_uint64_value", attribute_storage_uint64_value}, + {"vec_double_value", attribute_storage_double_value}, + {"vec_string_value", attribute_storage_string_value}}, + trace_id, span_id, + opentelemetry::trace::TraceFlags{opentelemetry::trace::TraceFlags::kIsSampled}, + std::chrono::system_clock::now()); + + provider->ForceFlush(); + } + +# ifdef ENABLE_ASYNC_EXPORT + void ExportBinaryIntegrationTestAsync() + { + auto mock_otlp_client = + OtlpHttpLogExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kBinary); + auto mock_otlp_http_client = mock_otlp_client.first; + auto client = mock_otlp_client.second; + auto exporter = GetExporter(std::unique_ptr{mock_otlp_http_client}); + + bool attribute_storage_bool_value[] = {true, false, true}; + int32_t attribute_storage_int32_value[] = {1, 2}; + uint32_t attribute_storage_uint32_value[] = {3, 4}; + int64_t attribute_storage_int64_value[] = {5, 6}; + uint64_t attribute_storage_uint64_value[] = {7, 8}; + double attribute_storage_double_value[] = {3.2, 3.3}; + std::string attribute_storage_string_value[] = {"vector", "string"}; + + auto provider = nostd::shared_ptr(new sdk::logs::LoggerProvider()); + + sdk::logs::AsyncBatchLogProcessorOptions processor_options; + processor_options.max_export_batch_size = 5; + processor_options.max_queue_size = 5; + processor_options.schedule_delay_millis = std::chrono::milliseconds(256); + provider->AddProcessor(std::unique_ptr( + new sdk::logs::AsyncBatchLogProcessor(std::move(exporter), processor_options))); + + std::string report_trace_id; + std::string report_span_id; + uint8_t trace_id_bin[opentelemetry::trace::TraceId::kSize] = { + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; + opentelemetry::trace::TraceId trace_id{trace_id_bin}; + uint8_t span_id_bin[opentelemetry::trace::SpanId::kSize] = {'7', '6', '5', '4', + '3', '2', '1', '0'}; + opentelemetry::trace::SpanId span_id{span_id_bin}; + + const std::string schema_url{"https://opentelemetry.io/schemas/1.2.0"}; + auto logger = provider->GetLogger("test", "", "opentelelemtry_library", "", schema_url); + + report_trace_id.assign(reinterpret_cast(trace_id_bin), sizeof(trace_id_bin)); + report_span_id.assign(reinterpret_cast(span_id_bin), sizeof(span_id_bin)); + + auto no_send_client = std::static_pointer_cast(client); + auto mock_session = + std::static_pointer_cast(no_send_client->session_); + EXPECT_CALL(*mock_session, SendRequest) + .WillOnce([&mock_session, report_trace_id, report_span_id]( + std::shared_ptr callback) { + opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest request_body; + request_body.ParseFromArray(&mock_session->GetRequest()->body_[0], + static_cast(mock_session->GetRequest()->body_.size())); + auto received_log = request_body.resource_logs(0).instrumentation_library_logs(0).logs(0); + EXPECT_EQ(received_log.trace_id(), report_trace_id); + EXPECT_EQ(received_log.span_id(), report_span_id); + EXPECT_EQ("Log name", received_log.name()); + EXPECT_EQ("Log message", received_log.body().string_value()); + EXPECT_LE(15, received_log.attributes_size()); + bool check_service_name = false; + for (auto &attribute : received_log.attributes()) { - std::thread async_finish{[callback]() { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - http_client::nosend::Response response; - response.Finish(*callback.get()); - }}; - async_finish.detach(); + if ("service.name" == attribute.key()) + { + check_service_name = true; + EXPECT_EQ("unit_test_service", attribute.value().string_value()); + } } - else - { + ASSERT_TRUE(check_service_name); + + // let the otlp_http_client to continue + + std::thread async_finish{[callback]() { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); http_client::nosend::Response response; response.Finish(*callback.get()); - } + }}; + async_finish.detach(); }); logger->Log(opentelemetry::logs::Severity::kInfo, "Log name", "Log message", @@ -300,31 +480,32 @@ class OtlpHttpLogExporterTestPeer : public ::testing::Test provider->ForceFlush(); } +# endif }; // Create log records, let processor call Export() TEST_F(OtlpHttpLogExporterTestPeer, ExportJsonIntegrationTestSync) { - ExportJsonIntegrationTest(false); + ExportJsonIntegrationTest(); } # ifdef ENABLE_ASYNC_EXPORT TEST_F(OtlpHttpLogExporterTestPeer, ExportJsonIntegrationTestAsync) { - ExportJsonIntegrationTest(true); + ExportJsonIntegrationTestAsync(); } # endif // Create log records, let processor call Export() TEST_F(OtlpHttpLogExporterTestPeer, ExportBinaryIntegrationTestSync) { - ExportBinaryIntegrationTest(false); + ExportBinaryIntegrationTest(); } # ifdef ENABLE_ASYNC_EXPORT TEST_F(OtlpHttpLogExporterTestPeer, ExportBinaryIntegrationTestAsync) { - ExportBinaryIntegrationTest(true); + ExportBinaryIntegrationTestAsync(); } # endif diff --git a/sdk/include/opentelemetry/sdk/logs/async_batch_log_processor.h b/sdk/include/opentelemetry/sdk/logs/async_batch_log_processor.h new file mode 100644 index 0000000000..7cc6af8738 --- /dev/null +++ b/sdk/include/opentelemetry/sdk/logs/async_batch_log_processor.h @@ -0,0 +1,104 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once +#ifdef ENABLE_LOGS_PREVIEW +# ifdef ENABLE_ASYNC_EXPORT + +# include "opentelemetry/sdk/common/circular_buffer.h" +# include "opentelemetry/sdk/logs/batch_log_processor.h" +# include "opentelemetry/sdk/logs/exporter.h" + +# include +# include +# include +# include +# include +# include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ + +namespace logs +{ + +/** + * Struct to hold batch SpanProcessor options. + */ +struct AsyncBatchLogProcessorOptions : public BatchLogProcessorOptions +{ + /* Denotes the maximum number of async exports to continue + */ + size_t max_export_async = 8; +}; + +/** + * This is an implementation of the LogProcessor which creates batches of finished logs and passes + * the export-friendly log data representations to the configured LogExporter. + */ +class AsyncBatchLogProcessor : public BatchLogProcessor +{ +public: + /** + * Creates a batch log processor by configuring the specified exporter and other parameters + * as per the official, language-agnostic opentelemetry specs. + * + * @param exporter - The backend exporter to pass the logs to + * @param options - The batch SpanProcessor options. + */ + explicit AsyncBatchLogProcessor(std::unique_ptr &&exporter, + const AsyncBatchLogProcessorOptions &options); + + /** + * Shuts down the processor and does any cleanup required. Completely drains the buffer/queue of + * all its logs and passes them to the exporter. Any subsequent calls to + * ForceFlush or Shutdown will return immediately without doing anything. + * + * NOTE: Timeout functionality not supported yet. + */ + bool Shutdown( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + + /** + * Class destructor which invokes the Shutdown() method. + */ + virtual ~AsyncBatchLogProcessor(); + +private: + /** + * Exports all logs to the configured exporter. + * + */ + void Export() override; + + struct ExportDataStorage + { + std::queue export_ids; + std::vector export_ids_flag; + + std::condition_variable async_export_waker; + std::mutex async_export_data_m; + }; + std::shared_ptr export_data_storage_; + + const size_t max_export_async_; + static constexpr size_t kInvalidExportId = static_cast(-1); + + /** + * @brief Notify completion of shutdown and force flush. This may be called from the any thread at + * any time + * + * @param notify_force_flush Flag to indicate whether to notify force flush completion. + * @param synchronization_data Synchronization data to be notified. + */ + static void NotifyCompletion(bool notify_force_flush, + const std::shared_ptr &synchronization_data, + const std::shared_ptr &export_data_storage); +}; + +} // namespace logs +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +# endif +#endif diff --git a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h index d93a9d3294..2a781b937c 100644 --- a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h @@ -40,14 +40,6 @@ struct BatchLogProcessorOptions * equal to max_queue_size. */ size_t max_export_batch_size = 512; - -# ifdef ENABLE_ASYNC_EXPORT - /** - * Determines whether the export happens asynchronously. - * Default implementation is synchronous. - */ - bool is_export_async = false; -# endif }; /** @@ -72,8 +64,7 @@ class BatchLogProcessor : public LogProcessor std::unique_ptr &&exporter, const size_t max_queue_size = 2048, const std::chrono::milliseconds scheduled_delay_millis = std::chrono::milliseconds(5000), - const size_t max_export_batch_size = 512, - const bool is_export_async = false); + const size_t max_export_batch_size = 512); /** * Creates a batch log processor by configuring the specified exporter and other parameters @@ -116,9 +107,9 @@ class BatchLogProcessor : public LogProcessor /** * Class destructor which invokes the Shutdown() method. */ - virtual ~BatchLogProcessor() override; + virtual ~BatchLogProcessor(); -private: +protected: /** * The background routine performed by the worker thread. */ @@ -128,7 +119,7 @@ class BatchLogProcessor : public LogProcessor * Exports all logs to the configured exporter. * */ - void Export(); + virtual void Export(); /** * Called when Shutdown() is invoked. Completely drains the queue of all log records and @@ -136,22 +127,17 @@ class BatchLogProcessor : public LogProcessor */ void DrainQueue(); -# ifdef ENABLE_ASYNC_EXPORT - /* In case of async export, wait and notify for shutdown to be completed.*/ - void WaitForShutdownCompletion(); -# endif struct SynchronizationData { /* Synchronization primitives */ - std::condition_variable cv, force_flush_cv, async_shutdown_cv; - std::mutex cv_m, force_flush_cv_m, shutdown_m, async_shutdown_m; + std::condition_variable cv, force_flush_cv; + std::mutex cv_m, force_flush_cv_m, shutdown_m; /* Important boolean flags to handle the workflow of the processor */ std::atomic is_force_wakeup_background_worker; std::atomic is_force_flush_pending; std::atomic is_force_flush_notified; std::atomic is_shutdown; - std::atomic is_async_shutdown_notified; }; /** @@ -164,6 +150,9 @@ class BatchLogProcessor : public LogProcessor static void NotifyCompletion(bool notify_force_flush, const std::shared_ptr &synchronization_data); + void GetWaitAdjustedTime(std::chrono::microseconds &timeout, + std::chrono::time_point &start_time); + /* The configured backend log exporter */ std::unique_ptr exporter_; @@ -171,9 +160,6 @@ class BatchLogProcessor : public LogProcessor const size_t max_queue_size_; const std::chrono::milliseconds scheduled_delay_millis_; const size_t max_export_batch_size_; -# ifdef ENABLE_ASYNC_EXPORT - const bool is_export_async_; -# endif /* The buffer/queue to which the ended logs are added */ common::CircularBuffer buffer_; diff --git a/sdk/include/opentelemetry/sdk/trace/async_batch_span_processor.h b/sdk/include/opentelemetry/sdk/trace/async_batch_span_processor.h new file mode 100644 index 0000000000..4cbb234e71 --- /dev/null +++ b/sdk/include/opentelemetry/sdk/trace/async_batch_span_processor.h @@ -0,0 +1,103 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once +#ifdef ENABLE_ASYNC_EXPORT + +# include "opentelemetry/sdk/common/circular_buffer.h" +# include "opentelemetry/sdk/trace/batch_span_processor.h" +# include "opentelemetry/sdk/trace/exporter.h" + +# include +# include +# include +# include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ + +namespace trace +{ + +/** + * Struct to hold batch SpanProcessor options. + */ +struct AsyncBatchSpanProcessorOptions : public BatchSpanProcessorOptions +{ + /* Denotes the maximum number of async exports to continue + */ + size_t max_export_async = 8; +}; + +/** + * This is an implementation of the SpanProcessor which creates batches of finished spans and passes + * the export-friendly span data representations to the configured SpanExporter. + */ +class AsyncBatchSpanProcessor : public BatchSpanProcessor +{ +public: + /** + * Creates a batch span processor by configuring the specified exporter and other parameters + * as per the official, language-agnostic opentelemetry specs. + * + * @param exporter - The backend exporter to pass the ended spans to. + * @param options - The batch SpanProcessor options. + */ + AsyncBatchSpanProcessor(std::unique_ptr &&exporter, + const AsyncBatchSpanProcessorOptions &options); + + /** + * Shuts down the processor and does any cleanup required. Completely drains the buffer/queue of + * all its ended spans and passes them to the exporter. Any subsequent calls to OnStart, OnEnd, + * ForceFlush or Shutdown will return immediately without doing anything. + * + * NOTE: Timeout functionality not supported yet. + */ + bool Shutdown( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + + /** + * Class destructor which invokes the Shutdown() method. The Shutdown() method is supposed to be + * invoked when the Tracer is shutdown (as per other languages), but the C++ Tracer only takes + * shared ownership of the processor, and thus doesn't call Shutdown (as the processor might be + * shared with other Tracers). + */ + virtual ~AsyncBatchSpanProcessor(); + +private: + /** + * Exports all ended spans to the configured exporter. + * + */ + void Export() override; + + struct ExportDataStorage + { + std::queue export_ids; + std::vector export_ids_flag; + + std::condition_variable async_export_waker; + std::mutex async_export_data_m; + }; + std::shared_ptr export_data_storage_; + + const size_t max_export_async_; + static constexpr size_t kInvalidExportId = static_cast(-1); + + /** + * @brief Notify completion of shutdown and force flush. This may be called from the any thread at + * any time + * + * @param notify_force_flush Flag to indicate whether to notify force flush completion. + * @param synchronization_data Synchronization data to be notified. + */ + static void NotifyCompletion(bool notify_force_flush, + const std::shared_ptr &synchronization_data, + const std::shared_ptr &export_data_storage); +}; + +} // namespace trace +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +#endif \ No newline at end of file diff --git a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h index cbd0cddfbb..f18fc78346 100644 --- a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h +++ b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h @@ -37,14 +37,6 @@ struct BatchSpanProcessorOptions * equal to max_queue_size. */ size_t max_export_batch_size = 512; - -#ifdef ENABLE_ASYNC_EXPORT - /** - * Determines whether the export happens asynchronously. - * Default implementation is synchronous. - */ - bool is_export_async = false; -#endif }; /** @@ -113,9 +105,9 @@ class BatchSpanProcessor : public SpanProcessor * shared ownership of the processor, and thus doesn't call Shutdown (as the processor might be * shared with other Tracers). */ - ~BatchSpanProcessor(); + virtual ~BatchSpanProcessor(); -private: +protected: /** * The background routine performed by the worker thread. */ @@ -125,7 +117,7 @@ class BatchSpanProcessor : public SpanProcessor * Exports all ended spans to the configured exporter. * */ - void Export(); + virtual void Export(); /** * Called when Shutdown() is invoked. Completely drains the queue of all its ended spans and @@ -133,23 +125,17 @@ class BatchSpanProcessor : public SpanProcessor */ void DrainQueue(); -#ifdef ENABLE_ASYNC_EXPORT - /* In case of async export, wait and notify for shutdown to be completed.*/ - void WaitForShutdownCompletion(); -#endif - struct SynchronizationData { /* Synchronization primitives */ - std::condition_variable cv, force_flush_cv, async_shutdown_cv; - std::mutex cv_m, force_flush_cv_m, shutdown_m, async_shutdown_m; + std::condition_variable cv, force_flush_cv; + std::mutex cv_m, force_flush_cv_m, shutdown_m; /* Important boolean flags to handle the workflow of the processor */ std::atomic is_force_wakeup_background_worker; std::atomic is_force_flush_pending; std::atomic is_force_flush_notified; std::atomic is_shutdown; - std::atomic is_async_shutdown_notified; }; /** @@ -162,6 +148,8 @@ class BatchSpanProcessor : public SpanProcessor static void NotifyCompletion(bool notify_force_flush, const std::shared_ptr &synchronization_data); + void GetWaitAdjustedTime(std::chrono::microseconds &timeout, + std::chrono::time_point &start_time); /* The configured backend exporter */ std::unique_ptr exporter_; @@ -169,9 +157,6 @@ class BatchSpanProcessor : public SpanProcessor const size_t max_queue_size_; const std::chrono::milliseconds schedule_delay_millis_; const size_t max_export_batch_size_; -#ifdef ENABLE_ASYNC_EXPORT - const bool is_export_async_; -#endif /* The buffer/queue to which the ended spans are added */ common::CircularBuffer buffer_; diff --git a/sdk/src/logs/CMakeLists.txt b/sdk/src/logs/CMakeLists.txt index 20f13324e7..1b1db9e778 100644 --- a/sdk/src/logs/CMakeLists.txt +++ b/sdk/src/logs/CMakeLists.txt @@ -4,6 +4,7 @@ add_library( logger.cc simple_log_processor.cc batch_log_processor.cc + async_batch_log_processor.cc logger_context.cc multi_log_processor.cc multi_recordable.cc) diff --git a/sdk/src/logs/async_batch_log_processor.cc b/sdk/src/logs/async_batch_log_processor.cc new file mode 100644 index 0000000000..5d99754c12 --- /dev/null +++ b/sdk/src/logs/async_batch_log_processor.cc @@ -0,0 +1,194 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifdef ENABLE_LOGS_PREVIEW +# ifdef ENABLE_ASYNC_EXPORT +# include "opentelemetry/sdk/logs/async_batch_log_processor.h" +# include "opentelemetry/common/spin_lock_mutex.h" + +# include + +using opentelemetry::sdk::common::AtomicUniquePtr; +using opentelemetry::sdk::common::CircularBufferRange; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace logs +{ +AsyncBatchLogProcessor::AsyncBatchLogProcessor(std::unique_ptr &&exporter, + const AsyncBatchLogProcessorOptions &options) + : BatchLogProcessor(std::move(exporter), options), + max_export_async_(options.max_export_async), + export_data_storage_(std::make_shared()) +{ + export_data_storage_->export_ids_flag.resize(max_export_async_, true); + for (int i = 1; i <= max_export_async_; i++) + { + export_data_storage_->export_ids.push(i); + } +} + +void AsyncBatchLogProcessor::Export() +{ + do + { + std::vector> records_arr; + size_t num_records_to_export; + bool notify_force_flush = + synchronization_data_->is_force_flush_pending.exchange(false, std::memory_order_acq_rel); + if (notify_force_flush) + { + num_records_to_export = buffer_.size(); + } + else + { + num_records_to_export = + buffer_.size() >= max_export_batch_size_ ? max_export_batch_size_ : buffer_.size(); + } + + if (num_records_to_export == 0) + { + NotifyCompletion(notify_force_flush, synchronization_data_, export_data_storage_); + break; + } + + buffer_.Consume(num_records_to_export, + [&](CircularBufferRange> range) noexcept { + range.ForEach([&](AtomicUniquePtr &ptr) { + std::unique_ptr swap_ptr = std::unique_ptr(nullptr); + ptr.Swap(swap_ptr); + records_arr.push_back(std::unique_ptr(swap_ptr.release())); + return true; + }); + }); + + size_t id = kInvalidExportId; + { + std::unique_lock lock(export_data_storage_->async_export_data_m); + export_data_storage_->async_export_waker.wait_for(lock, scheduled_delay_millis_, [this] { + return export_data_storage_->export_ids.size() > 0; + }); + if (export_data_storage_->export_ids.size() > 0) + { + id = export_data_storage_->export_ids.front(); + export_data_storage_->export_ids.pop(); + export_data_storage_->export_ids_flag[id - 1] = false; + } + } + if (id != kInvalidExportId) + { + std::weak_ptr export_data_watcher = export_data_storage_; + std::weak_ptr synchronization_data_watcher = synchronization_data_; + exporter_->Export( + nostd::span>(records_arr.data(), records_arr.size()), + [notify_force_flush, synchronization_data_watcher, export_data_watcher, + id](sdk::common::ExportResult result) { + // TODO: Print result + if (synchronization_data_watcher.expired()) + { + return true; + } + if (export_data_watcher.expired()) + { + return true; + } + bool is_already_notified = false; + auto synchronization_data = synchronization_data_watcher.lock(); + auto export_data = export_data_watcher.lock(); + { + std::unique_lock lk(export_data->async_export_data_m); + // In case callback is called more than once due to some bug in exporter + // we need to ensure export_ids do not contain duplicate. + if (export_data->export_ids_flag[id - 1] == false) + { + export_data->export_ids.push(id); + export_data->export_ids_flag[id - 1] = true; + } + else + { + is_already_notified = true; + } + } + if (is_already_notified == false) + { + NotifyCompletion(notify_force_flush, synchronization_data, export_data); + } + return true; + }); + } + } while (true); +} + +void AsyncBatchLogProcessor::NotifyCompletion( + bool notify_force_flush, + const std::shared_ptr &synchronization_data, + const std::shared_ptr &export_data_storage) +{ + BatchLogProcessor::NotifyCompletion(notify_force_flush, synchronization_data); + export_data_storage->async_export_waker.notify_all(); +} + +bool AsyncBatchLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept +{ + if (synchronization_data_->is_shutdown.load() == true) + { + return true; + } + auto start_time = std::chrono::system_clock::now(); + + std::lock_guard shutdown_guard{synchronization_data_->shutdown_m}; + bool already_shutdown = synchronization_data_->is_shutdown.exchange(true); + if (worker_thread_.joinable()) + { + synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release); + synchronization_data_->cv.notify_one(); + worker_thread_.join(); + } + + timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + timeout, std::chrono::microseconds::zero()); + // wait for all async exports to complete and return if timeout reached. + { + std::unique_lock lock(export_data_storage_->async_export_data_m); + if (timeout <= std::chrono::microseconds::zero()) + { + auto is_wait = false; + while (!is_wait) + { + is_wait = export_data_storage_->async_export_waker.wait_for( + lock, scheduled_delay_millis_, + [this] { return export_data_storage_->export_ids.size() == max_export_async_; }); + } + } + else + { + export_data_storage_->async_export_waker.wait_for(lock, timeout, [this] { + return export_data_storage_->export_ids.size() == max_export_async_; + }); + } + } + + GetWaitAdjustedTime(timeout, start_time); + // Should only shutdown exporter ONCE. + if (!already_shutdown && exporter_ != nullptr) + { + return exporter_->Shutdown(timeout); + } + + return true; +} + +AsyncBatchLogProcessor::~AsyncBatchLogProcessor() +{ + if (synchronization_data_->is_shutdown.load() == false) + { + Shutdown(); + } +} + +} // namespace logs +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +# endif +#endif diff --git a/sdk/src/logs/batch_log_processor.cc b/sdk/src/logs/batch_log_processor.cc index ee80d21ce7..af2f6cae59 100644 --- a/sdk/src/logs/batch_log_processor.cc +++ b/sdk/src/logs/batch_log_processor.cc @@ -17,15 +17,11 @@ namespace logs BatchLogProcessor::BatchLogProcessor(std::unique_ptr &&exporter, const size_t max_queue_size, const std::chrono::milliseconds scheduled_delay_millis, - const size_t max_export_batch_size, - const bool is_export_async) + const size_t max_export_batch_size) : exporter_(std::move(exporter)), max_queue_size_(max_queue_size), scheduled_delay_millis_(scheduled_delay_millis), max_export_batch_size_(max_export_batch_size), -# ifdef ENABLE_ASYNC_EXPORT - is_export_async_(is_export_async), -# endif buffer_(max_queue_size_), synchronization_data_(std::make_shared()), worker_thread_(&BatchLogProcessor::DoBackgroundWork, this) @@ -34,7 +30,6 @@ BatchLogProcessor::BatchLogProcessor(std::unique_ptr &&exporter, synchronization_data_->is_force_flush_pending.store(false); synchronization_data_->is_force_flush_notified.store(false); synchronization_data_->is_shutdown.store(false); - synchronization_data_->is_async_shutdown_notified.store(false); } BatchLogProcessor::BatchLogProcessor(std::unique_ptr &&exporter, @@ -43,9 +38,6 @@ BatchLogProcessor::BatchLogProcessor(std::unique_ptr &&exporter, max_queue_size_(options.max_queue_size), scheduled_delay_millis_(options.schedule_delay_millis), max_export_batch_size_(options.max_export_batch_size), -# ifdef ENABLE_ASYNC_EXPORT - is_export_async_(options.is_export_async), -# endif buffer_(options.max_queue_size), synchronization_data_(std::make_shared()), worker_thread_(&BatchLogProcessor::DoBackgroundWork, this) @@ -54,7 +46,6 @@ BatchLogProcessor::BatchLogProcessor(std::unique_ptr &&exporter, synchronization_data_->is_force_flush_pending.store(false); synchronization_data_->is_force_flush_notified.store(false); synchronization_data_->is_shutdown.store(false); - synchronization_data_->is_async_shutdown_notified.store(false); } std::unique_ptr BatchLogProcessor::MakeRecordable() noexcept @@ -149,6 +140,7 @@ bool BatchLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept } } } + synchronization_data_->is_force_flush_notified.store(false, std::memory_order_release); return result; @@ -225,59 +217,12 @@ void BatchLogProcessor::Export() }); }); -# ifdef ENABLE_ASYNC_EXPORT - if (is_export_async_ == false) - { -# endif - exporter_->Export( - nostd::span>(records_arr.data(), records_arr.size())); - NotifyCompletion(notify_force_flush, synchronization_data_); -# ifdef ENABLE_ASYNC_EXPORT - } - else - { - std::weak_ptr synchronization_data_watcher = synchronization_data_; - exporter_->Export( - nostd::span>(records_arr.data(), records_arr.size()), - [notify_force_flush, synchronization_data_watcher](sdk::common::ExportResult result) { - // TODO: Print result - if (synchronization_data_watcher.expired()) - { - return true; - } - - NotifyCompletion(notify_force_flush, synchronization_data_watcher.lock()); - return true; - }); - } -# endif + exporter_->Export( + nostd::span>(records_arr.data(), records_arr.size())); + NotifyCompletion(notify_force_flush, synchronization_data_); } while (true); } -# ifdef ENABLE_ASYNC_EXPORT -void BatchLogProcessor::WaitForShutdownCompletion() -{ - // Since async export is invoked due to shutdown, need to wait - // for async thread to complete. - if (is_export_async_) - { - std::unique_lock lk(synchronization_data_->async_shutdown_m); - while (true) - { - if (synchronization_data_->is_async_shutdown_notified.load()) - { - break; - } - - // When is_async_shutdown_notified.store(true) and async_shutdown_cv.notify_all() is called - // between is_async_shutdown_notified.load() and async_shutdown_cv.wait(). We must not wait - // for ever - synchronization_data_->async_shutdown_cv.wait_for(lk, scheduled_delay_millis_); - } - } -} -# endif - void BatchLogProcessor::NotifyCompletion( bool notify_force_flush, const std::shared_ptr &synchronization_data) @@ -292,13 +237,6 @@ void BatchLogProcessor::NotifyCompletion( synchronization_data->is_force_flush_notified.store(true, std::memory_order_release); synchronization_data->force_flush_cv.notify_one(); } - - // Notify the thread which is waiting on shutdown to complete. - if (synchronization_data->is_shutdown.load() == true) - { - synchronization_data->is_async_shutdown_notified.store(true); - synchronization_data->async_shutdown_cv.notify_all(); - } } void BatchLogProcessor::DrainQueue() @@ -312,12 +250,26 @@ void BatchLogProcessor::DrainQueue() } Export(); + } +} -# ifdef ENABLE_ASYNC_EXPORT - // Since async export is invoked due to shutdown, need to wait - // for async thread to complete. - WaitForShutdownCompletion(); -# endif +void BatchLogProcessor::GetWaitAdjustedTime( + std::chrono::microseconds &timeout, + std::chrono::time_point &start_time) +{ + auto end_time = std::chrono::system_clock::now(); + auto offset = std::chrono::duration_cast(end_time - start_time); + start_time = end_time; + timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + timeout, std::chrono::microseconds::zero()); + if (timeout > offset && timeout > std::chrono::microseconds::zero()) + { + timeout -= offset; + } + else + { + // Some module use zero as indefinite timeout.So we can not reset timeout to zero here + timeout = std::chrono::microseconds(1); } } @@ -335,21 +287,7 @@ bool BatchLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept worker_thread_.join(); } - auto worker_end_time = std::chrono::system_clock::now(); - auto offset = std::chrono::duration_cast(worker_end_time - start_time); - - timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( - timeout, std::chrono::microseconds::zero()); - if (timeout > offset && timeout > std::chrono::microseconds::zero()) - { - timeout -= offset; - } - else - { - // Some module use zero as indefinite timeout.So we can not reset timeout to zero here - timeout = std::chrono::microseconds(1); - } - + GetWaitAdjustedTime(timeout, start_time); // Should only shutdown exporter ONCE. if (!already_shutdown && exporter_ != nullptr) { diff --git a/sdk/src/trace/CMakeLists.txt b/sdk/src/trace/CMakeLists.txt index ddef00fb42..bd906b1fab 100644 --- a/sdk/src/trace/CMakeLists.txt +++ b/sdk/src/trace/CMakeLists.txt @@ -5,6 +5,7 @@ add_library( tracer.cc span.cc batch_span_processor.cc + async_batch_span_processor.cc samplers/parent.cc samplers/trace_id_ratio.cc random_id_generator.cc) diff --git a/sdk/src/trace/async_batch_span_processor.cc b/sdk/src/trace/async_batch_span_processor.cc new file mode 100644 index 0000000000..4f7a7c957a --- /dev/null +++ b/sdk/src/trace/async_batch_span_processor.cc @@ -0,0 +1,187 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 +#ifdef ENABLE_ASYNC_EXPORT + +# include "opentelemetry/sdk/trace/async_batch_span_processor.h" +# include "opentelemetry/common/spin_lock_mutex.h" + +# include +using opentelemetry::sdk::common::AtomicUniquePtr; +using opentelemetry::sdk::common::CircularBuffer; +using opentelemetry::sdk::common::CircularBufferRange; +using opentelemetry::trace::SpanContext; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace trace +{ + +AsyncBatchSpanProcessor::AsyncBatchSpanProcessor(std::unique_ptr &&exporter, + const AsyncBatchSpanProcessorOptions &options) + : BatchSpanProcessor(std::move(exporter), options), + max_export_async_(options.max_export_async), + export_data_storage_(std::make_shared()) +{ + export_data_storage_->export_ids_flag.resize(max_export_async_, true); + for (size_t i = 1; i <= max_export_async_; i++) + { + export_data_storage_->export_ids.push(i); + } +} + +void AsyncBatchSpanProcessor::Export() +{ + do + { + std::vector> spans_arr; + size_t num_records_to_export; + bool notify_force_flush = + synchronization_data_->is_force_flush_pending.exchange(false, std::memory_order_acq_rel); + if (notify_force_flush) + { + num_records_to_export = buffer_.size(); + } + else + { + num_records_to_export = + buffer_.size() >= max_export_batch_size_ ? max_export_batch_size_ : buffer_.size(); + } + + if (num_records_to_export == 0) + { + NotifyCompletion(notify_force_flush, synchronization_data_, export_data_storage_); + break; + } + buffer_.Consume(num_records_to_export, + [&](CircularBufferRange> range) noexcept { + range.ForEach([&](AtomicUniquePtr &ptr) { + std::unique_ptr swap_ptr = std::unique_ptr(nullptr); + ptr.Swap(swap_ptr); + spans_arr.push_back(std::unique_ptr(swap_ptr.release())); + return true; + }); + }); + + size_t id = kInvalidExportId; + { + std::unique_lock lock(export_data_storage_->async_export_data_m); + export_data_storage_->async_export_waker.wait_for(lock, schedule_delay_millis_, [this] { + return export_data_storage_->export_ids.size() > 0; + }); + if (export_data_storage_->export_ids.size() > 0) + { + id = export_data_storage_->export_ids.front(); + export_data_storage_->export_ids.pop(); + export_data_storage_->export_ids_flag[id - 1] = false; + } + } + if (id != kInvalidExportId) + { + std::weak_ptr export_data_watcher = export_data_storage_; + + std::weak_ptr synchronization_data_watcher = synchronization_data_; + exporter_->Export( + nostd::span>(spans_arr.data(), spans_arr.size()), + [notify_force_flush, synchronization_data_watcher, export_data_watcher, + id](sdk::common::ExportResult result) { + // TODO: Print result + if (synchronization_data_watcher.expired()) + { + return true; + } + + if (export_data_watcher.expired()) + { + return true; + } + + auto synchronization_data = synchronization_data_watcher.lock(); + auto export_data = export_data_watcher.lock(); + { + std::unique_lock lk(export_data->async_export_data_m); + if (export_data->export_ids_flag[id - 1] == false) + { + export_data->export_ids.push(id); + export_data->export_ids_flag[id - 1] = true; + } + } + NotifyCompletion(notify_force_flush, synchronization_data, export_data); + return true; + }); + } + } while (true); +} + +void AsyncBatchSpanProcessor::NotifyCompletion( + bool notify_force_flush, + const std::shared_ptr &synchronization_data, + const std::shared_ptr &export_data_storage) +{ + BatchSpanProcessor::NotifyCompletion(notify_force_flush, synchronization_data); + export_data_storage->async_export_waker.notify_all(); +} + +bool AsyncBatchSpanProcessor::Shutdown(std::chrono::microseconds timeout) noexcept +{ + if (synchronization_data_->is_shutdown.load() == true) + { + return true; + } + + auto start_time = std::chrono::system_clock::now(); + std::lock_guard shutdown_guard{synchronization_data_->shutdown_m}; + bool already_shutdown = synchronization_data_->is_shutdown.exchange(true); + + if (worker_thread_.joinable()) + { + synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release); + synchronization_data_->cv.notify_one(); + worker_thread_.join(); + } + + timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + timeout, std::chrono::microseconds::zero()); + // wait for all async exports to complete and return if timeout reached. + { + std::unique_lock lock(export_data_storage_->async_export_data_m); + if (timeout <= std::chrono::microseconds::zero()) + { + auto is_wait = false; + while (!is_wait) + { + is_wait = export_data_storage_->async_export_waker.wait_for( + lock, schedule_delay_millis_, + [this] { return export_data_storage_->export_ids.size() == max_export_async_; }); + } + } + else + { + export_data_storage_->async_export_waker.wait_for(lock, timeout, [this] { + return export_data_storage_->export_ids.size() == max_export_async_; + }); + } + } + + GetWaitAdjustedTime(timeout, start_time); + // Should only shutdown exporter ONCE. + if (!already_shutdown && exporter_ != nullptr) + { + return exporter_->Shutdown(timeout); + } + + return true; +} + +AsyncBatchSpanProcessor::~AsyncBatchSpanProcessor() +{ + if (synchronization_data_->is_shutdown.load() == false) + { + Shutdown(); + } +} + +} // namespace trace +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +#endif diff --git a/sdk/src/trace/batch_span_processor.cc b/sdk/src/trace/batch_span_processor.cc index 6a356c17a0..4609eae95f 100644 --- a/sdk/src/trace/batch_span_processor.cc +++ b/sdk/src/trace/batch_span_processor.cc @@ -22,9 +22,6 @@ BatchSpanProcessor::BatchSpanProcessor(std::unique_ptr &&exporter, max_queue_size_(options.max_queue_size), schedule_delay_millis_(options.schedule_delay_millis), max_export_batch_size_(options.max_export_batch_size), -#ifdef ENABLE_ASYNC_EXPORT - is_export_async_(options.is_export_async), -#endif buffer_(max_queue_size_), synchronization_data_(std::make_shared()), worker_thread_(&BatchSpanProcessor::DoBackgroundWork, this) @@ -33,7 +30,6 @@ BatchSpanProcessor::BatchSpanProcessor(std::unique_ptr &&exporter, synchronization_data_->is_force_flush_pending.store(false); synchronization_data_->is_force_flush_notified.store(false); synchronization_data_->is_shutdown.store(false); - synchronization_data_->is_async_shutdown_notified.store(false); } std::unique_ptr BatchSpanProcessor::MakeRecordable() noexcept @@ -207,59 +203,11 @@ void BatchSpanProcessor::Export() }); }); -#ifdef ENABLE_ASYNC_EXPORT - if (is_export_async_ == false) - { -#endif - exporter_->Export( - nostd::span>(spans_arr.data(), spans_arr.size())); - NotifyCompletion(notify_force_flush, synchronization_data_); -#ifdef ENABLE_ASYNC_EXPORT - } - else - { - std::weak_ptr synchronization_data_watcher = synchronization_data_; - exporter_->Export( - nostd::span>(spans_arr.data(), spans_arr.size()), - [notify_force_flush, synchronization_data_watcher](sdk::common::ExportResult result) { - // TODO: Print result - if (synchronization_data_watcher.expired()) - { - return true; - } - - NotifyCompletion(notify_force_flush, synchronization_data_watcher.lock()); - return true; - }); - } -#endif + exporter_->Export(nostd::span>(spans_arr.data(), spans_arr.size())); + NotifyCompletion(notify_force_flush, synchronization_data_); } while (true); } -#ifdef ENABLE_ASYNC_EXPORT -void BatchSpanProcessor::WaitForShutdownCompletion() -{ - // Since async export is invoked due to shutdown, need to wait - // for async thread to complete. - if (is_export_async_) - { - std::unique_lock lk(synchronization_data_->async_shutdown_m); - while (true) - { - if (synchronization_data_->is_async_shutdown_notified.load()) - { - break; - } - - // When is_async_shutdown_notified.store(true) and async_shutdown_cv.notify_all() is called - // between is_async_shutdown_notified.load() and async_shutdown_cv.wait(). We must not wait - // for ever - synchronization_data_->async_shutdown_cv.wait_for(lk, schedule_delay_millis_); - } - } -} -#endif - void BatchSpanProcessor::NotifyCompletion( bool notify_force_flush, const std::shared_ptr &synchronization_data) @@ -274,13 +222,6 @@ void BatchSpanProcessor::NotifyCompletion( synchronization_data->is_force_flush_notified.store(true, std::memory_order_release); synchronization_data->force_flush_cv.notify_one(); } - - // Notify the thread which is waiting on shutdown to complete. - if (synchronization_data->is_shutdown.load() == true) - { - synchronization_data->is_async_shutdown_notified.store(true); - synchronization_data->async_shutdown_cv.notify_all(); - } } void BatchSpanProcessor::DrainQueue() @@ -294,9 +235,26 @@ void BatchSpanProcessor::DrainQueue() } Export(); -#ifdef ENABLE_ASYNC_EXPORT - WaitForShutdownCompletion(); -#endif + } +} + +void BatchSpanProcessor::GetWaitAdjustedTime( + std::chrono::microseconds &timeout, + std::chrono::time_point &start_time) +{ + auto end_time = std::chrono::system_clock::now(); + auto offset = std::chrono::duration_cast(end_time - start_time); + start_time = end_time; + timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + timeout, std::chrono::microseconds::zero()); + if (timeout > offset && timeout > std::chrono::microseconds::zero()) + { + timeout -= offset; + } + else + { + // Some module use zero as indefinite timeout.So we can not reset timeout to zero here + timeout = std::chrono::microseconds(1); } } @@ -313,22 +271,7 @@ bool BatchSpanProcessor::Shutdown(std::chrono::microseconds timeout) noexcept worker_thread_.join(); } - auto worker_end_time = std::chrono::system_clock::now(); - auto offset = std::chrono::duration_cast(worker_end_time - start_time); - - // Fix timeout to meet requirement of wait_for - timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( - timeout, std::chrono::microseconds::zero()); - if (timeout > offset && timeout > std::chrono::microseconds::zero()) - { - timeout -= offset; - } - else - { - // Some module use zero as indefinite timeout.So we can not reset timeout to zero here - timeout = std::chrono::microseconds(1); - } - + GetWaitAdjustedTime(timeout, start_time); // Should only shutdown exporter ONCE. if (!already_shutdown && exporter_ != nullptr) { diff --git a/sdk/test/logs/CMakeLists.txt b/sdk/test/logs/CMakeLists.txt index 84b865d226..550b48edd8 100644 --- a/sdk/test/logs/CMakeLists.txt +++ b/sdk/test/logs/CMakeLists.txt @@ -1,5 +1,8 @@ -foreach(testname logger_provider_sdk_test logger_sdk_test log_record_test - simple_log_processor_test batch_log_processor_test) +foreach( + testname + logger_provider_sdk_test logger_sdk_test log_record_test + simple_log_processor_test batch_log_processor_test + async_batch_log_processor_test) add_executable(${testname} "${testname}.cc") target_link_libraries(${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} opentelemetry_logs) diff --git a/sdk/test/logs/async_batch_log_processor_test.cc b/sdk/test/logs/async_batch_log_processor_test.cc new file mode 100644 index 0000000000..dba600b9a7 --- /dev/null +++ b/sdk/test/logs/async_batch_log_processor_test.cc @@ -0,0 +1,374 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 +#ifndef ENABLE_ASYNC_EXPORT +# include +TEST(AsyncBatchLogProcessor, DummyTest) +{ + // For linking +} +#endif + +#ifdef ENABLE_LOGS_PREVIEW +# ifdef ENABLE_ASYNC_EXPORT + +# include "opentelemetry/sdk/logs/async_batch_log_processor.h" +# include "opentelemetry/sdk/logs/exporter.h" +# include "opentelemetry/sdk/logs/log_record.h" + +# include +# include +# include +# include +# include + +using namespace opentelemetry::sdk::logs; +using namespace opentelemetry::sdk::common; + +/** + * A sample log exporter + * for testing the batch log processor + */ +class MockLogExporter final : public LogExporter +{ +public: + MockLogExporter(std::shared_ptr>> logs_received, + std::shared_ptr> is_shutdown, + std::shared_ptr> is_export_completed, + const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0), + int callback_count = 1) + : logs_received_(logs_received), + is_shutdown_(is_shutdown), + is_export_completed_(is_export_completed), + export_delay_(export_delay), + callback_count_(callback_count) + {} + + std::unique_ptr MakeRecordable() noexcept + { + return std::unique_ptr(new LogRecord()); + } + + // Export method stores the logs received into a shared list of record names + ExportResult Export( + const opentelemetry::nostd::span> &records) noexcept override + { + *is_export_completed_ = false; // Meant exclusively to test scheduled_delay_millis + + for (auto &record : records) + { + auto log = std::unique_ptr(static_cast(record.release())); + if (log != nullptr) + { + logs_received_->push_back(std::move(log)); + } + } + + *is_export_completed_ = true; + return ExportResult::kSuccess; + } + + void Export(const opentelemetry::nostd::span> &records, + std::function + &&result_callback) noexcept override + { + // We should keep the order of test records + auto result = Export(records); + async_threads_.emplace_back(std::make_shared( + [this, + result](std::function &&result_callback) { + for (int i = 0; i < callback_count_; i++) + { + result_callback(result); + } + }, + std::move(result_callback))); + } + + // toggles the boolean flag marking this exporter as shut down + bool Shutdown( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override + { + while (!async_threads_.empty()) + { + std::list> async_threads; + async_threads.swap(async_threads_); + for (auto &async_thread : async_threads) + { + if (async_thread && async_thread->joinable()) + { + async_thread->join(); + } + } + } + *is_shutdown_ = true; + return true; + } + +private: + std::shared_ptr>> logs_received_; + std::shared_ptr> is_shutdown_; + std::shared_ptr> is_export_completed_; + const std::chrono::milliseconds export_delay_; + std::list> async_threads_; + int callback_count_; +}; + +/** + * A fixture class for testing the BatchLogProcessor class that uses the TestExporter defined above. + */ +class AsyncBatchLogProcessorTest : public testing::Test // ::testing::Test +{ +public: + // returns a batch log processor that received a batch of log records, a shared pointer to a + // is_shutdown flag, and the processor configuration options (default if unspecified) + std::shared_ptr GetMockProcessor( + std::shared_ptr>> logs_received, + std::shared_ptr> is_shutdown, + std::shared_ptr> is_export_completed = + std::shared_ptr>(new std::atomic(false)), + const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0), + const std::chrono::milliseconds scheduled_delay_millis = std::chrono::milliseconds(5000), + const size_t max_queue_size = 2048, + const size_t max_export_batch_size = 512, + const size_t max_export_async = 8, + int callback_count = 1) + { + AsyncBatchLogProcessorOptions options; + options.max_queue_size = max_queue_size; + options.schedule_delay_millis = scheduled_delay_millis; + options.max_export_batch_size = max_export_batch_size; + options.max_export_async = max_export_async; + return std::shared_ptr(new AsyncBatchLogProcessor( + std::unique_ptr(new MockLogExporter( + logs_received, is_shutdown, is_export_completed, export_delay, callback_count)), + options)); + } +}; + +TEST_F(AsyncBatchLogProcessorTest, TestAsyncShutdown) +{ + // initialize a batch log processor with the test exporter + std::shared_ptr>> logs_received( + new std::vector>); + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr> is_export_completed(new std::atomic(false)); + + const std::chrono::milliseconds export_delay(0); + const std::chrono::milliseconds scheduled_delay_millis(5000); + const size_t max_export_batch_size = 512; + const size_t max_queue_size = 2048; + const size_t max_export_async = 5; + + auto batch_processor = GetMockProcessor(logs_received, is_shutdown, is_export_completed, + export_delay, scheduled_delay_millis, max_queue_size, + max_export_batch_size, max_export_async); + + // Create a few test log records and send them to the processor + const int num_logs = 2048; + + for (int i = 0; i < num_logs; ++i) + { + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); + batch_processor->OnReceive(std::move(log)); + } + + // Test that shutting down the processor will first wait for the + // current batch of logs to be sent to the log exporter + // by checking the number of logs sent and the names of the logs sent + EXPECT_EQ(true, batch_processor->Shutdown()); + // It's safe to shutdown again + EXPECT_TRUE(batch_processor->Shutdown()); + + EXPECT_EQ(num_logs, logs_received->size()); + + // Assume logs are received by exporter in same order as sent by processor + for (int i = 0; i < num_logs; ++i) + { + EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetName()); + } + + // Also check that the processor is shut down at the end + EXPECT_TRUE(is_shutdown->load()); +} + +TEST_F(AsyncBatchLogProcessorTest, TestAsyncShutdownNoCallback) +{ + // initialize a batch log processor with the test exporter + std::shared_ptr>> logs_received( + new std::vector>); + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr> is_export_completed(new std::atomic(false)); + + const std::chrono::milliseconds export_delay(0); + const std::chrono::milliseconds scheduled_delay_millis(5000); + const size_t max_export_batch_size = 512; + const size_t max_queue_size = 2048; + const size_t max_export_async = 5; + + auto batch_processor = GetMockProcessor(logs_received, is_shutdown, is_export_completed, + export_delay, scheduled_delay_millis, max_queue_size, + max_export_batch_size, max_export_async, 0); + + // Create a few test log records and send them to the processor + const int num_logs = 2048; + + for (int i = 0; i < num_logs; ++i) + { + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); + batch_processor->OnReceive(std::move(log)); + } + + EXPECT_EQ(true, batch_processor->Shutdown(std::chrono::milliseconds(5000))); + // It's safe to shutdown again + EXPECT_TRUE(batch_processor->Shutdown()); + + // Also check that the processor is shut down at the end + EXPECT_TRUE(is_shutdown->load()); +} + +TEST_F(AsyncBatchLogProcessorTest, TestAsyncForceFlush) +{ + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr>> logs_received( + new std::vector>); + std::shared_ptr> is_export_completed(new std::atomic(false)); + + const std::chrono::milliseconds export_delay(0); + const std::chrono::milliseconds scheduled_delay_millis(5000); + const size_t max_export_batch_size = 512; + const size_t max_queue_size = 2048; + + auto batch_processor = + GetMockProcessor(logs_received, is_shutdown, is_export_completed, export_delay, + scheduled_delay_millis, max_queue_size, max_export_batch_size); + + const int num_logs = 2048; + + for (int i = 0; i < num_logs; ++i) + { + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); + batch_processor->OnReceive(std::move(log)); + } + + EXPECT_TRUE(batch_processor->ForceFlush()); + + EXPECT_EQ(num_logs, logs_received->size()); + for (int i = 0; i < num_logs; ++i) + { + EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetName()); + } + + // Create some more logs to make sure that the processor still works + for (int i = 0; i < num_logs; ++i) + { + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); + batch_processor->OnReceive(std::move(log)); + } + + EXPECT_TRUE(batch_processor->ForceFlush()); + + EXPECT_EQ(num_logs * 2, logs_received->size()); + for (int i = 0; i < num_logs * 2; ++i) + { + EXPECT_EQ("Log" + std::to_string(i % num_logs), logs_received->at(i)->GetName()); + } +} + +TEST_F(AsyncBatchLogProcessorTest, TestManyLogsLoss) +{ + /* Test that when exporting more than max_queue_size logs, some are most likely lost*/ + + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr>> logs_received( + new std::vector>); + + const int max_queue_size = 4096; + + auto batch_processor = GetMockProcessor(logs_received, is_shutdown); + + // Create max_queue_size log records + for (int i = 0; i < max_queue_size; ++i) + { + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); + batch_processor->OnReceive(std::move(log)); + } + + EXPECT_TRUE(batch_processor->ForceFlush()); + + // Log should be exported by now + EXPECT_GE(max_queue_size, logs_received->size()); +} + +TEST_F(AsyncBatchLogProcessorTest, TestManyLogsLossLess) +{ + /* Test that no logs are lost when sending max_queue_size logs */ + + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr>> logs_received( + new std::vector>); + auto batch_processor = GetMockProcessor(logs_received, is_shutdown); + + const int num_logs = 2048; + + for (int i = 0; i < num_logs; ++i) + { + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); + batch_processor->OnReceive(std::move(log)); + } + + EXPECT_TRUE(batch_processor->ForceFlush()); + + EXPECT_EQ(num_logs, logs_received->size()); + for (int i = 0; i < num_logs; ++i) + { + EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetName()); + } +} + +TEST_F(AsyncBatchLogProcessorTest, TestScheduledDelayMillis) +{ + /* Test that max_export_batch_size logs are exported every scheduled_delay_millis + seconds */ + + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr> is_export_completed(new std::atomic(false)); + std::shared_ptr>> logs_received( + new std::vector>); + + const std::chrono::milliseconds export_delay(0); + const std::chrono::milliseconds scheduled_delay_millis(2000); + const size_t max_export_batch_size = 512; + + auto batch_processor = GetMockProcessor(logs_received, is_shutdown, is_export_completed, + export_delay, scheduled_delay_millis); + + for (std::size_t i = 0; i < max_export_batch_size; ++i) + { + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); + batch_processor->OnReceive(std::move(log)); + } + // Sleep for scheduled_delay_millis milliseconds + std::this_thread::sleep_for(scheduled_delay_millis); + + // small delay to give time to export, which is being performed + // asynchronously by the worker thread (this thread will not + // forcibly join() the main thread unless processor's shutdown() is called). + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + // Logs should be exported by now + EXPECT_TRUE(is_export_completed->load()); + EXPECT_EQ(max_export_batch_size, logs_received->size()); + for (size_t i = 0; i < max_export_batch_size; ++i) + { + EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetName()); + } +} +# endif +#endif diff --git a/sdk/test/logs/batch_log_processor_test.cc b/sdk/test/logs/batch_log_processor_test.cc index 2ad4eaec67..5e4e0f852b 100644 --- a/sdk/test/logs/batch_log_processor_test.cc +++ b/sdk/test/logs/batch_log_processor_test.cc @@ -64,11 +64,7 @@ class MockLogExporter final : public LogExporter { // We should keep the order of test records auto result = Export(records); - async_threads_.emplace_back(std::make_shared( - [result](std::function &&result_callback) { - result_callback(result); - }, - std::move(result_callback))); + result_callback(result); } # endif @@ -76,18 +72,6 @@ class MockLogExporter final : public LogExporter bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override { - while (!async_threads_.empty()) - { - std::list> async_threads; - async_threads.swap(async_threads_); - for (auto &async_thread : async_threads) - { - if (async_thread && async_thread->joinable()) - { - async_thread->join(); - } - } - } *is_shutdown_ = true; return true; } @@ -97,7 +81,6 @@ class MockLogExporter final : public LogExporter std::shared_ptr> is_shutdown_; std::shared_ptr> is_export_completed_; const std::chrono::milliseconds export_delay_; - std::list> async_threads_; }; /** @@ -116,13 +99,12 @@ class BatchLogProcessorTest : public testing::Test // ::testing::Test const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0), const std::chrono::milliseconds scheduled_delay_millis = std::chrono::milliseconds(5000), const size_t max_queue_size = 2048, - const size_t max_export_batch_size = 512, - const bool is_export_async = false) + const size_t max_export_batch_size = 512) { - return std::shared_ptr(new BatchLogProcessor( - std::unique_ptr( - new MockLogExporter(logs_received, is_shutdown, is_export_completed, export_delay)), - max_queue_size, scheduled_delay_millis, max_export_batch_size, is_export_async)); + return std::shared_ptr( + new BatchLogProcessor(std::unique_ptr(new MockLogExporter( + logs_received, is_shutdown, is_export_completed, export_delay)), + max_queue_size, scheduled_delay_millis, max_export_batch_size)); } }; @@ -164,55 +146,6 @@ TEST_F(BatchLogProcessorTest, TestShutdown) EXPECT_TRUE(is_shutdown->load()); } -# ifdef ENABLE_ASYNC_EXPORT -TEST_F(BatchLogProcessorTest, TestAsyncShutdown) -{ - // initialize a batch log processor with the test exporter - std::shared_ptr>> logs_received( - new std::vector>); - std::shared_ptr> is_shutdown(new std::atomic(false)); - std::shared_ptr> is_export_completed(new std::atomic(false)); - - const std::chrono::milliseconds export_delay(0); - const std::chrono::milliseconds scheduled_delay_millis(5000); - const size_t max_export_batch_size = 512; - const size_t max_queue_size = 2048; - const bool is_export_async = true; - - auto batch_processor = GetMockProcessor(logs_received, is_shutdown, is_export_completed, - export_delay, scheduled_delay_millis, max_queue_size, - max_export_batch_size, is_export_async); - - // Create a few test log records and send them to the processor - const int num_logs = 3; - - for (int i = 0; i < num_logs; ++i) - { - auto log = batch_processor->MakeRecordable(); - log->SetName("Log" + std::to_string(i)); - batch_processor->OnReceive(std::move(log)); - } - - // Test that shutting down the processor will first wait for the - // current batch of logs to be sent to the log exporter - // by checking the number of logs sent and the names of the logs sent - EXPECT_EQ(true, batch_processor->Shutdown()); - // It's safe to shutdown again - EXPECT_TRUE(batch_processor->Shutdown()); - - EXPECT_EQ(num_logs, logs_received->size()); - - // Assume logs are received by exporter in same order as sent by processor - for (int i = 0; i < num_logs; ++i) - { - EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetName()); - } - - // Also check that the processor is shut down at the end - EXPECT_TRUE(is_shutdown->load()); -} -# endif - TEST_F(BatchLogProcessorTest, TestForceFlush) { std::shared_ptr> is_shutdown(new std::atomic(false)); @@ -254,59 +187,6 @@ TEST_F(BatchLogProcessorTest, TestForceFlush) } } -# ifdef ENABLE_ASYNC_EXPORT -TEST_F(BatchLogProcessorTest, TestAsyncForceFlush) -{ - std::shared_ptr> is_shutdown(new std::atomic(false)); - std::shared_ptr>> logs_received( - new std::vector>); - std::shared_ptr> is_export_completed(new std::atomic(false)); - - const std::chrono::milliseconds export_delay(0); - const std::chrono::milliseconds scheduled_delay_millis(5000); - const size_t max_export_batch_size = 512; - const size_t max_queue_size = 2048; - const bool is_export_async = true; - - auto batch_processor = GetMockProcessor(logs_received, is_shutdown, is_export_completed, - export_delay, scheduled_delay_millis, max_queue_size, - max_export_batch_size, is_export_async); - - const int num_logs = 2048; - - for (int i = 0; i < num_logs; ++i) - { - auto log = batch_processor->MakeRecordable(); - log->SetName("Log" + std::to_string(i)); - batch_processor->OnReceive(std::move(log)); - } - - EXPECT_TRUE(batch_processor->ForceFlush()); - - EXPECT_EQ(num_logs, logs_received->size()); - for (int i = 0; i < num_logs; ++i) - { - EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetName()); - } - - // Create some more logs to make sure that the processor still works - for (int i = 0; i < num_logs; ++i) - { - auto log = batch_processor->MakeRecordable(); - log->SetName("Log" + std::to_string(i)); - batch_processor->OnReceive(std::move(log)); - } - - EXPECT_TRUE(batch_processor->ForceFlush()); - - EXPECT_EQ(num_logs * 2, logs_received->size()); - for (int i = 0; i < num_logs * 2; ++i) - { - EXPECT_EQ("Log" + std::to_string(i % num_logs), logs_received->at(i)->GetName()); - } -} -# endif - TEST_F(BatchLogProcessorTest, TestManyLogsLoss) { /* Test that when exporting more than max_queue_size logs, some are most likely lost*/ diff --git a/sdk/test/trace/CMakeLists.txt b/sdk/test/trace/CMakeLists.txt index b02ff705fa..8e9402625b 100644 --- a/sdk/test/trace/CMakeLists.txt +++ b/sdk/test/trace/CMakeLists.txt @@ -8,7 +8,8 @@ foreach( always_on_sampler_test parent_sampler_test trace_id_ratio_sampler_test - batch_span_processor_test) + batch_span_processor_test + async_batch_span_processor_test) add_executable(${testname} "${testname}.cc") target_link_libraries( ${testname} diff --git a/sdk/test/trace/async_batch_span_processor_test.cc b/sdk/test/trace/async_batch_span_processor_test.cc new file mode 100644 index 0000000000..1ac28c5bc9 --- /dev/null +++ b/sdk/test/trace/async_batch_span_processor_test.cc @@ -0,0 +1,375 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 +#ifndef ENABLE_ASYNC_EXPORT +# include +TEST(AsyncBatchSpanProcessor, DummyTest) +{ + // For linking +} +#endif + +#ifdef ENABLE_ASYNC_EXPORT + +# include "opentelemetry/sdk/trace/async_batch_span_processor.h" +# include "opentelemetry/sdk/trace/span_data.h" +# include "opentelemetry/sdk/trace/tracer.h" + +# include +# include +# include +# include +# include + +OPENTELEMETRY_BEGIN_NAMESPACE + +/** + * Returns a mock span exporter meant exclusively for testing only + */ +class MockSpanExporter final : public sdk::trace::SpanExporter +{ +public: + MockSpanExporter( + std::shared_ptr>> spans_received, + std::shared_ptr> is_shutdown, + std::shared_ptr> is_export_completed = + std::shared_ptr>(new std::atomic(false)), + const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0), + int callback_count = 1) noexcept + : spans_received_(spans_received), + is_shutdown_(is_shutdown), + is_export_completed_(is_export_completed), + export_delay_(export_delay), + callback_count_(callback_count) + {} + + std::unique_ptr MakeRecordable() noexcept override + { + return std::unique_ptr(new sdk::trace::SpanData); + } + + sdk::common::ExportResult Export( + const nostd::span> &recordables) noexcept override + { + *is_export_completed_ = false; + + std::this_thread::sleep_for(export_delay_); + + for (auto &recordable : recordables) + { + auto span = std::unique_ptr( + static_cast(recordable.release())); + + if (span != nullptr) + { + spans_received_->push_back(std::move(span)); + } + } + + *is_export_completed_ = true; + return sdk::common::ExportResult::kSuccess; + } + + void Export(const nostd::span> &records, + std::function + &&result_callback) noexcept override + { + // We should keep the order of test records + auto result = Export(records); + async_threads_.emplace_back(std::make_shared( + [this, + result](std::function &&result_callback) { + for (int i = 0; i < callback_count_; i++) + { + result_callback(result); + } + }, + std::move(result_callback))); + } + + bool Shutdown( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override + { + while (!async_threads_.empty()) + { + std::list> async_threads; + async_threads.swap(async_threads_); + for (auto &async_thread : async_threads) + { + if (async_thread && async_thread->joinable()) + { + async_thread->join(); + } + } + } + *is_shutdown_ = true; + return true; + } + + bool IsExportCompleted() { return is_export_completed_->load(); } + +private: + std::shared_ptr>> spans_received_; + std::shared_ptr> is_shutdown_; + std::shared_ptr> is_export_completed_; + // Meant exclusively to test force flush timeout + const std::chrono::milliseconds export_delay_; + std::list> async_threads_; + int callback_count_; +}; + +/** + * Fixture Class + */ +class AsyncBatchSpanProcessorTestPeer : public testing::Test +{ +public: + std::unique_ptr>> GetTestSpans( + std::shared_ptr processor, + const int num_spans) + { + std::unique_ptr>> test_spans( + new std::vector>); + + for (int i = 0; i < num_spans; ++i) + { + test_spans->push_back(processor->MakeRecordable()); + static_cast(test_spans->at(i).get()) + ->SetName("Span " + std::to_string(i)); + } + + return test_spans; + } +}; + +/* ################################## TESTS ############################################ */ + +TEST_F(AsyncBatchSpanProcessorTestPeer, TestAsyncShutdown) +{ + std::shared_ptr>> spans_received( + new std::vector>); + std::shared_ptr> is_shutdown(new std::atomic(false)); + + sdk::trace::AsyncBatchSpanProcessorOptions options{}; + options.max_export_async = 5; + + auto batch_processor = + std::shared_ptr(new sdk::trace::AsyncBatchSpanProcessor( + std::unique_ptr(new MockSpanExporter(spans_received, is_shutdown)), + options)); + const int num_spans = 2048; + + auto test_spans = GetTestSpans(batch_processor, num_spans); + + for (int i = 0; i < num_spans; ++i) + { + batch_processor->OnEnd(std::move(test_spans->at(i))); + } + + EXPECT_TRUE(batch_processor->Shutdown(std::chrono::milliseconds(5000))); + // It's safe to shutdown again + EXPECT_TRUE(batch_processor->Shutdown()); + + EXPECT_EQ(num_spans, spans_received->size()); + for (int i = 0; i < num_spans; ++i) + { + EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName()); + } + + EXPECT_TRUE(is_shutdown->load()); +} + +TEST_F(AsyncBatchSpanProcessorTestPeer, TestAsyncShutdownNoCallback) +{ + std::shared_ptr> is_export_completed(new std::atomic(false)); + std::shared_ptr>> spans_received( + new std::vector>); + const std::chrono::milliseconds export_delay(0); + std::shared_ptr> is_shutdown(new std::atomic(false)); + + sdk::trace::AsyncBatchSpanProcessorOptions options{}; + options.max_export_async = 8; + + auto batch_processor = + std::shared_ptr(new sdk::trace::AsyncBatchSpanProcessor( + std::unique_ptr(new MockSpanExporter( + spans_received, is_shutdown, is_export_completed, export_delay, 0)), + options)); + const int num_spans = 2048; + + auto test_spans = GetTestSpans(batch_processor, num_spans); + + for (int i = 0; i < num_spans; ++i) + { + batch_processor->OnEnd(std::move(test_spans->at(i))); + } + + // Shutdown should never block for ever and return on timeout + EXPECT_TRUE(batch_processor->Shutdown(std::chrono::milliseconds(5000))); + // It's safe to shutdown again + EXPECT_TRUE(batch_processor->Shutdown()); + + EXPECT_TRUE(is_shutdown->load()); +} + +TEST_F(AsyncBatchSpanProcessorTestPeer, TestAsyncForceFlush) +{ + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr>> spans_received( + new std::vector>); + + sdk::trace::AsyncBatchSpanProcessorOptions options{}; + + auto batch_processor = + std::shared_ptr(new sdk::trace::AsyncBatchSpanProcessor( + std::unique_ptr(new MockSpanExporter(spans_received, is_shutdown)), + options)); + const int num_spans = 2048; + + auto test_spans = GetTestSpans(batch_processor, num_spans); + + for (int i = 0; i < num_spans; ++i) + { + batch_processor->OnEnd(std::move(test_spans->at(i))); + } + + // Give some time to export + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + EXPECT_TRUE(batch_processor->ForceFlush()); + + EXPECT_EQ(num_spans, spans_received->size()); + for (int i = 0; i < num_spans; ++i) + { + EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName()); + } + + // Create some more spans to make sure that the processor still works + auto more_test_spans = GetTestSpans(batch_processor, num_spans); + for (int i = 0; i < num_spans; ++i) + { + batch_processor->OnEnd(std::move(more_test_spans->at(i))); + } + + // Give some time to export the spans + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + EXPECT_TRUE(batch_processor->ForceFlush()); + + EXPECT_EQ(num_spans * 2, spans_received->size()); + for (int i = 0; i < num_spans; ++i) + { + EXPECT_EQ("Span " + std::to_string(i % num_spans), + spans_received->at(num_spans + i)->GetName()); + } +} + +TEST_F(AsyncBatchSpanProcessorTestPeer, TestManySpansLoss) +{ + /* Test that when exporting more than max_queue_size spans, some are most likely lost*/ + + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr>> spans_received( + new std::vector>); + + const int max_queue_size = 4096; + + auto batch_processor = + std::shared_ptr(new sdk::trace::AsyncBatchSpanProcessor( + std::unique_ptr(new MockSpanExporter(spans_received, is_shutdown)), + sdk::trace::AsyncBatchSpanProcessorOptions())); + + auto test_spans = GetTestSpans(batch_processor, max_queue_size); + + for (int i = 0; i < max_queue_size; ++i) + { + batch_processor->OnEnd(std::move(test_spans->at(i))); + } + + // Give some time to export the spans + std::this_thread::sleep_for(std::chrono::milliseconds(700)); + + EXPECT_TRUE(batch_processor->ForceFlush()); + + // Span should be exported by now + EXPECT_GE(max_queue_size, spans_received->size()); +} + +TEST_F(AsyncBatchSpanProcessorTestPeer, TestManySpansLossLess) +{ + /* Test that no spans are lost when sending max_queue_size spans */ + + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr>> spans_received( + new std::vector>); + + const int num_spans = 2048; + + auto batch_processor = + std::shared_ptr(new sdk::trace::AsyncBatchSpanProcessor( + std::unique_ptr(new MockSpanExporter(spans_received, is_shutdown)), + sdk::trace::AsyncBatchSpanProcessorOptions())); + + auto test_spans = GetTestSpans(batch_processor, num_spans); + + for (int i = 0; i < num_spans; ++i) + { + batch_processor->OnEnd(std::move(test_spans->at(i))); + } + + // Give some time to export the spans + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + EXPECT_TRUE(batch_processor->ForceFlush()); + + EXPECT_EQ(num_spans, spans_received->size()); + for (int i = 0; i < num_spans; ++i) + { + EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName()); + } +} + +TEST_F(AsyncBatchSpanProcessorTestPeer, TestScheduleDelayMillis) +{ + /* Test that max_export_batch_size spans are exported every schedule_delay_millis + seconds */ + + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr> is_export_completed(new std::atomic(false)); + std::shared_ptr>> spans_received( + new std::vector>); + const std::chrono::milliseconds export_delay(0); + const size_t max_export_batch_size = 512; + sdk::trace::AsyncBatchSpanProcessorOptions options{}; + options.schedule_delay_millis = std::chrono::milliseconds(2000); + + auto batch_processor = + std::shared_ptr(new sdk::trace::AsyncBatchSpanProcessor( + std::unique_ptr( + new MockSpanExporter(spans_received, is_shutdown, is_export_completed, export_delay)), + options)); + + auto test_spans = GetTestSpans(batch_processor, max_export_batch_size); + + for (size_t i = 0; i < max_export_batch_size; ++i) + { + batch_processor->OnEnd(std::move(test_spans->at(i))); + } + + // Sleep for schedule_delay_millis milliseconds + std::this_thread::sleep_for(options.schedule_delay_millis); + + // small delay to give time to export + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + // Spans should be exported by now + EXPECT_TRUE(is_export_completed->load()); + EXPECT_EQ(max_export_batch_size, spans_received->size()); + for (size_t i = 0; i < max_export_batch_size; ++i) + { + EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName()); + } +} + +OPENTELEMETRY_END_NAMESPACE + +#endif diff --git a/sdk/test/trace/batch_span_processor_test.cc b/sdk/test/trace/batch_span_processor_test.cc index 122cbbedfe..6f270b9766 100644 --- a/sdk/test/trace/batch_span_processor_test.cc +++ b/sdk/test/trace/batch_span_processor_test.cc @@ -63,31 +63,15 @@ class MockSpanExporter final : public sdk::trace::SpanExporter std::function &&result_callback) noexcept override { - // We should keep the order of test records + // This is just dummy implementation. auto result = Export(records); - async_threads_.emplace_back(std::make_shared( - [result](std::function &&result_callback) { - result_callback(result); - }, - std::move(result_callback))); + result_callback(result); } #endif bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override { - while (!async_threads_.empty()) - { - std::list> async_threads; - async_threads.swap(async_threads_); - for (auto &async_thread : async_threads) - { - if (async_thread && async_thread->joinable()) - { - async_thread->join(); - } - } - } *is_shutdown_ = true; return true; } @@ -100,7 +84,6 @@ class MockSpanExporter final : public sdk::trace::SpanExporter std::shared_ptr> is_export_completed_; // Meant exclusively to test force flush timeout const std::chrono::milliseconds export_delay_; - std::list> async_threads_; }; /** @@ -161,96 +144,6 @@ TEST_F(BatchSpanProcessorTestPeer, TestShutdown) EXPECT_TRUE(is_shutdown->load()); } -#ifdef ENABLE_ASYNC_EXPORT -TEST_F(BatchSpanProcessorTestPeer, TestAsyncShutdown) -{ - std::shared_ptr> is_shutdown(new std::atomic(false)); - std::shared_ptr>> spans_received( - new std::vector>); - - sdk::trace::BatchSpanProcessorOptions options{}; - options.is_export_async = true; - - auto batch_processor = - std::shared_ptr(new sdk::trace::BatchSpanProcessor( - std::unique_ptr(new MockSpanExporter(spans_received, is_shutdown)), - options)); - const int num_spans = 3; - - auto test_spans = GetTestSpans(batch_processor, num_spans); - - for (int i = 0; i < num_spans; ++i) - { - batch_processor->OnEnd(std::move(test_spans->at(i))); - } - - EXPECT_TRUE(batch_processor->Shutdown()); - // It's safe to shutdown again - EXPECT_TRUE(batch_processor->Shutdown()); - - EXPECT_EQ(num_spans, spans_received->size()); - for (int i = 0; i < num_spans; ++i) - { - EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName()); - } - - EXPECT_TRUE(is_shutdown->load()); -} - -TEST_F(BatchSpanProcessorTestPeer, TestAsyncForceFlush) -{ - std::shared_ptr> is_shutdown(new std::atomic(false)); - std::shared_ptr>> spans_received( - new std::vector>); - - sdk::trace::BatchSpanProcessorOptions options{}; - options.is_export_async = true; - - auto batch_processor = - std::shared_ptr(new sdk::trace::BatchSpanProcessor( - std::unique_ptr(new MockSpanExporter(spans_received, is_shutdown)), - options)); - const int num_spans = 2048; - - auto test_spans = GetTestSpans(batch_processor, num_spans); - - for (int i = 0; i < num_spans; ++i) - { - batch_processor->OnEnd(std::move(test_spans->at(i))); - } - - // Give some time to export - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - - EXPECT_TRUE(batch_processor->ForceFlush()); - - EXPECT_EQ(num_spans, spans_received->size()); - for (int i = 0; i < num_spans; ++i) - { - EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName()); - } - - // Create some more spans to make sure that the processor still works - auto more_test_spans = GetTestSpans(batch_processor, num_spans); - for (int i = 0; i < num_spans; ++i) - { - batch_processor->OnEnd(std::move(more_test_spans->at(i))); - } - - // Give some time to export the spans - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - - EXPECT_TRUE(batch_processor->ForceFlush()); - - EXPECT_EQ(num_spans * 2, spans_received->size()); - for (int i = 0; i < num_spans; ++i) - { - EXPECT_EQ("Span " + std::to_string(i % num_spans), - spans_received->at(num_spans + i)->GetName()); - } -} -#endif - TEST_F(BatchSpanProcessorTestPeer, TestForceFlush) { std::shared_ptr> is_shutdown(new std::atomic(false));