Skip to content

Commit

Permalink
Rebasing on latest master and removing the need for OpenTelemetry.API…
Browse files Browse the repository at this point in the history
… dependency.
  • Loading branch information
stebet committed Mar 13, 2023
1 parent c39dfd3 commit 0768f20
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 22 deletions.
1 change: 0 additions & 1 deletion projects/RabbitMQ.Client/RabbitMQ.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.3" PrivateAssets="all" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
<PackageReference Include="MinVer" Version="4.3.0" PrivateAssets="all" />
<PackageReference Include="OpenTelemetry.Api" Version="1.3.1" />
<PackageReference Include="System.Memory" Version="4.5.5" />
<PackageReference Include="System.Threading.Channels" Version="7.0.0" />
<PackageReference Include="System.IO.Pipelines" Version="7.0.0" />
Expand Down
13 changes: 7 additions & 6 deletions projects/RabbitMQ.Client/client/impl/ModelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@
using System.Threading;
using System.Threading.Tasks;

using OpenTelemetry;
using OpenTelemetry.Context.Propagation;

using RabbitMQ.Client.client.framing;
using RabbitMQ.Client.client.impl;
using RabbitMQ.Client.ConsumerDispatching;
Expand All @@ -57,7 +54,6 @@ internal abstract class ModelBase : IModel, IRecoverable
///<summary>Only used to kick-start a connection open
///sequence. See <see cref="Connection.Open"/> </summary>
internal BlockingCell<ConnectionStartDetails> m_connectionStartCell;
private static readonly TextMapPropagator Propagator = new CompositeTextMapPropagator(new TextMapPropagator[] { new TraceContextPropagator(), new BaggagePropagator() });

private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue();
private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim(true);
Expand Down Expand Up @@ -912,8 +908,13 @@ public BasicGetResult BasicGet(string queue, bool autoAck)
public void BasicPublish<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
{
void InjectTraceContextIntoBasicProperties(IBasicProperties props, string key, string value)
void InjectTraceContextIntoBasicProperties(object propsObj, string key, string value)
{
if (propsObj is not IBasicProperties props)
{
return;
}

if (props.Headers == null)
{
props.Headers = new Dictionary<string, object>();
Expand Down Expand Up @@ -961,7 +962,7 @@ void InjectTraceContextIntoBasicProperties(IBasicProperties props, string key, s
}

// Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
Propagator.Inject(new PropagationContext(sendActivity.Context, Baggage.Current), props, InjectTraceContextIntoBasicProperties);
DistributedContextPropagator.Current.Inject(sendActivity, props, InjectTraceContextIntoBasicProperties);

ModelSend(in cmd, (BasicProperties)props, body);
return;
Expand Down
49 changes: 34 additions & 15 deletions projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@
using System.Threading;
using System.Threading.Tasks;

using OpenTelemetry;
using OpenTelemetry.Context.Propagation;

using RabbitMQ.Client.Events;
using RabbitMQ.Client.Framing.Impl;

Expand All @@ -18,7 +15,6 @@ namespace RabbitMQ.Client
internal class RabbitMQActivitySource
{
internal static ActivitySource source = new ActivitySource("RabbitMQ.Client", typeof(RabbitMQActivitySource).Assembly.GetCustomAttribute<AssemblyInformationalVersionAttribute>().InformationalVersion);
private static readonly TextMapPropagator Propagator = new CompositeTextMapPropagator(new TextMapPropagator[] { new TraceContextPropagator(), new BaggagePropagator() });

static RabbitMQActivitySource()
{
Expand Down Expand Up @@ -46,11 +42,20 @@ internal static Activity Receive(string routingKey, string exchange, ulong deliv
if (source.HasListeners())
{
// Extract the PropagationContext of the upstream parent from the message headers.
PropagationContext parentContext = Propagator.Extract(default, readOnlyBasicProperties, ExtractTraceContextFromBasicProperties);
Baggage.Current = parentContext.Baggage;
Activity activity = StartRabbitMQActivity($"{routingKey} receive", ActivityKind.Consumer, parentContext.ActivityContext);
DistributedContextPropagator.Current.ExtractTraceIdAndState(readOnlyBasicProperties, ExtractTraceIdAndState, out string traceId, out string traceState);
IEnumerable<KeyValuePair<string, string>> baggage = DistributedContextPropagator.Current.ExtractBaggage(readOnlyBasicProperties, ExtractTraceIdAndState);
ActivityContext.TryParse(traceId, traceState, out ActivityContext parentContext);
Activity activity = StartRabbitMQActivity($"{routingKey} receive", ActivityKind.Consumer, parentContext);
if (activity != null && activity.IsAllDataRequested)
{
if (baggage != null)
{
foreach (var item in baggage)
{
Activity.Current?.SetBaggage(item.Key, item.Value);
}
}

PopulateMessagingTags("receive", routingKey, exchange, deliveryTag, readOnlyBasicProperties, bodySize, activity);
}

Expand All @@ -65,12 +70,20 @@ internal static Activity Process(BasicDeliverEventArgs deliverEventArgs)
if (source.HasListeners())
{
// Extract the PropagationContext of the upstream parent from the message headers.
PropagationContext parentContext = Propagator.Extract(default, deliverEventArgs.BasicProperties, ExtractTraceContextFromBasicProperties);
Baggage.Current = parentContext.Baggage;

Activity activity = StartRabbitMQActivity($"{deliverEventArgs.RoutingKey} process", ActivityKind.Consumer, parentContext.ActivityContext);
DistributedContextPropagator.Current.ExtractTraceIdAndState(deliverEventArgs.BasicProperties, ExtractTraceIdAndState, out string traceId, out string traceState);
IEnumerable<KeyValuePair<string, string>> baggage = DistributedContextPropagator.Current.ExtractBaggage(deliverEventArgs.BasicProperties, ExtractTraceIdAndState);
ActivityContext.TryParse(traceId, traceState, out ActivityContext parentContext);
Activity activity = StartRabbitMQActivity($"{deliverEventArgs.RoutingKey} process", ActivityKind.Consumer, parentContext);
if (activity != null && activity.IsAllDataRequested)
{
if (baggage != null)
{
foreach (var item in baggage)
{
Activity.Current?.SetBaggage(item.Key, item.Value);
}
}

PopulateMessagingTags("process", deliverEventArgs.RoutingKey, deliverEventArgs.Exchange, deliverEventArgs.DeliveryTag, deliverEventArgs.BasicProperties, deliverEventArgs.Body.Length, activity);
}

Expand Down Expand Up @@ -123,12 +136,18 @@ private static void PopulateMessagingTags(string operation, string routingKey, s
}
}
}

static IEnumerable<string> ExtractTraceContextFromBasicProperties<T>(T props, string key) where T : IReadOnlyBasicProperties
private static void ExtractTraceIdAndState(object carrier, string name, out string value, out IEnumerable<string> values)
{
if (props.Headers.TryGetValue(key, out var value) && value is byte[] bytes)
if (carrier is IReadOnlyBasicProperties props && props.Headers is not null && props.Headers.TryGetValue(name, out object propsVal) && propsVal is byte[] bytes)
{
value = Encoding.UTF8.GetString(bytes);
values = default;
}
else
{
yield return Encoding.UTF8.GetString(bytes);
value = default;
values = default;
}
}
}
Expand Down

0 comments on commit 0768f20

Please sign in to comment.