Skip to content

Commit

Permalink
ZipkinExporter performance improvements. (#596)
Browse files Browse the repository at this point in the history
* ZipkinExporter performance improvements.

* Bug fixes.

* Added a Zipkin integration test.

* Bug fixes.

* Fixed HttpClientCollectorOptions not filtering HttpWebRequest events.

* Removed LangVersion from project file, it wasn't needed.

* Fixed unit tests failing when IPv6 isn't available on host.

Co-authored-by: Sergey Kanzhelev <S.Kanzhelev@live.com>
  • Loading branch information
CodeBlanch and SergeyKanzhelev authored Apr 14, 2020
1 parent a1ce4eb commit b35311e
Show file tree
Hide file tree
Showing 18 changed files with 513 additions and 312 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
// limitations under the License.
// </copyright>
using System;
using System.Net;
using System.Net.Http;
using OpenTelemetry.Collector.Dependencies.Implementation;
using OpenTelemetry.Context.Propagation;

namespace OpenTelemetry.Collector.Dependencies
Expand Down Expand Up @@ -63,12 +65,9 @@ private static bool DefaultFilter(string activityName, object arg1, object unuse
// TODO: there is some preliminary consensus that we should introduce 'terminal' spans or context.
// exporters should ensure they set it

if (activityName == "System.Net.Http.HttpRequestOut" &&
arg1 is HttpRequestMessage request &&
request.RequestUri != null &&
request.Method == HttpMethod.Post)
if (IsHttpOutgoingPostRequest(activityName, arg1, out Uri requestUri))
{
var originalString = request.RequestUri.OriginalString;
var originalString = requestUri.OriginalString;

// zipkin
if (originalString.Contains(":9411/api/v2/spans"))
Expand All @@ -89,5 +88,34 @@ arg1 is HttpRequestMessage request &&

return true;
}

private static bool IsHttpOutgoingPostRequest(string activityName, object arg1, out Uri requestUri)
{
if (activityName == "System.Net.Http.HttpRequestOut")
{
if (arg1 is HttpRequestMessage request &&
request.RequestUri != null &&
request.Method == HttpMethod.Post)
{
requestUri = request.RequestUri;
return true;
}
}
#if NET461
else if (activityName == HttpWebRequestDiagnosticSource.ActivityName)
{
if (arg1 is HttpWebRequest request &&
request.RequestUri != null &&
request.Method == "POST")
{
requestUri = request.RequestUri;
return true;
}
}
#endif

requestUri = null;
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using OpenTelemetry.Internal;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using OpenTelemetry.Trace.Export;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Internal;
using Thrift.Protocol;
using Thrift.Protocol.Entities;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Internal;
using Thrift.Protocol;
using Thrift.Protocol.Entities;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@
<NoWarn>$(NoWarn),1591</NoWarn>
</PropertyGroup>

<ItemGroup>
<Compile Include="..\OpenTelemetry\Internal\EnumerationHelper.cs" Link="Implementation\EnumerationHelper.cs" />
<Compile Include="..\OpenTelemetry\Internal\PooledList.cs" Link="Implementation\PooledList.cs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="ApacheThrift" Version="0.13.0.1" IncludeAssets="all" ExcludeAssets="compile;runtime" GeneratePathProperty="true" />
<PackageReference Include="System.Reflection.Emit.Lightweight" Version="4.7.0" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.3" Condition="'$(TargetFramework)' != 'netstandard2.1'" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,18 @@
// </copyright>
namespace OpenTelemetry.Exporter.Zipkin.Implementation
{
internal class ZipkinAnnotation
internal readonly struct ZipkinAnnotation
{
public long Timestamp { get; set; }
public ZipkinAnnotation(
long timestamp,
string value)
{
this.Timestamp = timestamp;
this.Value = value;
}

public string Value { get; set; }
public long Timestamp { get; }

public string Value { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using OpenTelemetry.Internal;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using OpenTelemetry.Trace.Export;
Expand All @@ -40,116 +41,100 @@ internal static class ZipkinConversionExtensions

private static readonly ConcurrentDictionary<string, ZipkinEndpoint> LocalEndpointCache = new ConcurrentDictionary<string, ZipkinEndpoint>();
private static readonly ConcurrentDictionary<string, ZipkinEndpoint> RemoteEndpointCache = new ConcurrentDictionary<string, ZipkinEndpoint>();
private static readonly ConcurrentDictionary<CanonicalCode, string> CanonicalCodeCache = new ConcurrentDictionary<CanonicalCode, string>();

private static readonly DictionaryEnumerator<string, object, AttributeEnumerationState>.ForEachDelegate ProcessAttributesRef = ProcessAttributes;
private static readonly DictionaryEnumerator<string, object, AttributeEnumerationState>.ForEachDelegate ProcessLibraryResourcesRef = ProcessLibraryResources;
private static readonly ListEnumerator<Event, PooledList<ZipkinAnnotation>>.ForEachDelegate ProcessEventsRef = ProcessEvents;

internal static ZipkinSpan ToZipkinSpan(this SpanData otelSpan, ZipkinEndpoint defaultLocalEndpoint, bool useShortTraceIds = false)
{
var context = otelSpan.Context;
var startTimestamp = ToEpochMicroseconds(otelSpan.StartTimestamp);
var endTimestamp = ToEpochMicroseconds(otelSpan.EndTimestamp);

var spanBuilder =
ZipkinSpan.NewBuilder()
.TraceId(EncodeTraceId(context.TraceId, useShortTraceIds))
.Id(EncodeSpanId(context.SpanId))
.Kind(ToSpanKind(otelSpan))
.Name(otelSpan.Name)
.Timestamp(ToEpochMicroseconds(otelSpan.StartTimestamp))
.Duration(endTimestamp - startTimestamp);

string parentId = null;
if (otelSpan.ParentSpanId != default)
{
spanBuilder.ParentId(EncodeSpanId(otelSpan.ParentSpanId));
parentId = EncodeSpanId(otelSpan.ParentSpanId);
}

Tuple<string, int> remoteEndpointServiceName = null;
foreach (var label in otelSpan.Attributes)
var attributeEnumerationState = new AttributeEnumerationState
{
string key = label.Key;
string strVal = label.Value.ToString();
Tags = PooledList<KeyValuePair<string, string>>.Create(),
};

if (strVal != null
&& RemoteEndpointServiceNameKeyResolutionDictionary.TryGetValue(key, out int priority)
&& (remoteEndpointServiceName == null || priority < remoteEndpointServiceName.Item2))
{
remoteEndpointServiceName = new Tuple<string, int>(strVal, priority);
}
DictionaryEnumerator<string, object, AttributeEnumerationState>.AllocationFreeForEach(otelSpan.Attributes, ref attributeEnumerationState, ProcessAttributesRef);
DictionaryEnumerator<string, object, AttributeEnumerationState>.AllocationFreeForEach(otelSpan.LibraryResource.Attributes, ref attributeEnumerationState, ProcessLibraryResourcesRef);

spanBuilder.PutTag(key, strVal);
}
var localEndpoint = defaultLocalEndpoint;

// See https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/data-resource-semantic-conventions.md
string serviceName = string.Empty;
string serviceNamespace = string.Empty;
foreach (var label in otelSpan.LibraryResource.Attributes)
{
string key = label.Key;
object val = label.Value;
string strVal = val as string;
var serviceName = attributeEnumerationState.ServiceName;

if (key == Resource.ServiceNameKey && strVal != null)
{
serviceName = strVal;
}
else if (key == Resource.ServiceNamespaceKey && strVal != null)
{
serviceNamespace = strVal;
}
else
// override default service name
if (!string.IsNullOrWhiteSpace(serviceName))
{
if (!string.IsNullOrWhiteSpace(attributeEnumerationState.ServiceNamespace))
{
spanBuilder.PutTag(key, strVal ?? val?.ToString());
serviceName = attributeEnumerationState.ServiceNamespace + "." + serviceName;
}
}

if (serviceNamespace != string.Empty)
{
serviceName = serviceNamespace + "." + serviceName;
}

var endpoint = defaultLocalEndpoint;

// override default service name
if (serviceName != string.Empty)
{
endpoint = LocalEndpointCache.GetOrAdd(serviceName, _ => new ZipkinEndpoint()
if (!LocalEndpointCache.TryGetValue(serviceName, out localEndpoint))
{
Ipv4 = defaultLocalEndpoint.Ipv4,
Ipv6 = defaultLocalEndpoint.Ipv6,
Port = defaultLocalEndpoint.Port,
ServiceName = serviceName,
});
localEndpoint = defaultLocalEndpoint.Clone(serviceName);
LocalEndpointCache.TryAdd(serviceName, localEndpoint);
}
}

spanBuilder.LocalEndpoint(endpoint);

if ((otelSpan.Kind == SpanKind.Client || otelSpan.Kind == SpanKind.Producer) && remoteEndpointServiceName != null)
ZipkinEndpoint remoteEndpoint = null;
if ((otelSpan.Kind == SpanKind.Client || otelSpan.Kind == SpanKind.Producer) && attributeEnumerationState.RemoteEndpointServiceName != null)
{
spanBuilder.RemoteEndpoint(RemoteEndpointCache.GetOrAdd(remoteEndpointServiceName.Item1, _ => new ZipkinEndpoint
{
ServiceName = remoteEndpointServiceName.Item1,
}));
remoteEndpoint = RemoteEndpointCache.GetOrAdd(attributeEnumerationState.RemoteEndpointServiceName, ZipkinEndpoint.Create);
}

var status = otelSpan.Status;

if (status.IsValid)
{
spanBuilder.PutTag(StatusCode, status.CanonicalCode.ToString());
if (!CanonicalCodeCache.TryGetValue(status.CanonicalCode, out string canonicalCode))
{
canonicalCode = status.CanonicalCode.ToString();
CanonicalCodeCache.TryAdd(status.CanonicalCode, canonicalCode);
}

PooledList<KeyValuePair<string, string>>.Add(ref attributeEnumerationState.Tags, new KeyValuePair<string, string>(StatusCode, canonicalCode));

if (status.Description != null)
{
spanBuilder.PutTag(StatusDescription, status.Description);
PooledList<KeyValuePair<string, string>>.Add(ref attributeEnumerationState.Tags, new KeyValuePair<string, string>(StatusDescription, status.Description));
}
}

foreach (var annotation in otelSpan.Events)
{
spanBuilder.AddAnnotation(ToEpochMicroseconds(annotation.Timestamp), annotation.Name);
}
var annotations = PooledList<ZipkinAnnotation>.Create();
ListEnumerator<Event, PooledList<ZipkinAnnotation>>.AllocationFreeForEach(otelSpan.Events, ref annotations, ProcessEventsRef);

return new ZipkinSpan(
EncodeTraceId(context.TraceId, useShortTraceIds),
parentId,
EncodeSpanId(context.SpanId),
ToSpanKind(otelSpan),
otelSpan.Name,
ToEpochMicroseconds(otelSpan.StartTimestamp),
duration: endTimestamp - startTimestamp,
localEndpoint,
remoteEndpoint,
annotations,
attributeEnumerationState.Tags,
null,
null);
}

return spanBuilder.Build();
internal static string EncodeSpanId(ActivitySpanId spanId)
{
return spanId.ToHexString();
}

private static long ToEpochMicroseconds(DateTimeOffset timestamp)
internal static long ToEpochMicroseconds(DateTimeOffset timestamp)
{
return timestamp.ToUnixTimeMilliseconds() * 1000;
}
Expand All @@ -166,23 +151,83 @@ private static string EncodeTraceId(ActivityTraceId traceId, bool useShortTraceI
return id;
}

private static string EncodeSpanId(ActivitySpanId spanId)
private static string ToSpanKind(SpanData otelSpan)
{
return spanId.ToHexString();
switch (otelSpan.Kind)
{
case SpanKind.Server:
return "SERVER";
case SpanKind.Producer:
return "PRODUCER";
case SpanKind.Consumer:
return "CONSUMER";
default:
return "CLIENT";
}
}

private static ZipkinSpanKind ToSpanKind(SpanData otelSpan)
private static bool ProcessEvents(ref PooledList<ZipkinAnnotation> annotations, Event @event)
{
if (otelSpan.Kind == SpanKind.Server)
PooledList<ZipkinAnnotation>.Add(ref annotations, new ZipkinAnnotation(ToEpochMicroseconds(@event.Timestamp), @event.Name));
return true;
}

private static bool ProcessAttributes(ref AttributeEnumerationState state, KeyValuePair<string, object> attribute)
{
string key = attribute.Key;
if (!(attribute.Value is string strVal))
{
return ZipkinSpanKind.SERVER;
strVal = attribute.Value?.ToString();
}
else if (otelSpan.Kind == SpanKind.Client)

if (strVal != null
&& RemoteEndpointServiceNameKeyResolutionDictionary.TryGetValue(key, out int priority)
&& (state.RemoteEndpointServiceName == null || priority < state.RemoteEndpointServiceNamePriority))
{
return ZipkinSpanKind.CLIENT;
state.RemoteEndpointServiceName = strVal;
state.RemoteEndpointServiceNamePriority = priority;
}

return ZipkinSpanKind.CLIENT;
PooledList<KeyValuePair<string, string>>.Add(ref state.Tags, new KeyValuePair<string, string>(key, strVal));

return true;
}

private static bool ProcessLibraryResources(ref AttributeEnumerationState state, KeyValuePair<string, object> label)
{
// See https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/data-resource-semantic-conventions.md

string key = label.Key;
object val = label.Value;
string strVal = val as string;

if (key == Resource.ServiceNameKey && strVal != null)
{
state.ServiceName = strVal;
}
else if (key == Resource.ServiceNamespaceKey && strVal != null)
{
state.ServiceNamespace = strVal;
}
else
{
PooledList<KeyValuePair<string, string>>.Add(ref state.Tags, new KeyValuePair<string, string>(key, strVal ?? val?.ToString()));
}

return true;
}

private struct AttributeEnumerationState
{
public PooledList<KeyValuePair<string, string>> Tags;

public string RemoteEndpointServiceName;

public int RemoteEndpointServiceNamePriority;

public string ServiceName;

public string ServiceNamespace;
}
}
}
Loading

0 comments on commit b35311e

Please sign in to comment.