Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Otlp Retry Part2 - Introduce transmission handler #5367

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ protected BaseOtlpHttpExportClient(OtlpExporterOptions options, HttpClient httpC
/// <inheritdoc/>
public ExportClientResponse SendExportRequest(TRequest request, CancellationToken cancellationToken = default)
{
// `HttpClient.Timeout.TotalMilliseconds` would be populated with the correct timeout value for both the exporter configuration cases:
// 1. User provides their own HttpClient. This case is straightforward as the user wants to use their `HttpClient` and thereby the same client's timeout value.
// 2. If the user configures timeout via the exporter options, then the timeout set for the `HttpClient` initialized by the exporter will be set to user provided value.
DateTime deadline = DateTime.UtcNow.AddMilliseconds(this.HttpClient.Timeout.TotalMilliseconds);
try
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#if NET6_0_OR_GREATER
using System.Diagnostics.CodeAnalysis;
#endif
using System.Diagnostics.Tracing;
using OpenTelemetry.Internal;

Expand Down Expand Up @@ -33,6 +30,15 @@ public void ExportMethodException(Exception ex, bool isRetry = false)
}
}

[NonEvent]
public void TrySubmitRequestException(Exception ex)
{
if (Log.IsEnabled(EventLevel.Error, EventKeywords.All))
{
this.TrySubmitRequestException(ex.ToInvariantString());
}
}

[Event(2, Message = "Exporter failed send data to collector to {0} endpoint. Data will not be sent. Exception: {1}", Level = EventLevel.Error)]
public void FailedToReachCollector(string rawCollectorUri, string ex)
{
Expand All @@ -45,9 +51,6 @@ public void CouldNotTranslateActivity(string className, string methodName)
this.WriteEvent(3, className, methodName);
}

#if NET6_0_OR_GREATER
[UnconditionalSuppressMessage("ReflectionAnalysis", "IL2026:RequiresUnreferencedCode", Justification = "Parameters to this method are primitive and are trimmer safe.")]
#endif
[Event(4, Message = "Unknown error in export method. Message: '{0}'. IsRetry: {1}", Level = EventLevel.Error)]
public void ExportMethodException(string ex, bool isRetry)
{
Expand Down Expand Up @@ -83,4 +86,10 @@ public void InvalidEnvironmentVariable(string key, string value)
{
this.WriteEvent(11, key, value);
}

[Event(12, Message = "Unknown error in TrySubmitRequest method. Message: '{0}'", Level = EventLevel.Error)]
public void TrySubmitRequestException(string ex)
{
this.WriteEvent(12, ex);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#nullable enable

using System.Diagnostics;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;

internal class OtlpExporterTransmissionHandler<TRequest>
{
public OtlpExporterTransmissionHandler(IExportClient<TRequest> exportClient)
{
Guard.ThrowIfNull(exportClient);

this.ExportClient = exportClient;
}

protected IExportClient<TRequest> ExportClient { get; }

/// <summary>
/// Attempts to send an export request to the server.
/// </summary>
/// <param name="request">The request to send to the server.</param>
/// <returns> <see langword="true" /> if the request is sent successfully; otherwise, <see
/// langword="false" />.
/// </returns>
public bool TrySubmitRequest(TRequest request)
{
try
{
var response = this.ExportClient.SendExportRequest(request);
if (response.Success)
{
return true;
}

return this.OnSubmitRequestFailure(request, response);
}
catch (Exception ex)
{
OpenTelemetryProtocolExporterEventSource.Log.TrySubmitRequestException(ex);
return false;
}
}

/// <summary>
/// Attempts to shutdown the transmission handler, blocks the current thread
/// until shutdown completed or timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <see langword="true" /> if shutdown succeeded; otherwise, <see
/// langword="false" />.
/// </returns>
public bool Shutdown(int timeoutMilliseconds)
{
vishweshbankwar marked this conversation as resolved.
Show resolved Hide resolved
Guard.ThrowIfInvalidTimeout(timeoutMilliseconds);

var sw = timeoutMilliseconds == Timeout.Infinite ? null : Stopwatch.StartNew();

this.OnShutdown(timeoutMilliseconds);

if (sw != null)
{
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;

return this.ExportClient.Shutdown((int)Math.Max(timeout, 0));
}

return this.ExportClient.Shutdown(timeoutMilliseconds);
}

/// <summary>
/// Fired when the transmission handler is shutdown.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
protected virtual void OnShutdown(int timeoutMilliseconds)
{
}

/// <summary>
/// Fired when a request could not be submitted.
/// </summary>
/// <param name="request">The request that was attempted to send to the server.</param>
/// <param name="response"><see cref="ExportClientResponse" />.</param>
/// <returns><see langword="true" /> If the request is resubmitted and succeeds; otherwise, <see
/// langword="false" />.</returns>
protected virtual bool OnSubmitRequestFailure(TRequest request, ExportClientResponse response)
{
return false;
}

/// <summary>
/// Fired when resending a request to the server.
/// </summary>
/// <param name="request">The request to be resent to the server.</param>
/// <param name="response"><see cref="ExportClientResponse" />.</param>
/// <returns><see langword="true" /> If the retry succeeds; otherwise, <see
/// langword="false" />.</returns>
protected bool TryRetryRequest(TRequest request, out ExportClientResponse response)
{
response = this.ExportClient.SendExportRequest(request);
if (!response.Success)
{
OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(response.Exception, isRetry: true);
return false;
}

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#if NETSTANDARD2_1 || NET6_0_OR_GREATER
using Grpc.Net.Client;
#endif
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;
using LogOtlpCollector = OpenTelemetry.Proto.Collector.Logs.V1;
using MetricsOtlpCollector = OpenTelemetry.Proto.Collector.Metrics.V1;
using TraceOtlpCollector = OpenTelemetry.Proto.Collector.Trace.V1;
Expand Down Expand Up @@ -87,6 +88,15 @@ public static THeaders GetHeaders<THeaders>(this OtlpExporterOptions options, Ac
return headers;
}

public static OtlpExporterTransmissionHandler<TraceOtlpCollector.ExportTraceServiceRequest> GetTraceExportTransmissionHandler(this OtlpExporterOptions options)
=> new(GetTraceExportClient(options));

public static OtlpExporterTransmissionHandler<MetricsOtlpCollector.ExportMetricsServiceRequest> GetMetricsExportTransmissionHandler(this OtlpExporterOptions options)
=> new(GetMetricsExportClient(options));

public static OtlpExporterTransmissionHandler<LogOtlpCollector.ExportLogsServiceRequest> GetLogsExportTransmissionHandler(this OtlpExporterOptions options)
=> new(GetLogExportClient(options));

public static IExportClient<TraceOtlpCollector.ExportTraceServiceRequest> GetTraceExportClient(this OtlpExporterOptions options) =>
options.Protocol switch
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

using System.Diagnostics;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;
using OpenTelemetry.Internal;
using OpenTelemetry.Logs;
using OtlpCollector = OpenTelemetry.Proto.Collector.Logs.V1;
Expand All @@ -19,7 +19,7 @@ namespace OpenTelemetry.Exporter;
/// </summary>
public sealed class OtlpLogExporter : BaseExporter<LogRecord>
{
private readonly IExportClient<OtlpCollector.ExportLogsServiceRequest> exportClient;
private readonly OtlpExporterTransmissionHandler<OtlpCollector.ExportLogsServiceRequest> transmissionHandler;
private readonly OtlpLogRecordTransformer otlpLogRecordTransformer;

private OtlpResource.Resource? processResource;
Expand All @@ -29,7 +29,7 @@ public sealed class OtlpLogExporter : BaseExporter<LogRecord>
/// </summary>
/// <param name="options">Configuration options for the exporter.</param>
public OtlpLogExporter(OtlpExporterOptions options)
: this(options, sdkLimitOptions: new(), experimentalOptions: new(), exportClient: null)
: this(options, sdkLimitOptions: new(), experimentalOptions: new(), transmissionHandler: null)
{
}

Expand All @@ -39,12 +39,12 @@ public OtlpLogExporter(OtlpExporterOptions options)
/// <param name="exporterOptions">Configuration options for the exporter.</param>
/// <param name="sdkLimitOptions"><see cref="SdkLimitOptions"/>.</param>
/// <param name="experimentalOptions"><see cref="ExperimentalOptions"/>.</param>
/// <param name="exportClient">Client used for sending export request.</param>
/// <param name="transmissionHandler"><see cref="OtlpExporterTransmissionHandler{T}"/>.</param>
internal OtlpLogExporter(
OtlpExporterOptions exporterOptions,
SdkLimitOptions sdkLimitOptions,
ExperimentalOptions experimentalOptions,
IExportClient<OtlpCollector.ExportLogsServiceRequest>? exportClient = null)
OtlpExporterTransmissionHandler<OtlpCollector.ExportLogsServiceRequest>? transmissionHandler = null)
{
Debug.Assert(exporterOptions != null, "exporterOptions was null");
Debug.Assert(sdkLimitOptions != null, "sdkLimitOptions was null");
Expand All @@ -62,14 +62,7 @@ internal OtlpLogExporter(
OpenTelemetryProtocolExporterEventSource.Log.InvalidEnvironmentVariable(key, value);
};

if (exportClient != null)
{
this.exportClient = exportClient;
}
else
{
this.exportClient = exporterOptions!.GetLogExportClient();
}
this.transmissionHandler = transmissionHandler ?? exporterOptions.GetLogsExportTransmissionHandler();

this.otlpLogRecordTransformer = new OtlpLogRecordTransformer(sdkLimitOptions!, experimentalOptions!);
}
Expand All @@ -89,7 +82,7 @@ public override ExportResult Export(in Batch<LogRecord> logRecordBatch)
{
request = this.otlpLogRecordTransformer.BuildExportRequest(this.ProcessResource, logRecordBatch);

if (!this.exportClient.SendExportRequest(request).Success)
if (!this.transmissionHandler.TrySubmitRequest(request))
{
return ExportResult.Failure;
}
Expand All @@ -113,6 +106,6 @@ public override ExportResult Export(in Batch<LogRecord> logRecordBatch)
/// <inheritdoc />
protected override bool OnShutdown(int timeoutMilliseconds)
{
return this.exportClient?.Shutdown(timeoutMilliseconds) ?? true;
return this.transmissionHandler?.Shutdown(timeoutMilliseconds) ?? true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;
using OpenTelemetry.Internal;
using OpenTelemetry.Metrics;
using OtlpCollector = OpenTelemetry.Proto.Collector.Metrics.V1;
Expand All @@ -16,7 +16,7 @@ namespace OpenTelemetry.Exporter;
/// </summary>
public class OtlpMetricExporter : BaseExporter<Metric>
{
private readonly IExportClient<OtlpCollector.ExportMetricsServiceRequest> exportClient;
private readonly OtlpExporterTransmissionHandler<OtlpCollector.ExportMetricsServiceRequest> transmissionHandler;

private OtlpResource.Resource processResource;

Expand All @@ -25,16 +25,18 @@ public class OtlpMetricExporter : BaseExporter<Metric>
/// </summary>
/// <param name="options">Configuration options for the exporter.</param>
public OtlpMetricExporter(OtlpExporterOptions options)
: this(options, null)
: this(options, transmissionHandler: null)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="OtlpMetricExporter"/> class.
/// </summary>
/// <param name="options">Configuration options for the export.</param>
/// <param name="exportClient">Client used for sending export request.</param>
internal OtlpMetricExporter(OtlpExporterOptions options, IExportClient<OtlpCollector.ExportMetricsServiceRequest> exportClient = null)
/// <param name="transmissionHandler"><see cref="OtlpExporterTransmissionHandler{T}"/>.</param>
internal OtlpMetricExporter(
OtlpExporterOptions options,
OtlpExporterTransmissionHandler<OtlpCollector.ExportMetricsServiceRequest> transmissionHandler = null)
{
// Each of the Otlp exporters: Traces, Metrics, and Logs set the same value for `OtlpKeyValueTransformer.LogUnsupportedAttributeType`
// and `ConfigurationExtensions.LogInvalidEnvironmentVariable` so it should be fine even if these exporters are used together.
Expand All @@ -48,14 +50,7 @@ internal OtlpMetricExporter(OtlpExporterOptions options, IExportClient<OtlpColle
OpenTelemetryProtocolExporterEventSource.Log.InvalidEnvironmentVariable(key, value);
};

if (exportClient != null)
{
this.exportClient = exportClient;
}
else
{
this.exportClient = options.GetMetricsExportClient();
}
this.transmissionHandler = transmissionHandler ?? options.GetMetricsExportTransmissionHandler();
}

internal OtlpResource.Resource ProcessResource => this.processResource ??= this.ParentProvider.GetResource().ToOtlpResource();
Expand All @@ -72,7 +67,7 @@ public override ExportResult Export(in Batch<Metric> metrics)
{
request.AddMetrics(this.ProcessResource, metrics);

if (!this.exportClient.SendExportRequest(request).Success)
if (!this.transmissionHandler.TrySubmitRequest(request))
{
return ExportResult.Failure;
}
Expand All @@ -93,6 +88,6 @@ public override ExportResult Export(in Batch<Metric> metrics)
/// <inheritdoc />
protected override bool OnShutdown(int timeoutMilliseconds)
{
return this.exportClient?.Shutdown(timeoutMilliseconds) ?? true;
return this.transmissionHandler.Shutdown(timeoutMilliseconds);
}
}
Loading