Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC] Better control of threads executed by opentelemetry-cpp #3175

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "opentelemetry/nostd/string_view.h"
#include "opentelemetry/nostd/variant.h"
#include "opentelemetry/sdk/common/exporter_utils.h"
#include "opentelemetry/sdk/common/thread_instrumentation.h"
#include "opentelemetry/version.h"

// forward declare google::protobuf::Message
Expand Down Expand Up @@ -83,28 +84,33 @@ struct OtlpHttpClientOptions
// User agent
std::string user_agent;

inline OtlpHttpClientOptions(nostd::string_view input_url,
bool input_ssl_insecure_skip_verify,
nostd::string_view input_ssl_ca_cert_path,
nostd::string_view input_ssl_ca_cert_string,
nostd::string_view input_ssl_client_key_path,
nostd::string_view input_ssl_client_key_string,
nostd::string_view input_ssl_client_cert_path,
nostd::string_view input_ssl_client_cert_string,
nostd::string_view input_ssl_min_tls,
nostd::string_view input_ssl_max_tls,
nostd::string_view input_ssl_cipher,
nostd::string_view input_ssl_cipher_suite,
HttpRequestContentType input_content_type,
JsonBytesMappingKind input_json_bytes_mapping,
nostd::string_view input_compression,
bool input_use_json_name,
bool input_console_debug,
std::chrono::system_clock::duration input_timeout,
const OtlpHeaders &input_http_headers,
std::size_t input_concurrent_sessions = 64,
std::size_t input_max_requests_per_connection = 8,
nostd::string_view input_user_agent = GetOtlpDefaultUserAgent())
std::shared_ptr<sdk::common::ThreadInstrumentation> thread_instrumentation =
std::shared_ptr<sdk::common::ThreadInstrumentation>(nullptr);

inline OtlpHttpClientOptions(
nostd::string_view input_url,
bool input_ssl_insecure_skip_verify,
nostd::string_view input_ssl_ca_cert_path,
nostd::string_view input_ssl_ca_cert_string,
nostd::string_view input_ssl_client_key_path,
nostd::string_view input_ssl_client_key_string,
nostd::string_view input_ssl_client_cert_path,
nostd::string_view input_ssl_client_cert_string,
nostd::string_view input_ssl_min_tls,
nostd::string_view input_ssl_max_tls,
nostd::string_view input_ssl_cipher,
nostd::string_view input_ssl_cipher_suite,
HttpRequestContentType input_content_type,
JsonBytesMappingKind input_json_bytes_mapping,
nostd::string_view input_compression,
bool input_use_json_name,
bool input_console_debug,
std::chrono::system_clock::duration input_timeout,
const OtlpHeaders &input_http_headers,
const std::shared_ptr<sdk::common::ThreadInstrumentation> &input_thread_instrumentation,
std::size_t input_concurrent_sessions = 64,
std::size_t input_max_requests_per_connection = 8,
nostd::string_view input_user_agent = GetOtlpDefaultUserAgent())
: url(input_url),
ssl_options(input_url,
input_ssl_insecure_skip_verify,
Expand All @@ -127,7 +133,8 @@ struct OtlpHttpClientOptions
http_headers(input_http_headers),
max_concurrent_requests(input_concurrent_sessions),
max_requests_per_connection(input_max_requests_per_connection),
user_agent(input_user_agent)
user_agent(input_user_agent),
thread_instrumentation(input_thread_instrumentation)
{}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
#pragma once

#include <chrono>
#include <memory>
#include <string>

#include "opentelemetry/exporters/otlp/otlp_environment.h"
#include "opentelemetry/exporters/otlp/otlp_http.h"
#include "opentelemetry/sdk/common/thread_instrumentation.h"
#include "opentelemetry/version.h"

OPENTELEMETRY_BEGIN_NAMESPACE
Expand Down Expand Up @@ -101,6 +103,9 @@ struct OPENTELEMETRY_EXPORT OtlpHttpExporterOptions

/** Compression type. */
std::string compression;

std::shared_ptr<sdk::common::ThreadInstrumentation> thread_instrumentation =
std::shared_ptr<sdk::common::ThreadInstrumentation>(nullptr);
};

} // namespace otlp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
#pragma once

#include <chrono>
#include <memory>
#include <string>

#include "opentelemetry/exporters/otlp/otlp_environment.h"
#include "opentelemetry/exporters/otlp/otlp_http.h"
#include "opentelemetry/sdk/common/thread_instrumentation.h"
#include "opentelemetry/version.h"

OPENTELEMETRY_BEGIN_NAMESPACE
Expand Down Expand Up @@ -101,6 +103,9 @@ struct OPENTELEMETRY_EXPORT OtlpHttpLogRecordExporterOptions

/** Compression type. */
std::string compression;

std::shared_ptr<sdk::common::ThreadInstrumentation> thread_instrumentation =
std::shared_ptr<sdk::common::ThreadInstrumentation>(nullptr);
};

} // namespace otlp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
#pragma once

#include <chrono>
#include <memory>
#include <string>

#include "opentelemetry/exporters/otlp/otlp_environment.h"
#include "opentelemetry/exporters/otlp/otlp_http.h"
#include "opentelemetry/exporters/otlp/otlp_preferred_temporality.h"
#include "opentelemetry/sdk/common/thread_instrumentation.h"
#include "opentelemetry/version.h"

OPENTELEMETRY_BEGIN_NAMESPACE
Expand Down Expand Up @@ -104,6 +106,9 @@ struct OPENTELEMETRY_EXPORT OtlpHttpMetricExporterOptions

/** Compression type. */
std::string compression;

std::shared_ptr<sdk::common::ThreadInstrumentation> thread_instrumentation =
std::shared_ptr<sdk::common::ThreadInstrumentation>(nullptr);
};

} // namespace otlp
Expand Down
2 changes: 1 addition & 1 deletion exporters/otlp/src/otlp_http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ void ConvertListFieldToJson(nlohmann::json &value,
OtlpHttpClient::OtlpHttpClient(OtlpHttpClientOptions &&options)
: is_shutdown_(false),
options_(options),
http_client_(http_client::HttpClientFactory::Create()),
http_client_(http_client::HttpClientFactory::Create(options.thread_instrumentation)),
start_session_counter_(0),
finished_session_counter_(0)
{
Expand Down
4 changes: 3 additions & 1 deletion exporters/otlp/src/otlp_http_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ OtlpHttpExporter::OtlpHttpExporter(const OtlpHttpExporterOptions &options)
options.use_json_name,
options.console_debug,
options.timeout,
options.http_headers
options.http_headers,
options.thread_instrumentation
#ifdef ENABLE_ASYNC_EXPORT
,
options.max_concurrent_requests,
Expand All @@ -77,6 +78,7 @@ OtlpHttpExporter::OtlpHttpExporter(std::unique_ptr<OtlpHttpClient> http_client)
options.console_debug = http_client_->GetOptions().console_debug;
options.timeout = http_client_->GetOptions().timeout;
options.http_headers = http_client_->GetOptions().http_headers;
options.thread_instrumentation = http_client_->GetOptions().thread_instrumentation;
#ifdef ENABLE_ASYNC_EXPORT
options.max_concurrent_requests = http_client_->GetOptions().max_concurrent_requests;
options.max_requests_per_connection = http_client_->GetOptions().max_requests_per_connection;
Expand Down
18 changes: 10 additions & 8 deletions exporters/otlp/src/otlp_http_log_record_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ OtlpHttpLogRecordExporter::OtlpHttpLogRecordExporter(
options.use_json_name,
options.console_debug,
options.timeout,
options.http_headers
options.http_headers,
options.thread_instrumentation
#ifdef ENABLE_ASYNC_EXPORT
,
options.max_concurrent_requests,
Expand All @@ -74,13 +75,14 @@ OtlpHttpLogRecordExporter::OtlpHttpLogRecordExporter(std::unique_ptr<OtlpHttpCli
{
OtlpHttpLogRecordExporterOptions &options =
const_cast<OtlpHttpLogRecordExporterOptions &>(options_);
options.url = http_client_->GetOptions().url;
options.content_type = http_client_->GetOptions().content_type;
options.json_bytes_mapping = http_client_->GetOptions().json_bytes_mapping;
options.use_json_name = http_client_->GetOptions().use_json_name;
options.console_debug = http_client_->GetOptions().console_debug;
options.timeout = http_client_->GetOptions().timeout;
options.http_headers = http_client_->GetOptions().http_headers;
options.url = http_client_->GetOptions().url;
options.content_type = http_client_->GetOptions().content_type;
options.json_bytes_mapping = http_client_->GetOptions().json_bytes_mapping;
options.use_json_name = http_client_->GetOptions().use_json_name;
options.console_debug = http_client_->GetOptions().console_debug;
options.timeout = http_client_->GetOptions().timeout;
options.http_headers = http_client_->GetOptions().http_headers;
options.thread_instrumentation = http_client_->GetOptions().thread_instrumentation;
#ifdef ENABLE_ASYNC_EXPORT
options.max_concurrent_requests = http_client_->GetOptions().max_concurrent_requests;
options.max_requests_per_connection = http_client_->GetOptions().max_requests_per_connection;
Expand Down
4 changes: 3 additions & 1 deletion exporters/otlp/src/otlp_http_metric_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ OtlpHttpMetricExporter::OtlpHttpMetricExporter(const OtlpHttpMetricExporterOptio
options.use_json_name,
options.console_debug,
options.timeout,
options.http_headers
options.http_headers,
options.thread_instrumentation
#ifdef ENABLE_ASYNC_EXPORT
,
options.max_concurrent_requests,
Expand All @@ -84,6 +85,7 @@ OtlpHttpMetricExporter::OtlpHttpMetricExporter(std::unique_ptr<OtlpHttpClient> h
options.console_debug = http_client_->GetOptions().console_debug;
options.timeout = http_client_->GetOptions().timeout;
options.http_headers = http_client_->GetOptions().http_headers;
options.thread_instrumentation = http_client_->GetOptions().thread_instrumentation;
#ifdef ENABLE_ASYNC_EXPORT
options.max_concurrent_requests = http_client_->GetOptions().max_concurrent_requests;
options.max_requests_per_connection = http_client_->GetOptions().max_requests_per_connection;
Expand Down
3 changes: 2 additions & 1 deletion exporters/otlp/test/otlp_http_exporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ static nostd::span<T, N> MakeSpan(T (&array)[N])
OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType content_type,
bool async_mode)
{
std::shared_ptr<opentelemetry::sdk::common::ThreadInstrumentation> not_instrumented;
OtlpHttpExporterOptions options;
options.content_type = content_type;
options.console_debug = true;
Expand All @@ -70,7 +71,7 @@ OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType content_t
"", /* ssl_cipher */
"", /* ssl_cipher_suite */
options.content_type, options.json_bytes_mapping, options.compression, options.use_json_name,
options.console_debug, options.timeout, options.http_headers);
options.console_debug, options.timeout, options.http_headers, not_instrumented);
if (!async_mode)
{
otlp_http_client_options.max_concurrent_requests = 0;
Expand Down
3 changes: 2 additions & 1 deletion exporters/otlp/test/otlp_http_log_record_exporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ static nostd::span<T, N> MakeSpan(T (&array)[N])
OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType content_type,
bool async_mode)
{
std::shared_ptr<opentelemetry::sdk::common::ThreadInstrumentation> not_instrumented;
OtlpHttpLogRecordExporterOptions options;
options.content_type = content_type;
options.console_debug = true;
Expand All @@ -69,7 +70,7 @@ OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType content_t
"", /* ssl_cipher */
"", /* ssl_cipher_suite */
options.content_type, options.json_bytes_mapping, options.compression, options.use_json_name,
options.console_debug, options.timeout, options.http_headers);
options.console_debug, options.timeout, options.http_headers, not_instrumented);
if (!async_mode)
{
otlp_http_client_options.max_concurrent_requests = 0;
Expand Down
4 changes: 3 additions & 1 deletion exporters/otlp/test/otlp_http_metric_exporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "opentelemetry/ext/http/client/http_client.h"
#include "opentelemetry/nostd/string_view.h"
#include "opentelemetry/sdk/common/exporter_utils.h"
#include "opentelemetry/sdk/common/thread_instrumentation.h"
#include "opentelemetry/sdk/instrumentationscope/instrumentation_scope.h"
#include "opentelemetry/sdk/metrics/data/metric_data.h"
#include "opentelemetry/sdk/metrics/data/point_data.h"
Expand Down Expand Up @@ -76,6 +77,7 @@ static IntegerType JsonToInteger(nlohmann::json value)
OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType content_type,
bool async_mode)
{
std::shared_ptr<opentelemetry::sdk::common::ThreadInstrumentation> not_instrumented;
OtlpHttpMetricExporterOptions options;
options.content_type = content_type;
options.console_debug = true;
Expand All @@ -91,7 +93,7 @@ OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType content_t
"", /* ssl_cipher */
"", /* ssl_cipher_suite */
options.content_type, options.json_bytes_mapping, options.compression, options.use_json_name,
options.console_debug, options.timeout, options.http_headers);
options.console_debug, options.timeout, options.http_headers, not_instrumented);
if (!async_mode)
{
otlp_http_client_options.max_concurrent_requests = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "opentelemetry/nostd/function_ref.h"
#include "opentelemetry/nostd/shared_ptr.h"
#include "opentelemetry/nostd/string_view.h"
#include "opentelemetry/sdk/common/thread_instrumentation.h"
#include "opentelemetry/version.h"

OPENTELEMETRY_BEGIN_NAMESPACE
Expand Down Expand Up @@ -304,6 +305,7 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient
public:
// The call (curl_global_init) is not thread safe. Ensure this is called only once.
HttpClient();
HttpClient(const std::shared_ptr<sdk::common::ThreadInstrumentation> &thread_instrumentation);
~HttpClient() override;

std::shared_ptr<opentelemetry::ext::http::client::Session> CreateSession(
Expand Down Expand Up @@ -366,6 +368,7 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient

std::mutex background_thread_m_;
std::unique_ptr<std::thread> background_thread_;
std::shared_ptr<sdk::common::ThreadInstrumentation> background_thread_instrumentation_;
std::chrono::milliseconds scheduled_delay_milliseconds_;

nostd::shared_ptr<HttpCurlGlobalInitializer> curl_global_initializer_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// SPDX-License-Identifier: Apache-2.0

#pragma once

#include "opentelemetry/ext/http/client/http_client.h"
#include "opentelemetry/sdk/common/thread_instrumentation.h"
#include "opentelemetry/version.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace ext
Expand All @@ -17,6 +20,8 @@ class HttpClientFactory
static std::shared_ptr<HttpClientSync> CreateSync();

static std::shared_ptr<HttpClient> Create();
static std::shared_ptr<HttpClient> Create(
const std::shared_ptr<sdk::common::ThreadInstrumentation> &thread_instrumentation);
};
} // namespace client
} // namespace http
Expand Down
22 changes: 22 additions & 0 deletions ext/src/http/client/curl/http_client_curl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "opentelemetry/ext/http/common/url_parser.h"
#include "opentelemetry/nostd/shared_ptr.h"
#include "opentelemetry/nostd/string_view.h"
#include "opentelemetry/sdk/common/thread_instrumentation.h"
#include "opentelemetry/version.h"

#ifdef ENABLE_OTLP_COMPRESSION_PREVIEW
Expand Down Expand Up @@ -188,6 +189,17 @@ HttpClient::HttpClient()
: multi_handle_(curl_multi_init()),
next_session_id_{0},
max_sessions_per_connection_{8},
background_thread_instrumentation_(nullptr),
scheduled_delay_milliseconds_{std::chrono::milliseconds(256)},
curl_global_initializer_(HttpCurlGlobalInitializer::GetInstance())
{}

HttpClient::HttpClient(
const std::shared_ptr<sdk::common::ThreadInstrumentation> &thread_instrumentation)
: multi_handle_(curl_multi_init()),
next_session_id_{0},
max_sessions_per_connection_{8},
background_thread_instrumentation_(thread_instrumentation),
scheduled_delay_milliseconds_{std::chrono::milliseconds(256)},
curl_global_initializer_(HttpCurlGlobalInitializer::GetInstance())
{}
Expand Down Expand Up @@ -345,6 +357,11 @@ void HttpClient::MaybeSpawnBackgroundThread()

background_thread_.reset(new std::thread(
[](HttpClient *self) {
if (self->background_thread_instrumentation_ != nullptr)
{
self->background_thread_instrumentation_->OnStart();
}

int still_running = 1;
while (true)
{
Expand Down Expand Up @@ -445,6 +462,11 @@ void HttpClient::MaybeSpawnBackgroundThread()
{
if (self->background_thread_)
{
if (self->background_thread_instrumentation_ != nullptr)
{
self->background_thread_instrumentation_->OnEnd();
self->background_thread_instrumentation_.reset();
}
self->background_thread_->detach();
self->background_thread_.reset();
}
Expand Down
7 changes: 7 additions & 0 deletions ext/src/http/client/curl/http_client_factory_curl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "opentelemetry/ext/http/client/curl/http_client_curl.h"
#include "opentelemetry/ext/http/client/http_client.h"
#include "opentelemetry/ext/http/client/http_client_factory.h"
#include "opentelemetry/sdk/common/thread_instrumentation.h"

namespace http_client = opentelemetry::ext::http::client;

Expand All @@ -14,6 +15,12 @@ std::shared_ptr<http_client::HttpClient> http_client::HttpClientFactory::Create(
return std::make_shared<http_client::curl::HttpClient>();
}

std::shared_ptr<http_client::HttpClient> http_client::HttpClientFactory::Create(
const std::shared_ptr<sdk::common::ThreadInstrumentation> &thread_instrumentation)
{
return std::make_shared<http_client::curl::HttpClient>(thread_instrumentation);
}

std::shared_ptr<http_client::HttpClientSync> http_client::HttpClientFactory::CreateSync()
{
return std::make_shared<http_client::curl::HttpClientSync>();
Expand Down
Loading
Loading