Skip to content

Commit

Permalink
Merge from main
Browse files Browse the repository at this point in the history
  • Loading branch information
stonkie committed Dec 10, 2024
2 parents aeb1639 + 145e7ad commit 04a3eee
Show file tree
Hide file tree
Showing 14 changed files with 94 additions and 106 deletions.
12 changes: 12 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,15 @@ updates:
interval: "daily"
labels:
- "infra"
- package-ecosystem: "dotnet-sdk"
directory: "/"
schedule:
interval: "weekly"
day: "wednesday"
labels:
- "infra"
ignore:
- dependency-name: "*"
update-types:
- "version-update:semver-major"
- "version-update:semver-minor"
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ internal static IOpenTelemetryBuilder UseOtlpExporter(
/// <para><see cref="IConfiguration"/> to bind onto <see cref="OtlpExporterBuilderOptions"/>.</para>
/// <para>Notes:
/// <list type="bullet">
/// <item docLink="true">See [TODO:Add doc link] for details on the configuration
/// schema.</item>
/// <item docLink="true"><see href="https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md"/>
/// for details on the configuration schema.</item>
/// <item>The <see cref="OtlpExporterBuilderOptions"/> instance will be
/// named "otlp" by default when calling this method.</item>
/// </list>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ internal sealed class ExperimentalOptions

public const string OtlpDiskRetryDirectoryPathEnvVar = "OTEL_DOTNET_EXPERIMENTAL_OTLP_DISK_RETRY_DIRECTORY_PATH";

public const string OtlpUseCustomSerializer = "OTEL_DOTNET_EXPERIMENTAL_USE_CUSTOM_PROTOBUF_SERIALIZER";

public const string EmitNoRecordedValueNeededDataPointsEnvVar = "OTEL_DOTNET_EXPERIMENTAL_OTLP_METRICS_EMIT_NO_RECORDED_VALUE";

public ExperimentalOptions()
Expand All @@ -33,11 +31,6 @@ public ExperimentalOptions(IConfiguration configuration)
this.EmitLogEventAttributes = emitLogEventAttributes;
}

if (configuration.TryGetBoolValue(OpenTelemetryProtocolExporterEventSource.Log, OtlpUseCustomSerializer, out var useCustomSerializer))
{
this.UseCustomProtobufSerializer = useCustomSerializer;
}

if (configuration.TryGetBoolValue(OpenTelemetryProtocolExporterEventSource.Log, EmitNoRecordedValueNeededDataPointsEnvVar, out var emitNoRecordedValueNeededDataPoints))
{
this.EmitNoRecordedValueNeededDataPoints = emitNoRecordedValueNeededDataPoints;
Expand Down Expand Up @@ -93,11 +86,6 @@ public ExperimentalOptions(IConfiguration configuration)
/// </summary>
public string? DiskRetryDirectoryPath { get; }

/// <summary>
/// Gets a value indicating whether custom serializer should be used for OTLP export.
/// </summary>
public bool UseCustomProtobufSerializer { get; }

/// <summary>
/// Gets a value indicating whether the NoRecordedValue measurement should be sent when metrics are removed,
/// e.g when disposing a Meter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ internal static class GrpcStatusDeserializer
TimeSpan.FromTicks(retryInfo.Value.RetryDelay.Value.Nanos / 100); // Convert nanos to ticks
}
}
catch
catch (Exception ex)
{
// TODO: Log exception to event source.
OpenTelemetryProtocolExporterEventSource.Log.GrpcRetryDelayParsingFailed(grpcStatusDetailsHeader, ex);
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ public void RequestTimedOut(Uri endpoint, Exception ex)
}
}

[NonEvent]
public void GrpcRetryDelayParsingFailed(string? grpcStatusDetailsHeader, Exception ex)
{
if (Log.IsEnabled(EventLevel.Warning, EventKeywords.All))
{
this.GrpcRetryDelayParsingFailed(grpcStatusDetailsHeader ?? "null", ex.ToInvariantString());
}
}

[Event(2, Message = "Exporter failed send data to collector to {0} endpoint. Data will not be sent. Exception: {1}", Level = EventLevel.Error)]
public void FailedToReachCollector(string rawCollectorUri, string ex)
{
Expand Down Expand Up @@ -205,6 +214,12 @@ public void ExportFailure(string endpoint, string message)
this.WriteEvent(23, endpoint, message);
}

[Event(24, Message = "Failed to parse gRPC retry delay from header grpcStatusDetailsHeader: '{0}'. Exception: {1}", Level = EventLevel.Warning)]
public void GrpcRetryDelayParsingFailed(string grpcStatusDetailsHeader, string exception)
{
this.WriteEvent(24, grpcStatusDetailsHeader, exception);
}

void IConfigurationExtensionsLogger.LogInvalidConfigurationValue(string key, string value)
{
this.InvalidConfigurationValue(key, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,32 @@ internal static int WriteLogsData(byte[] buffer, int writePosition, SdkLimitOpti
logRecords.Add(logRecord);
}

writePosition = WriteResourceLogs(buffer, writePosition, sdkLimitOptions, experimentalOptions, resource, ScopeLogsList);
writePosition = TryWriteResourceLogs(buffer, writePosition, sdkLimitOptions, experimentalOptions, resource, ScopeLogsList);
ProtobufSerializer.WriteReservedLength(buffer, logsDataLengthPosition, writePosition - (logsDataLengthPosition + ReserveSizeForLength));
ReturnLogRecordListToPool();

return writePosition;
}

internal static int TryWriteResourceLogs(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, ExperimentalOptions experimentalOptions, Resources.Resource? resource, Dictionary<string, List<LogRecord>> scopeLogs)
{
try
{
writePosition = WriteResourceLogs(buffer, writePosition, sdkLimitOptions, experimentalOptions, resource, scopeLogs);
}
catch (IndexOutOfRangeException)
{
if (!ProtobufSerializer.IncreaseBufferSize(ref buffer, OtlpSignalType.Logs))
{
throw;
}

return TryWriteResourceLogs(buffer, writePosition, sdkLimitOptions, experimentalOptions, resource, scopeLogs);
}

return writePosition;
}

internal static void ReturnLogRecordListToPool()
{
if (ScopeLogsList.Count != 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,32 @@ internal static int WriteMetricsData(
metrics.Add(metric);
}

writePosition = WriteResourceMetrics(buffer, writePosition, resource, ScopeMetricsList, emitNoRecordedValueNeededDataPoints);
writePosition = TryWriteResourceMetrics(buffer, writePosition, resource, ScopeMetricsList, emitNoRecordedValueNeededDataPoints);
ProtobufSerializer.WriteReservedLength(buffer, mericsDataLengthPosition, writePosition - (mericsDataLengthPosition + ReserveSizeForLength));
ReturnMetricListToPool();

return writePosition;
}

internal static int TryWriteResourceMetrics(byte[] buffer, int writePosition, Resources.Resource? resource, Dictionary<string, List<Metric>> scopeMetrics, bool emitNoRecordedValueNeededDataPoints)
{
try
{
writePosition = WriteResourceMetrics(buffer, writePosition, resource, scopeMetrics, emitNoRecordedValueNeededDataPoints);
}
catch (IndexOutOfRangeException)
{
if (!ProtobufSerializer.IncreaseBufferSize(ref buffer, OtlpSignalType.Metrics))
{
throw;
}

return TryWriteResourceMetrics(buffer, writePosition, resource, scopeMetrics, emitNoRecordedValueNeededDataPoints);
}

return writePosition;
}

private static void ReturnMetricListToPool()
{
if (ScopeMetricsList.Count != 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,6 @@ internal static int TryWriteResourceSpans(byte[] buffer, int writePosition, SdkL
// and avoids stack overflow.
return TryWriteResourceSpans(buffer, writePosition, sdkLimitOptions, resource);
}
catch
{
throw;
}

return writePosition;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ public static THeaders GetHeaders<THeaders>(this OtlpExporterOptions options, Ac
return headers;
}

public static OtlpExporterTransmissionHandler GetProtobufExportTransmissionHandler(this OtlpExporterOptions options, ExperimentalOptions experimentalOptions, OtlpSignalType otlpSignalType)
public static OtlpExporterTransmissionHandler GetExportTransmissionHandler(this OtlpExporterOptions options, ExperimentalOptions experimentalOptions, OtlpSignalType otlpSignalType)
{
var exportClient = GetProtobufExportClient(options, otlpSignalType);
var exportClient = GetExportClient(options, otlpSignalType);

// `HttpClient.Timeout.TotalMilliseconds` would be populated with the correct timeout value for both the exporter configuration cases:
// 1. User provides their own HttpClient. This case is straightforward as the user wants to use their `HttpClient` and thereby the same client's timeout value.
Expand Down Expand Up @@ -88,7 +88,7 @@ public static OtlpExporterTransmissionHandler GetProtobufExportTransmissionHandl
}
}

public static IExportClient GetProtobufExportClient(this OtlpExporterOptions options, OtlpSignalType otlpSignalType)
public static IExportClient GetExportClient(this OtlpExporterOptions options, OtlpSignalType otlpSignalType)
{
var httpClient = options.HttpClientFactory?.Invoke() ?? throw new InvalidOperationException("OtlpExporterOptions was missing HttpClientFactory or it returned null.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace OpenTelemetry.Exporter;
/// </summary>
public sealed class OtlpLogExporter : BaseExporter<LogRecord>
{
private const int GrpcStartWritePosition = 5;
private readonly SdkLimitOptions sdkLimitOptions;
private readonly ExperimentalOptions experimentalOptions;
private readonly OtlpExporterTransmissionHandler transmissionHandler;
Expand Down Expand Up @@ -57,8 +58,8 @@ internal OtlpLogExporter(

this.experimentalOptions = experimentalOptions!;
this.sdkLimitOptions = sdkLimitOptions!;
this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions!, OtlpSignalType.Logs);
this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? GrpcStartWritePosition : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetExportTransmissionHandler(experimentalOptions!, OtlpSignalType.Logs);
}

internal Resource Resource => this.resource ??= this.ParentProvider.GetResource();
Expand All @@ -73,14 +74,14 @@ public override ExportResult Export(in Batch<LogRecord> logRecordBatch)
{
int writePosition = ProtobufOtlpLogSerializer.WriteLogsData(this.buffer, this.startWritePosition, this.sdkLimitOptions, this.experimentalOptions, this.Resource, logRecordBatch);

if (this.startWritePosition == 5)
if (this.startWritePosition == GrpcStartWritePosition)
{
// Grpc payload consists of 3 parts
// byte 0 - Specifying if the payload is compressed.
// 1-4 byte - Specifies the length of payload in big endian format.
// 5 and above - Protobuf serialized data.
Span<byte> data = new Span<byte>(this.buffer, 1, 4);
var dataLength = writePosition - 5;
var dataLength = writePosition - GrpcStartWritePosition;
BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength);
}

Expand All @@ -89,13 +90,6 @@ public override ExportResult Export(in Batch<LogRecord> logRecordBatch)
return ExportResult.Failure;
}
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize())
{
throw;
}
}
catch (Exception ex)
{
OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(ex);
Expand All @@ -107,21 +101,4 @@ public override ExportResult Export(in Batch<LogRecord> logRecordBatch)

/// <inheritdoc />
protected override bool OnShutdown(int timeoutMilliseconds) => this.transmissionHandler?.Shutdown(timeoutMilliseconds) ?? true;

// TODO: Consider moving this to a shared utility class.
private bool IncreaseBufferSize()
{
var newBufferSize = this.buffer.Length * 2;

if (newBufferSize > 100 * 1024 * 1024)
{
return false;
}

var newBuffer = new byte[newBufferSize];
this.buffer.CopyTo(newBuffer, 0);
this.buffer = newBuffer;

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace OpenTelemetry.Exporter;
/// </summary>
public class OtlpMetricExporter : BaseExporter<Metric>
{
private const int GrpcStartWritePosition = 5;
private readonly OtlpExporterTransmissionHandler transmissionHandler;
private readonly int startWritePosition;

Expand Down Expand Up @@ -51,8 +52,8 @@ internal OtlpMetricExporter(
Debug.Assert(exporterOptions != null, "exporterOptions was null");
Debug.Assert(experimentalOptions != null, "experimentalOptions was null");

this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions!, OtlpSignalType.Metrics);
this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? GrpcStartWritePosition : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetExportTransmissionHandler(experimentalOptions!, OtlpSignalType.Metrics);
this.emitNoRecordedValueNeededDataPoints = experimentalOptions!.EmitNoRecordedValueNeededDataPoints;
}

Expand All @@ -68,14 +69,14 @@ public override ExportResult Export(in Batch<Metric> metrics)
{
int writePosition = ProtobufOtlpMetricSerializer.WriteMetricsData(this.buffer, this.startWritePosition, this.Resource, metrics, this.emitNoRecordedValueNeededDataPoints);

if (this.startWritePosition == 5)
if (this.startWritePosition == GrpcStartWritePosition)
{
// Grpc payload consists of 3 parts
// byte 0 - Specifying if the payload is compressed.
// 1-4 byte - Specifies the length of payload in big endian format.
// 5 and above - Protobuf serialized data.
Span<byte> data = new Span<byte>(this.buffer, 1, 4);
var dataLength = writePosition - 5;
var dataLength = writePosition - GrpcStartWritePosition;
BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength);
}

Expand All @@ -84,13 +85,6 @@ public override ExportResult Export(in Batch<Metric> metrics)
return ExportResult.Failure;
}
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize())
{
throw;
}
}
catch (Exception ex)
{
OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(ex);
Expand All @@ -102,21 +96,4 @@ public override ExportResult Export(in Batch<Metric> metrics)

/// <inheritdoc />
protected override bool OnShutdown(int timeoutMilliseconds) => this.transmissionHandler.Shutdown(timeoutMilliseconds);

// TODO: Consider moving this to a shared utility class.
private bool IncreaseBufferSize()
{
var newBufferSize = this.buffer.Length * 2;

if (newBufferSize > 100 * 1024 * 1024)
{
return false;
}

var newBuffer = new byte[newBufferSize];
this.buffer.CopyTo(newBuffer, 0);
this.buffer = newBuffer;

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace OpenTelemetry.Exporter;
/// </summary>
public class OtlpTraceExporter : BaseExporter<Activity>
{
private const int GrpcStartWritePosition = 5;
private readonly SdkLimitOptions sdkLimitOptions;
private readonly OtlpExporterTransmissionHandler transmissionHandler;
private readonly int startWritePosition;
Expand Down Expand Up @@ -53,8 +54,8 @@ internal OtlpTraceExporter(
Debug.Assert(sdkLimitOptions != null, "sdkLimitOptions was null");

this.sdkLimitOptions = sdkLimitOptions!;
this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions, OtlpSignalType.Traces);
this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? GrpcStartWritePosition : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetExportTransmissionHandler(experimentalOptions, OtlpSignalType.Traces);
}

internal Resource Resource => this.resource ??= this.ParentProvider.GetResource();
Expand All @@ -69,14 +70,14 @@ public override ExportResult Export(in Batch<Activity> activityBatch)
{
int writePosition = ProtobufOtlpTraceSerializer.WriteTraceData(this.buffer, this.startWritePosition, this.sdkLimitOptions, this.Resource, activityBatch);

if (this.startWritePosition == 5)
if (this.startWritePosition == GrpcStartWritePosition)
{
// Grpc payload consists of 3 parts
// byte 0 - Specifying if the payload is compressed.
// 1-4 byte - Specifies the length of payload in big endian format.
// 5 and above - Protobuf serialized data.
Span<byte> data = new Span<byte>(this.buffer, 1, 4);
var dataLength = writePosition - 5;
var dataLength = writePosition - GrpcStartWritePosition;
BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength);
}

Expand Down
Loading

0 comments on commit 04a3eee

Please sign in to comment.