Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IDP-902] feat(fix): address initial comments for #101 #114

1 change: 0 additions & 1 deletion src/Shared/DomainEventWrapperCollection.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Collections;
using System.Runtime.InteropServices.ComTypes;

namespace Workleap.DomainEventPropagation;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ internal sealed class EventPropagationClient : IEventPropagationClient
private readonly EventPropagationPublisherOptions _eventPropagationPublisherOptions;
private readonly DomainEventsHandlerDelegate _pipeline;
private readonly EventGridPublisherClient _eventGridPublisherClient;
private readonly EventGridClient _eventGridClient;
private readonly EventGridClient _eventGridNamespaceClient;

/// <summary>
/// To support Namespace topic, we need to use the following EventGridClient https://github.com/Azure/azure-sdk-for-net/blob/Azure.Messaging.EventGrid_4.17.0-beta.1/sdk/eventgrid/Azure.Messaging.EventGridV2/src/Generated/EventGridClient.cs
Expand All @@ -31,7 +31,7 @@ public EventPropagationClient(
this._eventPropagationPublisherOptions = eventPropagationPublisherOptions.Value;
this._pipeline = publishingDomainEventBehaviors.Reverse().Aggregate((DomainEventsHandlerDelegate)this.SendDomainEventsAsync, BuildPipeline);
this._eventGridPublisherClient = eventGridPublisherClientFactory.CreateClient(EventPropagationPublisherOptions.CustomTopicClientName);
this._eventGridClient = eventGridClientFactory.CreateClient(EventPropagationPublisherOptions.NamespaceTopicClientName);
this._eventGridNamespaceClient = eventGridClientFactory.CreateClient(EventPropagationPublisherOptions.NamespaceTopicClientName);
}

private static DomainEventsHandlerDelegate BuildPipeline(DomainEventsHandlerDelegate accumulator, IPublishingDomainEventBehavior next)
Expand Down Expand Up @@ -110,7 +110,7 @@ private Task SendCloudEvents(
return topicType switch
{
TopicType.Custom => this._eventGridPublisherClient.SendEventsAsync(cloudEvents, cancellationToken),
TopicType.Namespace => this._eventGridClient.PublishCloudEventsAsync(topicName, cloudEvents, cancellationToken),
TopicType.Namespace => this._eventGridNamespaceClient.PublishCloudEventsAsync(topicName, cloudEvents, cancellationToken),
_ => throw new NotSupportedException($"Topic type {topicType} is not supported"),
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ private void AddRegistrations(Action<EventPropagationPublisherOptions> configure
this.Services.TryAddEnumerable(ServiceDescriptor.Singleton<IValidateOptions<EventPropagationPublisherOptions>, EventPropagationPublisherOptionsValidator>());
this.Services.TryAddEnumerable(ServiceDescriptor.Singleton<IPublishingDomainEventBehavior, TracingPublishingDomainEventBehavior>());

this.Services.AddAzureClients(ConfigureEventPublisher);
this.Services.AddAzureClients(ConfigureEventPublisherClients);
}

private static void BindFromWellKnownConfigurationSection(EventPropagationPublisherOptions options, IConfiguration configuration)
{
configuration.GetSection(EventPropagationPublisherOptions.SectionName).Bind(options);
}

private static void ConfigureEventPublisher(AzureClientFactoryBuilder builder)
private static void ConfigureEventPublisherClients(AzureClientFactoryBuilder builder)
{
builder.AddClient<EventGridPublisherClient, EventGridPublisherClientOptions>(EventGridPublisherClientFactory)
.WithName(EventPropagationPublisherOptions.CustomTopicClientName)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System.Reflection;
using Azure.Messaging;
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -34,7 +33,7 @@ public async Task Given_IllFormedEvent_When_HandleCloudEventAsync_Then_ReturnsRe
var result = await handler.HandleCloudEventAsync(cloudEvent, CancellationToken.None);

// Then
result.Should().Be(EventProcessingStatus.Rejected);
Assert.Equal(EventProcessingStatus.Rejected, result);
}

[Fact]
Expand All @@ -50,7 +49,7 @@ public async Task Given_EventTypeNotRegistered_When_HandleCloudEventAsync_Then_R
var result = await handler.HandleCloudEventAsync(cloudEvent, CancellationToken.None);

// Then
result.Should().Be(EventProcessingStatus.Rejected);
Assert.Equal(EventProcessingStatus.Rejected, result);
}

[Fact]
Expand All @@ -69,7 +68,7 @@ public async Task Given_TypeWasRegisteredButNoHandler_When_HandleCloudEventAsync
var result = await handler.HandleCloudEventAsync(cloudEvent, CancellationToken.None);

// Then
result.Should().Be(EventProcessingStatus.Rejected);
Assert.Equal(EventProcessingStatus.Rejected, result);
}

[Fact]
Expand All @@ -92,7 +91,7 @@ public async Task Given_FailingEventHandler_When_HandleCloudEventAsync_Then_Retu
var result = await handler.HandleCloudEventAsync(cloudEvent, CancellationToken.None);

// Then
result.Should().Be(EventProcessingStatus.Released);
Assert.Equal(EventProcessingStatus.Released, result);
}

[Fact]
Expand All @@ -112,8 +111,9 @@ public async Task Given_EventHandler_When_HandleCloudEventAsync_Then_ReturnsHand
var result = await handler.HandleCloudEventAsync(cloudEvent, CancellationToken.None);

// Then
SampleEventTestHandler.ReceivedEvents.Should().Contain(e => e.Message == eventMessage);
result.Should().Be(EventProcessingStatus.Handled);
Assert.Single(SampleEventTestHandler.ReceivedEvents, e => e.Message == eventMessage);

Assert.Equal(EventProcessingStatus.Handled, result);
}

[Fact]
Expand All @@ -133,8 +133,9 @@ public async Task Given_EventHandlersFromAssembly_When_HandleCloudEventAsync_The
var result = await handler.HandleCloudEventAsync(cloudEvent, CancellationToken.None);

// Then
SampleEventTestHandler.ReceivedEvents.Should().Contain(e => e.Message == eventMessage);
result.Should().Be(EventProcessingStatus.Handled);
Assert.Single(SampleEventTestHandler.ReceivedEvents, e => e.Message == eventMessage);

Assert.Equal(EventProcessingStatus.Handled, result);
}

private static CloudEvent GivenSampleEvent(string message = "Hello World!")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
using AutoBogus;
using AutoBogus;
using Azure.Messaging;
using FakeItEasy;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Workleap.DomainEventPropagation.EventGridClientAdapter;
using Workleap.DomainEventPropagation.Subscription.PullDelivery.Tests.TestExtensions;

namespace Workleap.DomainEventPropagation.Subscription.PullDelivery.Tests;

// Test class cannot be made static
#pragma warning disable CA1052
public class EventPullerTests
#pragma warning restore CA1052
public abstract class EventPullerServiceTests
{
private static IEventGridClientAdapter GivenFakeClient(IEventGridClientWrapperFactory clientWrapperFactory, string subName)
{
Expand Down Expand Up @@ -47,17 +45,15 @@ private static void RegisterCloudEventsInClient(IEventGridClientAdapter client,
A<CancellationToken>._)).Returns(events);
}

private static async Task StartWaitAndStop(IHostedService puller)
private static async Task StartWaitAndStop(IHostedService pullerService)
{
// We need this to start on the thread pool otherwise it will just block the test
#pragma warning disable CS4014
Task.Run(() => puller.StartAsync(CancellationToken.None));
// We need this to start on the thread pool otherwise it will just block the test
Task.Run(() => pullerService.StartAsync(CancellationToken.None)).Forget();
await Task.Delay(50);
#pragma warning restore CS4014
await puller.StopAsync(CancellationToken.None);
await pullerService.StopAsync(CancellationToken.None);
}

public class TwoSubscribers : EventPullerTests
public class TwoSubscribers : EventPullerServiceTests
{
[Fact]
public async Task GivenPuller_WhenStarted_ThenEveryRegisteredClientWasCalled()
Expand All @@ -77,8 +73,8 @@ public async Task GivenPuller_WhenStarted_ThenEveryRegisteredClientWasCalled()
var options2 = GivenEventPropagationSubscriptionOptions(optionMonitor, clientName2);

// When
using var puller = new EventPuller(scopeFactory, eventGridClientDescriptors, clientFactory, optionMonitor, new NullLogger<EventPuller>());
await StartWaitAndStop(puller);
using var pullerService = new EventPullerService(scopeFactory, eventGridClientDescriptors, clientFactory, optionMonitor, new NullLogger<EventPullerService>());
await StartWaitAndStop(pullerService);

// Then
A.CallTo(() => fakeClient1
Expand Down Expand Up @@ -121,8 +117,8 @@ public async Task GivenFailingClient_WhenErrorOccured_ThenKeepsPollingAndDoesNot
RegisterCloudEventsInClient(functionalClient, options2, eventBundle);

// When
using var puller = new EventPuller(GivenScopeFactory(eventHandler), eventGridClientDescriptors, clientFactory, optionMonitor, new NullLogger<EventPuller>());
await StartWaitAndStop(puller);
using var pullerService = new EventPullerService(GivenScopeFactory(eventHandler), eventGridClientDescriptors, clientFactory, optionMonitor, new NullLogger<EventPullerService>());
await StartWaitAndStop(pullerService);

// Then
A.CallTo(() => failingClient
Expand All @@ -135,9 +131,9 @@ public async Task GivenFailingClient_WhenErrorOccured_ThenKeepsPollingAndDoesNot
}
}

public class OneSubscriber : EventPullerTests, IDisposable
public class OneSubscriber : EventPullerServiceTests, IDisposable
{
private readonly EventPuller _puller;
private readonly EventPullerService _pullerService;
private readonly IEventGridClientAdapter _client;
private readonly EventPropagationSubscriptionOptions _option;
private readonly ICloudEventHandler _eventHandler;
Expand All @@ -161,7 +157,7 @@ public OneSubscriber()

this._eventHandler = A.Fake<ICloudEventHandler>(opt => opt.Strict());

this._puller = new EventPuller(GivenScopeFactory(this._eventHandler), eventGridClientDescriptors, clientFactory, optionMonitor, new NullLogger<EventPuller>());
this._pullerService = new EventPullerService(GivenScopeFactory(this._eventHandler), eventGridClientDescriptors, clientFactory, optionMonitor, new NullLogger<EventPullerService>());
}

public void Dispose()
Expand All @@ -174,7 +170,7 @@ protected virtual void Dispose(bool disposing)
{
if (disposing)
{
this._puller.Dispose();
this._pullerService.Dispose();
}
}

Expand All @@ -188,7 +184,7 @@ public async Task GivenPuller_WhenMultipleEventsAreReceived_ThenEveryEventsAreHa
call2.Returns(EventProcessingStatus.Handled);

// When
await StartWaitAndStop(this._puller);
await StartWaitAndStop(this._pullerService);

// Then
call1.MustHaveHappenedOnceOrMore();
Expand All @@ -203,7 +199,7 @@ public async Task GivenPuller_WhenHandlerReturnsHandledStatus_ThenEventAreAcknow
.Returns(EventProcessingStatus.Handled);

// When
await StartWaitAndStop(this._puller);
await StartWaitAndStop(this._pullerService);

// Then
A.CallTo(() => this._client.AcknowledgeCloudEventAsync(
Expand All @@ -227,7 +223,7 @@ public async Task GivenPullerWithOneSub_WhenHandlerReturnsReleaseStatus_ThenEven
.Returns(EventProcessingStatus.Released);

// When
await StartWaitAndStop(this._puller);
await StartWaitAndStop(this._pullerService);

// Then
A.CallTo(() => this._client.ReleaseCloudEventAsync(
Expand All @@ -251,7 +247,7 @@ public async Task GivenPullerWithOneSub_WhenHandlerReturnsRejectedStatus_ThenEve
.Returns(EventProcessingStatus.Rejected);

// When
await StartWaitAndStop(this._puller);
await StartWaitAndStop(this._pullerService);

// Then
A.CallTo(() => this._client.RejectCloudEventAsync(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System.Reflection;
using Azure.Messaging.EventGrid.Namespaces;
using FakeItEasy;
using FluentAssertions;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -117,8 +116,9 @@ public void GivenNamedConfigurations_WhenResolveClientDescriptors_ThenListEveryR
var clientDescriptors = serviceProvider.GetRequiredService<IEnumerable<EventGridClientDescriptor>>().ToArray();

// Then
clientDescriptors.Should().HaveCount(2);
clientDescriptors.Select(d => d.Name).Should().BeEquivalentTo(sectionName1, sectionName2);
Assert.Equal(2, clientDescriptors.Length);

Assert.True(clientDescriptors.Select(d => d.Name).SequenceEqual([sectionName1, sectionName2]));
}

[Fact]
Expand All @@ -131,7 +131,7 @@ public void GivenNamedConfigurations_WhenResolveEventPuller_ThenCreatesEveryClie
GivenConfigurations(services, sectionName1, sectionName2);
var fakeClientFactory = A.Fake<IAzureClientFactory<EventGridClient>>();
services.Replace(new ServiceDescriptor(typeof(IAzureClientFactory<EventGridClient>), fakeClientFactory));
services.AddTransient<ILogger<EventPuller>, NullLogger<EventPuller>>();
services.AddTransient<ILogger<EventPullerService>, NullLogger<EventPullerService>>();
services.AddTransient<ILogger<ICloudEventHandler>, NullLogger<ICloudEventHandler>>();

// When
Expand All @@ -157,7 +157,7 @@ public void GivenNoSubscriber_WhenRegisterHandlerIndividually_ThenExceptionIsThr
.AddDomainEventHandler<SampleEvent, SampleEventTestHandler>();

// Then
act.Should().Throw<InvalidOperationException>();
Assert.Throws<InvalidOperationException>(act);
}

[Fact]
Expand All @@ -171,7 +171,7 @@ public void GivenNoSubscriber_WhenRegisterHandlersFromAssembly_ThenExceptionIsTh
.AddDomainEventHandlers(Assembly.GetAssembly(typeof(ServiceCollectionEventSubscriptionExtensionsTests))!);

// Then
act.Should().Throw<InvalidOperationException>();
Assert.Throws<InvalidOperationException>(act);
}

[Fact]
Expand All @@ -181,13 +181,13 @@ public void GivenMultipleHandlersForSameEvent_WhenRegistersThemIndividually_Then
var services = new ServiceCollection();

// When
var fct = () => services.AddPullDeliverySubscription()
var act = () => services.AddPullDeliverySubscription()
.AddTopicSubscription()
.AddDomainEventHandler<MisconfiguredTestAssembly.SampleEvent, MisconfiguredTestAssembly.SampleEventTestHandler>()
.AddDomainEventHandler<MisconfiguredTestAssembly.SampleEvent, MisconfiguredTestAssembly.AnotherSampleEventTestHandler>();

// Then
fct.Should().Throw<InvalidOperationException>();
Assert.Throws<InvalidOperationException>(act);
}

[Fact]
Expand All @@ -197,13 +197,13 @@ public void GivenMultipleHandlersForSameEvent_WhenIndividuallyRegisteredAndFromA
var services = new ServiceCollection();

// When
var fct = () => services.AddPullDeliverySubscription()
var act = () => services.AddPullDeliverySubscription()
.AddTopicSubscription()
.AddDomainEventHandler<SampleEvent, SampleEventTestHandler>()
.AddDomainEventHandlers(Assembly.GetAssembly(typeof(ServiceCollectionEventSubscriptionExtensionsTests))!);

// Then
fct.Should().Throw<InvalidOperationException>();
Assert.Throws<InvalidOperationException>(act);
}

[Fact]
Expand All @@ -217,7 +217,7 @@ public void GivenMultipleHandlersForSameEventInAssembly_WhenRegisterHandlers_The
.AddDomainEventHandlers(Assembly.GetAssembly(typeof(MisconfiguredTestAssembly.SampleEvent))!);

// Then
act.Should().Throw<InvalidOperationException>();
Assert.Throws<InvalidOperationException>(act);
}

private static void GivenConfigurations(IServiceCollection services, params string[] sections)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
namespace Workleap.DomainEventPropagation.Subscription.PullDelivery.Tests.TestExtensions;

internal static class TaskExtensions
{
/// <summary>
/// Observes the task to avoid the UnobservedTaskException event to be raised.
/// </summary>
/// <remarks>
/// Inspired from <see href="https://www.meziantou.net/fire-and-forget-a-task-in-dotnet.htm">this blog post</see>
/// </remarks>
public static void Forget(this Task task)
{
// note: this code is inspired by a tweet from Ben Adams: https://twitter.com/ben_a_adams/status/1045060828700037125
// Only care about tasks that may fault (not completed) or are faulted,
// so fast-path for SuccessfullyCompleted and Canceled tasks.
if (!task.IsCompleted || task.IsFaulted)
{
// use "_" (Discard operation) to remove the warning IDE0058: Because this call is not awaited, execution of the current method continues before the call is completed
// https://learn.microsoft.com/en-us/dotnet/csharp/fundamentals/functional/discards?WT.mc_id=DT-MVP-5003978#a-standalone-discard
_ = ForgetAwaited(task);
}

// Allocate the async/await state machine only when needed for performance reasons.
// More info about the state machine: https://blogs.msdn.microsoft.com/seteplia/2017/11/30/dissecting-the-async-methods-in-c/?WT.mc_id=DT-MVP-5003978
static async Task ForgetAwaited(Task task)
{
try
{
// No need to resume on the original SynchronizationContext, so use ConfigureAwait(false)
await task.ConfigureAwait(false);
}
catch
{
// Nothing to do here
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
<PackageReference Include="AutoBogus" Version="2.13.1" />
<PackageReference Include="Azure.Identity" Version="1.7.0" />
<PackageReference Include="FakeItEasy" Version="8.0.0" />
<PackageReference Include="FluentAssertions" Version="6.12.0" />
<PackageReference Include="GSoft.Extensions.Xunit" Version="1.0.1" />
<PackageReference Include="Microsoft.AspNetCore.Mvc.Testing" Version="6.0.20" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="6.0.1" />
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using Azure.Messaging.EventGrid.Namespaces;

namespace Workleap.DomainEventPropagation.EventGridClientAdapter;
namespace Workleap.DomainEventPropagation.EventGridClientAdapter;

internal interface IEventGridClientAdapter
{
Expand Down
Loading