diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 3d5299a444..8aec6526a6 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -6,4 +6,5 @@ add_subdirectory(simple) add_subdirectory(batch) add_subdirectory(metrics_simple) add_subdirectory(multithreaded) +add_subdirectory(multi_processor) add_subdirectory(http) diff --git a/examples/http/tracer_common.hpp b/examples/http/tracer_common.hpp index 19239890c6..0b0723a61a 100644 --- a/examples/http/tracer_common.hpp +++ b/examples/http/tracer_common.hpp @@ -4,7 +4,7 @@ #include "opentelemetry/sdk/trace/tracer_provider.h" #include "opentelemetry/trace/provider.h" -#include +#include namespace { @@ -13,8 +13,10 @@ void initTracer() { new opentelemetry::exporter::trace::OStreamSpanExporter); auto processor = std::unique_ptr( new sdktrace::SimpleSpanProcessor(std::move(exporter))); + std::vector> processors; + processors.push_back(std::move(processor)); // Default is an always-on sampler. - auto context = std::make_shared(std::move(processor)); + auto context = std::make_shared(std::move(processors)); auto provider = nostd::shared_ptr( new sdktrace::TracerProvider(context)); // Set the global trace provider @@ -27,4 +29,4 @@ nostd::shared_ptr get_tracer(std::string tracer_na return provider->GetTracer(tracer_name); } -} \ No newline at end of file +} diff --git a/examples/multi_processor/BUILD b/examples/multi_processor/BUILD new file mode 100644 index 0000000000..a5464bdfc2 --- /dev/null +++ b/examples/multi_processor/BUILD @@ -0,0 +1,26 @@ +cc_library( + name = "foo_multi_library", + srcs = [ + "foo_library/foo_library.cc", + ], + hdrs = [ + "foo_library/foo_library.h", + ], + deps = [ + "//api", + ], +) + +cc_binary( + name = "example_multi_processor", + srcs = [ + "main.cc", + ], + deps = [ + ":foo_multi_library", + "//api", + "//exporters/memory:in_memory_span_exporter", + "//exporters/ostream:ostream_span_exporter", + "//sdk/src/trace", + ], +) diff --git a/examples/multi_processor/CMakeLists.txt b/examples/multi_processor/CMakeLists.txt new file mode 100644 index 0000000000..65c7c2e510 --- /dev/null +++ b/examples/multi_processor/CMakeLists.txt @@ -0,0 +1,10 @@ +include_directories(${CMAKE_SOURCE_DIR}/exporters/ostream/include + ${CMAKE_SOURCE_DIR}/exporters/memory/include) + +add_library(foo_multi_library foo_library/foo_library.cc) +target_link_libraries(foo_multi_library opentelemetry_exporter_ostream_span + ${CMAKE_THREAD_LIBS_INIT} opentelemetry_api) + +add_executable(example_multi_processor main.cc) +target_link_libraries(example_multi_processor ${CMAKE_THREAD_LIBS_INIT} + foo_multi_library opentelemetry_trace) diff --git a/examples/multi_processor/README.md b/examples/multi_processor/README.md new file mode 100644 index 0000000000..3efbfa6ef1 --- /dev/null +++ b/examples/multi_processor/README.md @@ -0,0 +1,14 @@ +# Multi Processor Trace Example + +In this example, the application in `main.cc` initializes and registers a tracer +provider from the [OpenTelemetry +SDK](https://github.com/open-telemetry/opentelemetry-cpp). The `TracerProvider` is connected +to two `processor-exporter` pipelines - for exporting simultaneously to `StdoutSpanExporter` +and `InMemorySpanExporter`. The application then +calls a `foo_library` which has been instrumented using the [OpenTelemetry +API](https://github.com/open-telemetry/opentelemetry-cpp/tree/main/api). +Resulting telemetry is directed to stdout through the StdoutSpanExporter, and saved as a +variable (vector of spans) through `InMemorySpanExporter`. + +See [CONTRIBUTING.md](../../CONTRIBUTING.md) for instructions on building and +running the example. diff --git a/examples/multi_processor/foo_library/foo_library.cc b/examples/multi_processor/foo_library/foo_library.cc new file mode 100644 index 0000000000..73077f135e --- /dev/null +++ b/examples/multi_processor/foo_library/foo_library.cc @@ -0,0 +1,42 @@ +#include "opentelemetry/trace/provider.h" + +namespace trace = opentelemetry::trace; +namespace nostd = opentelemetry::nostd; + +namespace +{ +nostd::shared_ptr get_tracer() +{ + auto provider = trace::Provider::GetTracerProvider(); + return provider->GetTracer("foo_library"); +} + +void f1() +{ + auto span = get_tracer()->StartSpan("f1"); + auto scope = get_tracer()->WithActiveSpan(span); + + span->End(); +} + +void f2() +{ + auto span = get_tracer()->StartSpan("f2"); + auto scope = get_tracer()->WithActiveSpan(span); + + f1(); + f1(); + + span->End(); +} +} // namespace + +void foo_library() +{ + auto span = get_tracer()->StartSpan("library"); + auto scope = get_tracer()->WithActiveSpan(span); + + f2(); + + span->End(); +} diff --git a/examples/multi_processor/foo_library/foo_library.h b/examples/multi_processor/foo_library/foo_library.h new file mode 100644 index 0000000000..7ac75c0e50 --- /dev/null +++ b/examples/multi_processor/foo_library/foo_library.h @@ -0,0 +1,3 @@ +#pragma once + +void foo_library(); diff --git a/examples/multi_processor/main.cc b/examples/multi_processor/main.cc new file mode 100644 index 0000000000..64e9098f9f --- /dev/null +++ b/examples/multi_processor/main.cc @@ -0,0 +1,79 @@ +#include "opentelemetry/sdk/trace/simple_processor.h" +#include "opentelemetry/sdk/trace/tracer_context.h" +#include "opentelemetry/sdk/trace/tracer_provider.h" +#include "opentelemetry/trace/provider.h" + +// Using an exporter that simply dumps span data to stdout. +#include "foo_library/foo_library.h" +#include "opentelemetry/exporters/memory/in_memory_span_exporter.h" +#include "opentelemetry/exporters/ostream/span_exporter.h" + +using opentelemetry::exporter::memory::InMemorySpanExporter; + +InMemorySpanExporter *memory_span_exporter; + +namespace +{ +void initTracer() +{ + auto exporter1 = std::unique_ptr( + new opentelemetry::exporter::trace::OStreamSpanExporter); + auto processor1 = std::unique_ptr( + new sdktrace::SimpleSpanProcessor(std::move(exporter1))); + + auto exporter2 = std::unique_ptr(new InMemorySpanExporter()); + + // fetch the exporter for dumping data later + memory_span_exporter = dynamic_cast(exporter2.get()); + + auto processor2 = std::unique_ptr( + new sdktrace::SimpleSpanProcessor(std::move(exporter2))); + + auto provider = nostd::shared_ptr( + new sdktrace::TracerProvider(std::move(processor1))); + provider->AddProcessor(std::move(processor2)); + // Set the global trace provider + opentelemetry::trace::Provider::SetTracerProvider(std::move(provider)); +} + +void dumpSpans(std::vector> &spans) +{ + char span_buf[opentelemetry::trace::SpanId::kSize * 2]; + char trace_buf[opentelemetry::trace::TraceId::kSize * 2]; + char parent_span_buf[opentelemetry::trace::SpanId::kSize * 2]; + std::cout << "\nSpans from memory :" << std::endl; + + for (auto &span : spans) + { + std::cout << "\n\tSpan: " << std::endl; + std::cout << "\t\tName: " << span->GetName() << std::endl; + span->GetSpanId().ToLowerBase16(span_buf); + span->GetTraceId().ToLowerBase16(trace_buf); + span->GetParentSpanId().ToLowerBase16(parent_span_buf); + std::cout << "\t\tTraceId: " << std::string(trace_buf, sizeof(trace_buf)) << std::endl; + std::cout << "\t\tSpanId: " << std::string(span_buf, sizeof(span_buf)) << std::endl; + std::cout << "\t\tParentSpanId: " << std::string(parent_span_buf, sizeof(parent_span_buf)) + << std::endl; + + std::cout << "\t\tDescription: " << span->GetDescription() << std::endl; + std::cout << "\t\tSpan kind:" + << static_cast::type>( + span->GetSpanKind()) + << std::endl; + std::cout << "\t\tSpan Status: " + << static_cast::type>( + span->GetStatus()) + << std::endl; + } +} +} // namespace + +int main() +{ + // Removing this line will leave the default noop TracerProvider in place. + initTracer(); + + foo_library(); + auto memory_spans = memory_span_exporter->GetData()->GetSpans(); + dumpSpans(memory_spans); +} diff --git a/ext/test/w3c_tracecontext_test/main.cc b/ext/test/w3c_tracecontext_test/main.cc index d82ad401e4..d19eaa1ee8 100644 --- a/ext/test/w3c_tracecontext_test/main.cc +++ b/ext/test/w3c_tracecontext_test/main.cc @@ -42,7 +42,9 @@ void initTracer() new opentelemetry::exporter::trace::OStreamSpanExporter); auto processor = std::unique_ptr( new sdktrace::SimpleSpanProcessor(std::move(exporter))); - auto context = std::make_shared(std::move(processor)); + std::vector> processors; + processors.push_back(std::move(processor)); + auto context = std::make_shared(std::move(processors)); auto provider = nostd::shared_ptr( new sdktrace::TracerProvider(context)); // Set the global trace provider diff --git a/ext/test/zpages/tracez_data_aggregator_test.cc b/ext/test/zpages/tracez_data_aggregator_test.cc index f6f5ba6d9f..f84058d298 100644 --- a/ext/test/zpages/tracez_data_aggregator_test.cc +++ b/ext/test/zpages/tracez_data_aggregator_test.cc @@ -36,8 +36,11 @@ class TracezDataAggregatorTest : public ::testing::Test { std::shared_ptr shared_data(new TracezSharedData()); auto resource = opentelemetry::sdk::resource::Resource::Create({}); - auto context = std::make_shared( - std::unique_ptr(new TracezSpanProcessor(shared_data)), resource); + std::unique_ptr processor(new TracezSpanProcessor(shared_data)); + std::vector> processors; + processors.push_back(std::move(processor)); + + auto context = std::make_shared(std::move(processors), resource); tracer = std::shared_ptr(new Tracer(context)); tracez_data_aggregator = std::unique_ptr( new TracezDataAggregator(shared_data, milliseconds(10))); diff --git a/ext/test/zpages/tracez_processor_test.cc b/ext/test/zpages/tracez_processor_test.cc index 7ea4063d60..208b67c96a 100644 --- a/ext/test/zpages/tracez_processor_test.cc +++ b/ext/test/zpages/tracez_processor_test.cc @@ -175,13 +175,16 @@ class TracezProcessor : public ::testing::Test protected: void SetUp() override { - shared_data = std::shared_ptr(new TracezSharedData()); - processor = std::shared_ptr(new TracezSpanProcessor(shared_data)); + shared_data = std::shared_ptr(new TracezSharedData()); + processor = std::shared_ptr(new TracezSpanProcessor(shared_data)); + std::unique_ptr processor2(new TracezSpanProcessor(shared_data)); + std::vector> processors; + processors.push_back(std::move(processor2)); auto resource = opentelemetry::sdk::resource::Resource::Create({}); + // Note: we make a *different* processor for the tracercontext. THis is because // all the tests use shared data, and we want to make sure this works correctly. - auto context = std::make_shared( - std::unique_ptr(new TracezSpanProcessor(shared_data)), resource); + auto context = std::make_shared(std::move(processors), resource); tracer = std::shared_ptr(new Tracer(context)); auto spans = shared_data->GetSpanSnapshot(); diff --git a/sdk/include/opentelemetry/sdk/trace/multi_recordable.h b/sdk/include/opentelemetry/sdk/trace/multi_recordable.h new file mode 100644 index 0000000000..925685a14f --- /dev/null +++ b/sdk/include/opentelemetry/sdk/trace/multi_recordable.h @@ -0,0 +1,150 @@ +#pragma once + +#include "opentelemetry/common/timestamp.h" +#include "opentelemetry/sdk/trace/processor.h" +#include "opentelemetry/sdk/trace/recordable.h" +#include "opentelemetry/version.h" + +#include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace trace +{ +namespace +{ +std::size_t MakeKey(const SpanProcessor &processor) +{ + return reinterpret_cast(&processor); +} + +} // namespace + +class MultiRecordable : public Recordable +{ +public: + void AddRecordable(const SpanProcessor &processor, + std::unique_ptr recordable) noexcept + { + recordables_[MakeKey(processor)] = std::move(recordable); + } + + const std::unique_ptr &GetRecordable(const SpanProcessor &processor) const noexcept + { + // TODO - return nullptr ref on failed lookup? + auto i = recordables_.find(MakeKey(processor)); + if (i != recordables_.end()) + { + return i->second; + } + static std::unique_ptr empty(nullptr); + return empty; + } + + std::unique_ptr ReleaseRecordable(const SpanProcessor &processor) noexcept + { + auto i = recordables_.find(MakeKey(processor)); + if (i != recordables_.end()) + { + std::unique_ptr result(i->second.release()); + recordables_.erase(MakeKey(processor)); + return result; + } + return std::unique_ptr(nullptr); + } + + void SetIdentity(const opentelemetry::trace::SpanContext &span_context, + opentelemetry::trace::SpanId parent_span_id) noexcept override + { + for (auto &recordable : recordables_) + { + recordable.second->SetIdentity(span_context, parent_span_id); + } + } + + virtual void SetAttribute(nostd::string_view key, + const opentelemetry::common::AttributeValue &value) noexcept override + { + for (auto &recordable : recordables_) + { + recordable.second->SetAttribute(key, value); + } + } + + virtual void AddEvent(nostd::string_view name, + opentelemetry::common::SystemTimestamp timestamp, + const opentelemetry::common::KeyValueIterable &attributes) noexcept override + { + + for (auto &recordable : recordables_) + { + recordable.second->AddEvent(name, timestamp, attributes); + } + } + + virtual void AddLink(const opentelemetry::trace::SpanContext &span_context, + const opentelemetry::common::KeyValueIterable &attributes) noexcept override + { + for (auto &recordable : recordables_) + { + recordable.second->AddLink(span_context, attributes); + } + } + + virtual void SetStatus(opentelemetry::trace::StatusCode code, + nostd::string_view description) noexcept override + { + for (auto &recordable : recordables_) + { + recordable.second->SetStatus(code, description); + } + } + + virtual void SetName(nostd::string_view name) noexcept override + { + for (auto &recordable : recordables_) + { + recordable.second->SetName(name); + } + } + + virtual void SetSpanKind(opentelemetry::trace::SpanKind span_kind) noexcept override + { + for (auto &recordable : recordables_) + { + recordable.second->SetSpanKind(span_kind); + } + } + + virtual void SetStartTime(opentelemetry::common::SystemTimestamp start_time) noexcept override + { + for (auto &recordable : recordables_) + { + recordable.second->SetStartTime(start_time); + } + } + + virtual void SetDuration(std::chrono::nanoseconds duration) noexcept override + { + for (auto &recordable : recordables_) + { + recordable.second->SetDuration(duration); + } + } + + void SetInstrumentationLibrary( + const InstrumentationLibrary &instrumentation_library) noexcept override + { + for (auto &recordable : recordables_) + { + recordable.second->SetInstrumentationLibrary(instrumentation_library); + } + } + +private: + std::map> recordables_; +}; +} // namespace trace +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE \ No newline at end of file diff --git a/sdk/include/opentelemetry/sdk/trace/multi_span_processor.h b/sdk/include/opentelemetry/sdk/trace/multi_span_processor.h new file mode 100644 index 0000000000..2e32657bfc --- /dev/null +++ b/sdk/include/opentelemetry/sdk/trace/multi_span_processor.h @@ -0,0 +1,184 @@ +#pragma once + +#include +#include + +#include "opentelemetry/sdk/trace/multi_recordable.h" +#include "opentelemetry/sdk/trace/processor.h" + +#include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace trace +{ + +/** Instantiation options. */ +struct MultiSpanProcessorOptions +{}; + +/** + * Span processor allow hooks for span start and end method invocations. + * + * Built-in span processors are responsible for batching and conversion of + * spans to exportable representation and passing batches to exporters. + */ +class MultiSpanProcessor : public SpanProcessor +{ +public: + MultiSpanProcessor(std::vector> &&processors) + : head_(nullptr), tail_(nullptr), count_(0) + { + for (auto &processor : processors) + { + AddProcessor(std::move(processor)); + } + } + + void AddProcessor(std::unique_ptr &&processor) + { + // Add preocessor to end of the list. + if (processor) + { + ProcessorNode *pNode = new ProcessorNode(std::move(processor), tail_); + if (count_ > 0) + { + tail_->next_ = pNode; + tail_ = pNode; + } + else + { + head_ = tail_ = pNode; + } + count_++; + } + } + + std::unique_ptr MakeRecordable() noexcept override + { + auto recordable = std::unique_ptr(new MultiRecordable); + auto multi_recordable = static_cast(recordable.get()); + ProcessorNode *node = head_; + while (node != nullptr) + { + auto processor = node->value_.get(); + multi_recordable->AddRecordable(*processor, processor->MakeRecordable()); + node = node->next_; + } + return recordable; + } + + virtual void OnStart(Recordable &span, + const opentelemetry::trace::SpanContext &parent_context) noexcept override + { + auto multi_recordable = static_cast(&span); + ProcessorNode *node = head_; + while (node != nullptr) + { + auto processor = node->value_.get(); + auto &recordable = multi_recordable->GetRecordable(*processor); + if (recordable != nullptr) + { + processor->OnStart(*recordable, parent_context); + } + node = node->next_; + } + } + + virtual void OnEnd(std::unique_ptr &&span) noexcept override + { + auto multi_recordable = static_cast(span.release()); + ProcessorNode *node = head_; + while (node != nullptr) + { + auto processor = node->value_.get(); + auto recordable = multi_recordable->ReleaseRecordable(*processor); + if (recordable != nullptr) + { + processor->OnEnd(std::move(recordable)); + } + node = node->next_; + } + delete multi_recordable; + } + + bool ForceFlush( + std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override + { + bool result = true; + ProcessorNode *node = head_; + while (node != nullptr) + { + auto processor = node->value_.get(); + result |= processor->ForceFlush(timeout); + node = node->next_; + } + return result; + } + + bool Shutdown( + std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override + { + bool result = true; + ProcessorNode *node = head_; + while (node != nullptr) + { + auto processor = node->value_.get(); + result |= processor->Shutdown(timeout); + node = node->next_; + } + return result; + } + + ~MultiSpanProcessor() + { + Shutdown(); + Cleanup(); + } + +private: + struct ProcessorNode + { + std::unique_ptr value_; + ProcessorNode *next_, *prev_; + ProcessorNode(std::unique_ptr &&value, + ProcessorNode *prev = nullptr, + ProcessorNode *next = nullptr) + : value_(std::move(value)), prev_(prev), next_(next) + {} + }; + + void Cleanup() + { + if (count_) + { + ProcessorNode *node = tail_; + while (node != nullptr) + { + if (node->next_ != nullptr) + { + delete node->next_; + node->next_ = nullptr; + } + if (node->prev_ != nullptr) + { + node = node->prev_; + } + else + { + delete node; + node = nullptr; + } + } + head_ = tail_ = nullptr; + count_ = 0; + } + } + + ProcessorNode *head_, *tail_; + size_t count_; +}; +} // namespace trace +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE \ No newline at end of file diff --git a/sdk/include/opentelemetry/sdk/trace/tracer.h b/sdk/include/opentelemetry/sdk/trace/tracer.h index 1e025ec89b..57a05d8ff5 100644 --- a/sdk/include/opentelemetry/sdk/trace/tracer.h +++ b/sdk/include/opentelemetry/sdk/trace/tracer.h @@ -38,8 +38,8 @@ class Tracer final : public trace_api::Tracer, public std::enable_shared_from_th void CloseWithMicroseconds(uint64_t timeout) noexcept override; - /** Returns the currently active span processor. */ - SpanProcessor &GetActiveProcessor() const noexcept { return context_->GetActiveProcessor(); } + /** Returns the configured span processor. */ + SpanProcessor &GetProcessor() noexcept { return context_->GetProcessor(); } /** Returns the configured Id generator */ IdGenerator &GetIdGenerator() const noexcept { return context_->GetIdGenerator(); } diff --git a/sdk/include/opentelemetry/sdk/trace/tracer_context.h b/sdk/include/opentelemetry/sdk/trace/tracer_context.h index 3d6c4faa44..0a953b8aa4 100644 --- a/sdk/include/opentelemetry/sdk/trace/tracer_context.h +++ b/sdk/include/opentelemetry/sdk/trace/tracer_context.h @@ -21,28 +21,31 @@ namespace trace * - A thread-safe class that allows updating/altering processor/exporter pipelines * and sampling config. * - The owner/destroyer of Processors/Exporters. These will remain active until - * this class is destroyed. I.e. Sampling, Exporting, flushing etc. are all ok if this - * object is alive, and they will work together. If this object is destroyed, then - * no shared references to Processor, Exporter, Recordable etc. should exist, and all + * this class is destroyed. I.e. Sampling, Exporting, flushing, Custom Iterator etc. are all ok + * if this object is alive, and they will work together. If this object is destroyed, then no shared + * references to Processor, Exporter, Recordable, Custom Iterator etc. should exist, and all * associated pipelines will have been flushed. */ class TracerContext { public: explicit TracerContext( - std::unique_ptr processor, + std::vector> &&processor, opentelemetry::sdk::resource::Resource resource = opentelemetry::sdk::resource::Resource::Create({}), std::unique_ptr sampler = std::unique_ptr(new AlwaysOnSampler), std::unique_ptr id_generator = std::unique_ptr(new RandomIdGenerator())) noexcept; + /** - * Attaches a span processor to this tracer context. - * + * Attaches a span processor to list of configured processors to this tracer context. + * Processor once attached can't be removed. * @param processor The new span processor for this tracer. This must not be * a nullptr. Ownership is given to the `TracerContext`. + * + * Note: This method is not thread safe. */ - void RegisterPipeline(std::unique_ptr processor) noexcept; + void AddProcessor(std::unique_ptr processor) noexcept; /** * Obtain the sampler associated with this tracer. @@ -51,12 +54,12 @@ class TracerContext Sampler &GetSampler() const noexcept; /** - * Obtain the (conceptual) active processor. + * Obtain the configured (composite) processor. * * Note: When more than one processor is active, this will * return an "aggregate" processor */ - SpanProcessor &GetActiveProcessor() const noexcept; + SpanProcessor &GetProcessor() const noexcept; /** * Obtain the resource associated with this tracer context. @@ -83,7 +86,7 @@ class TracerContext private: // This is an atomic pointer so we can adapt the processor pipeline dynamically. - opentelemetry::sdk::common::AtomicUniquePtr processor_; + std::unique_ptr processor_; opentelemetry::sdk::resource::Resource resource_; std::unique_ptr sampler_; std::unique_ptr id_generator_; diff --git a/sdk/include/opentelemetry/sdk/trace/tracer_provider.h b/sdk/include/opentelemetry/sdk/trace/tracer_provider.h index 6c1f8595fa..feb191f1e9 100644 --- a/sdk/include/opentelemetry/sdk/trace/tracer_provider.h +++ b/sdk/include/opentelemetry/sdk/trace/tracer_provider.h @@ -26,8 +26,11 @@ class TracerProvider final : public opentelemetry::trace::TracerProvider * Initialize a new tracer provider with a specified sampler * @param processor The span processor for this tracer provider. This must * not be a nullptr. + * @param resource The resources for this tracer provider. * @param sampler The sampler for this tracer provider. This must * not be a nullptr. + * @param id_generator The custom id generator for this tracer provider. This must + * not be a nullptr */ explicit TracerProvider( std::unique_ptr processor, @@ -38,6 +41,15 @@ class TracerProvider final : public opentelemetry::trace::TracerProvider std::unique_ptr( new RandomIdGenerator())) noexcept; + explicit TracerProvider( + std::vector> &&processors, + opentelemetry::sdk::resource::Resource resource = + opentelemetry::sdk::resource::Resource::Create({}), + std::unique_ptr sampler = std::unique_ptr(new AlwaysOnSampler), + std::unique_ptr id_generator = + std::unique_ptr( + new RandomIdGenerator())) noexcept; + /** * Initialize a new tracer provider with a specified context * @param context The shared tracer configuration/pipeline for this provider. @@ -49,13 +61,14 @@ class TracerProvider final : public opentelemetry::trace::TracerProvider nostd::string_view library_version = "") noexcept override; /** - * Attaches a span processor pipeline to this tracer provider. + * Attaches a span processor to list of configured processors for this tracer provider. * @param processor The new span processor for this tracer provider. This * must not be a nullptr. * * Note: This process may not receive any in-flight spans, but will get newly created spans. + * Note: This method is not thread safe, and should ideally be called from main thread. */ - void RegisterPipeline(std::unique_ptr processor) noexcept; + void AddProcessor(std::unique_ptr processor) noexcept; /** * Obtain the resource associated with this tracer provider. @@ -63,6 +76,12 @@ class TracerProvider final : public opentelemetry::trace::TracerProvider */ const opentelemetry::sdk::resource::Resource &GetResource() const noexcept; + /** + * Obtain the span processor associated with this tracer provider. + * @return The span processor for this tracer provider. + */ + std::shared_ptr GetProcessor() const noexcept; + /** * Shutdown the span processor associated with this tracer provider. */ diff --git a/sdk/src/trace/span.cc b/sdk/src/trace/span.cc index deb9487066..4ad453158f 100644 --- a/sdk/src/trace/span.cc +++ b/sdk/src/trace/span.cc @@ -50,7 +50,7 @@ Span::Span(std::shared_ptr &&tracer, const nostd::shared_ptr trace_state, const bool sampled) noexcept : tracer_{std::move(tracer)}, - recordable_{tracer_->GetActiveProcessor().MakeRecordable()}, + recordable_{tracer_->GetProcessor().MakeRecordable()}, start_steady_time{options.start_steady_time}, has_ended_{false} { @@ -104,7 +104,7 @@ Span::Span(std::shared_ptr &&tracer, start_steady_time = NowOr(options.start_steady_time); // recordable_->SetResource(tracer_->GetResoource()); TODO // recordable_->SetResource(tracer_->GetInstrumentationLibrary()); TODO - tracer_->GetActiveProcessor().OnStart(*recordable_, parent_span_context); + tracer_->GetProcessor().OnStart(*recordable_, parent_span_context); } Span::~Span() @@ -191,7 +191,7 @@ void Span::End(const trace_api::EndSpanOptions &options) noexcept recordable_->SetDuration(std::chrono::steady_clock::time_point(end_steady_time) - std::chrono::steady_clock::time_point(start_steady_time)); - tracer_->GetActiveProcessor().OnEnd(std::move(recordable_)); + tracer_->GetProcessor().OnEnd(std::move(recordable_)); recordable_.reset(); } diff --git a/sdk/src/trace/tracer_context.cc b/sdk/src/trace/tracer_context.cc index c7bf1934e1..96cb4a5fc9 100644 --- a/sdk/src/trace/tracer_context.cc +++ b/sdk/src/trace/tracer_context.cc @@ -1,5 +1,5 @@ #include "opentelemetry/sdk/trace/tracer_context.h" -#include "opentelemetry/sdk/trace/processor.h" +#include "opentelemetry/sdk/trace/multi_span_processor.h" OPENTELEMETRY_BEGIN_NAMESPACE namespace sdk @@ -7,11 +7,11 @@ namespace sdk namespace trace { -TracerContext::TracerContext(std::unique_ptr processor, +TracerContext::TracerContext(std::vector> &&processors, opentelemetry::sdk::resource::Resource resource, std::unique_ptr sampler, std::unique_ptr id_generator) noexcept - : processor_(std::move(processor)), + : processor_(std::unique_ptr(new MultiSpanProcessor(std::move(processors)))), resource_(resource), sampler_(std::move(sampler)), id_generator_(std::move(id_generator)) @@ -32,16 +32,14 @@ opentelemetry::sdk::trace::IdGenerator &TracerContext::GetIdGenerator() const no return *id_generator_; } -void TracerContext::RegisterPipeline(std::unique_ptr processor) noexcept +void TracerContext::AddProcessor(std::unique_ptr processor) noexcept { - // TODO(jsuereth): Implement - // 1. If existing processor is an "AggregateProcessor" append the new processor to it. - // 2. If the existing processor is NOT an "AggregateProcessor", create a new Aggregate of this and - // the other, - // then replace our atomic ptr with the new aggregate. + + auto multi_processor = static_cast(processor_.get()); + multi_processor->AddProcessor(std::move(processor)); } -SpanProcessor &TracerContext::GetActiveProcessor() const noexcept +SpanProcessor &TracerContext::GetProcessor() const noexcept { return *processor_; } diff --git a/sdk/src/trace/tracer_provider.cc b/sdk/src/trace/tracer_provider.cc index 8c15601722..25b0eb6747 100644 --- a/sdk/src/trace/tracer_provider.cc +++ b/sdk/src/trace/tracer_provider.cc @@ -13,11 +13,21 @@ TracerProvider::TracerProvider(std::unique_ptr processor, opentelemetry::sdk::resource::Resource resource, std::unique_ptr sampler, std::unique_ptr id_generator) noexcept - : TracerProvider(std::make_shared(std::move(processor), - resource, - std::move(sampler), - std::move(id_generator))) -{} +{ + std::vector> processors; + processors.push_back(std::move(processor)); + context_ = std::make_shared(std::move(processors), resource, std::move(sampler), + std::move(id_generator)); +} + +TracerProvider::TracerProvider(std::vector> &&processors, + opentelemetry::sdk::resource::Resource resource, + std::unique_ptr sampler, + std::unique_ptr id_generator) noexcept +{ + context_ = std::make_shared(std::move(processors), resource, std::move(sampler), + std::move(id_generator)); +} nostd::shared_ptr TracerProvider::GetTracer( nostd::string_view library_name, @@ -48,9 +58,9 @@ nostd::shared_ptr TracerProvider::GetTracer( return nostd::shared_ptr{tracers_.back()}; } -void TracerProvider::RegisterPipeline(std::unique_ptr processor) noexcept +void TracerProvider::AddProcessor(std::unique_ptr processor) noexcept { - return context_->RegisterPipeline(std::move(processor)); + context_->AddProcessor(std::move(processor)); } const opentelemetry::sdk::resource::Resource &TracerProvider::GetResource() const noexcept diff --git a/sdk/test/trace/sampler_benchmark.cc b/sdk/test/trace/sampler_benchmark.cc index 90a07adc77..6cfe67d1f7 100644 --- a/sdk/test/trace/sampler_benchmark.cc +++ b/sdk/test/trace/sampler_benchmark.cc @@ -118,10 +118,12 @@ BENCHMARK(BM_TraceIdRatioBasedSamplerShouldSample); void BenchmarkSpanCreation(std::shared_ptr sampler, benchmark::State &state) { std::unique_ptr exporter(new InMemorySpanExporter()); - auto processor = std::unique_ptr(new SimpleSpanProcessor(std::move(exporter))); - auto context = std::make_shared(std::move(processor)); - auto resource = opentelemetry::sdk::resource::Resource::Create({}); - auto tracer = std::shared_ptr(new Tracer(context)); + std::unique_ptr processor(new SimpleSpanProcessor(std::move(exporter))); + std::vector> processors; + processors.push_back(std::move(processor)); + auto context = std::make_shared(std::move(processors)); + auto resource = opentelemetry::sdk::resource::Resource::Create({}); + auto tracer = std::shared_ptr(new Tracer(context)); while (state.KeepRunning()) { diff --git a/sdk/test/trace/tracer_provider_test.cc b/sdk/test/trace/tracer_provider_test.cc index 64a8234880..4943346269 100644 --- a/sdk/test/trace/tracer_provider_test.cc +++ b/sdk/test/trace/tracer_provider_test.cc @@ -10,11 +10,14 @@ using namespace opentelemetry::sdk::trace; using namespace opentelemetry::sdk::resource; +#include + TEST(TracerProvider, GetTracer) { std::unique_ptr processor(new SimpleSpanProcessor(nullptr)); - - TracerProvider tp1(std::make_shared(std::move(processor), Resource::Create({}))); + std::vector> processors; + processors.push_back(std::move(processor)); + TracerProvider tp1(std::make_shared(std::move(processors), Resource::Create({}))); auto t1 = tp1.GetTracer("test"); auto t2 = tp1.GetTracer("test"); auto t3 = tp1.GetTracer("different", "1.0.0"); @@ -33,10 +36,13 @@ TEST(TracerProvider, GetTracer) auto sdkTracer1 = dynamic_cast(t1.get()); ASSERT_NE(nullptr, sdkTracer1); ASSERT_EQ("AlwaysOnSampler", sdkTracer1->GetSampler().GetDescription()); - TracerProvider tp2(std::make_shared( - std::unique_ptr(new SimpleSpanProcessor(nullptr)), Resource::Create({}), - std::unique_ptr(new AlwaysOffSampler()), - std::unique_ptr(new RandomIdGenerator))); + std::unique_ptr processor2(new SimpleSpanProcessor(nullptr)); + std::vector> processors2; + processors2.push_back(std::move(processor2)); + TracerProvider tp2( + std::make_shared(std::move(processors2), Resource::Create({}), + std::unique_ptr(new AlwaysOffSampler()), + std::unique_ptr(new RandomIdGenerator))); auto sdkTracer2 = dynamic_cast(tp2.GetTracer("test").get()); ASSERT_EQ("AlwaysOffSampler", sdkTracer2->GetSampler().GetDescription()); @@ -53,9 +59,11 @@ TEST(TracerProvider, GetTracer) TEST(TracerProvider, Shutdown) { - std::unique_ptr processor1(new SimpleSpanProcessor(nullptr)); + std::unique_ptr processor(new SimpleSpanProcessor(nullptr)); + std::vector> processors; + processors.push_back(std::move(processor)); - TracerProvider tp1(std::make_shared(std::move(processor1))); + TracerProvider tp1(std::make_shared(std::move(processors))); EXPECT_TRUE(tp1.Shutdown()); } diff --git a/sdk/test/trace/tracer_test.cc b/sdk/test/trace/tracer_test.cc index 3d1344a724..e5775b812b 100644 --- a/sdk/test/trace/tracer_test.cc +++ b/sdk/test/trace/tracer_test.cc @@ -68,7 +68,9 @@ namespace std::shared_ptr initTracer(std::unique_ptr &&exporter) { auto processor = std::unique_ptr(new SimpleSpanProcessor(std::move(exporter))); - auto context = std::make_shared(std::move(processor)); + std::vector> processors; + processors.push_back(std::move(processor)); + auto context = std::make_shared(std::move(processors)); return std::shared_ptr(new Tracer(context)); } @@ -79,12 +81,15 @@ std::shared_ptr initTracer( IdGenerator *id_generator = new RandomIdGenerator) { auto processor = std::unique_ptr(new SimpleSpanProcessor(std::move(exporter))); - auto resource = Resource::Create({}); - auto context = std::make_shared(std::move(processor), resource, + std::vector> processors; + processors.push_back(std::move(processor)); + auto resource = Resource::Create({}); + auto context = std::make_shared(std::move(processors), resource, std::unique_ptr(sampler), std::unique_ptr(id_generator)); return std::shared_ptr(new Tracer(context)); } + } // namespace TEST(Tracer, ToInMemorySpanExporter)