Skip to content

Commit

Permalink
Add command and event payloads to activity
Browse files Browse the repository at this point in the history
By exposing these as custom properties, we allow listeners to perform additional operations on them without having to extend the message bus itself.
  • Loading branch information
kzu committed Jul 7, 2023
1 parent 0595fcc commit 01888e0
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 13 deletions.
10 changes: 5 additions & 5 deletions src/Merq.Core/MessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public bool CanHandle(IExecutable command) => canHandleMap.GetOrAdd(GetCommandTy
public void Execute(ICommand command)
{
var type = GetCommandType(command);
using var activity = StartActivity(type, Telemetry.Process);
using var activity = StartCommandActivity(type, command);

try
{
Expand Down Expand Up @@ -222,7 +222,7 @@ public void Execute(ICommand command)
public TResult Execute<TResult>(ICommand<TResult> command)
{
var type = GetCommandType(command);
using var activity = StartActivity(type, Telemetry.Process);
using var activity = StartCommandActivity(type, command);

try
{
Expand Down Expand Up @@ -253,7 +253,7 @@ public TResult Execute<TResult>(ICommand<TResult> command)
public Task ExecuteAsync(IAsyncCommand command, CancellationToken cancellation = default)
{
var type = GetCommandType(command);
using var activity = StartActivity(type, Telemetry.Process);
using var activity = StartCommandActivity(type, command);

try
{
Expand Down Expand Up @@ -286,7 +286,7 @@ public Task ExecuteAsync(IAsyncCommand command, CancellationToken cancellation =
public Task<TResult> ExecuteAsync<TResult>(IAsyncCommand<TResult> command, CancellationToken cancellation = default)
{
var type = GetCommandType(command);
using var activity = StartActivity(type, Telemetry.Process);
using var activity = StartCommandActivity(type, command);

try
{
Expand Down Expand Up @@ -315,7 +315,7 @@ public Task<TResult> ExecuteAsync<TResult>(IAsyncCommand<TResult> command, Cance
public void Notify<TEvent>(TEvent e)
{
var type = (e ?? throw new ArgumentNullException(nameof(e))).GetType();
using var activity = StartActivity(type, Publish);
using var activity = StartEventActivity(type, e);
var watch = Stopwatch.StartNew();

try
Expand Down
16 changes: 14 additions & 2 deletions src/Merq.Core/Telemetry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ static Telemetry()
// NOTE: this is not an entirely satisfactory way to tell events from commands apart.
public const string Process = nameof(Process);

public static Activity? StartActivity(Type type, string operation, [CallerMemberName] string? member = default, [CallerFilePath] string? file = default, [CallerLineNumber] int? line = default)
public static Activity? StartCommandActivity(Type type, object command) => StartActivity(type, Process, "Command", command);

public static Activity? StartEventActivity(Type type, object @event) => StartActivity(type, Publish, "Event", @event);

public static Activity? StartActivity(Type type, string operation, string? property = default, object? value = default,
[CallerMemberName] string? member = default, [CallerFilePath] string? file = default, [CallerLineNumber] int? line = default)
{
if (operation == Publish)
events.Add(1, new KeyValuePair<string, object?>("Name", type.FullName));
Expand All @@ -55,7 +60,7 @@ static Telemetry()
// Requirement is that the destination has low cardinality. In our case, the destination is
// the logical operation being performed, such as "Execute", "Notify" or "Deliver". The
// operation is actually the type being acted on (such as CreateUser -a command- or UserCreated -event).
return tracer.StartActivity(ActivityKind.Producer, name: $"{operation}/{type.FullName}")
var activity = tracer.CreateActivity($"{operation}/{type.FullName}", ActivityKind.Producer)
?.SetTag("code.function", member)
?.SetTag("code.filepath", file)
?.SetTag("code.lineno", line)
Expand All @@ -65,6 +70,13 @@ static Telemetry()
?.SetTag("messaging.operation", operation.ToLowerInvariant())
?.SetTag("messaging.protocol.name", type.Assembly.GetName().Name)
?.SetTag("messaging.protocol.version", type.Assembly.GetName().Version?.ToString() ?? "unknown");

if (property != null && value != null)
activity?.SetCustomProperty(property, value);

activity?.Start();

return activity;
}

public static void RecordException(this Activity? activity, Exception e)
Expand Down
57 changes: 57 additions & 0 deletions src/Merq.Tests/MessageBusSpec.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -443,5 +444,61 @@ public async Task when_executing_non_public_àsynccommand_result_then_invokes_ha
Assert.Equal(42, await bus.ExecuteAsync(new NonPublicAsyncCommandResult(), CancellationToken.None));
}

[Fact]
public void when_notifying_can_access_event_from_activity_stop()
{
object? e = default;
using var listener = new ActivityListener
{
ActivityStopped = activity => e = activity.GetCustomProperty("Event"),
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
ShouldListenTo = source => source.Name == "Merq",
};

ActivitySource.AddActivityListener(listener);

bus.Notify(new ConcreteEvent());

Assert.NotNull(e);
Assert.IsType<ConcreteEvent>(e);
}

[Fact]
public void when_executing_can_access_event_from_activity_stop()
{
object? c = default;
using var listener = new ActivityListener
{
ActivityStopped = activity => c = activity.GetCustomProperty("Command"),
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
ShouldListenTo = source => source.Name == "Merq",
};

ActivitySource.AddActivityListener(listener);

Assert.Throws<InvalidOperationException>(() => bus.Execute(new Command()));

Assert.NotNull(c);
}

[Fact]
public void when_execute_throws_activity_has_error_status()
{
Activity? activity = default;
using var listener = new ActivityListener
{
ActivityStarted = x => activity = x,
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
ShouldListenTo = source => source.Name == "Merq",
};

ActivitySource.AddActivityListener(listener);

Assert.Throws<InvalidOperationException>(() => bus.Execute(new Command()));

Assert.NotNull(activity);
Assert.Equal(ActivityStatusCode.Error, activity.Status);
}

class NestedEvent { }
}
14 changes: 8 additions & 6 deletions src/Samples/ConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@
var bus = services.GetRequiredService<IMessageBus>();

// .NET-style activity listening
//using var listener = new ActivityListener
//{
// ActivityStarted = activity => MarkupLine($"[red]Activity started: {activity.OperationName}[/]"),
// ActivityStopped = activity => MarkupLine($"[red]Activity stopped: {activity.OperationName}[/]"),
// ShouldListenTo = source => source.Name == "Merq.Core",
//};
using var listener = new ActivityListener
{
ActivityStarted = activity => MarkupLine($"[grey]Activity started: {activity.OperationName}[/]"),
ActivityStopped = activity => MarkupLine($"[grey]Activity stopped: {activity.OperationName}[/]"),
ShouldListenTo = source => source.Name == "Merq",
};

ActivitySource.AddActivityListener(listener);

// Setup OpenTelemetry: https://learn.microsoft.com/en-us/dotnet/core/diagnostics/distributed-tracing-instrumentation-walkthroughs
using var tracer = Sdk
Expand Down

0 comments on commit 01888e0

Please sign in to comment.