Skip to content

Commit

Permalink
Review feedback adjustments.
Browse files Browse the repository at this point in the history
  • Loading branch information
MagnusSandgren committed Oct 17, 2024
1 parent bb8bfaa commit 9386a3c
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Reflection;
using System.Diagnostics;
using System.Reflection;
using FluentValidation;
using Microsoft.Extensions.Options;

Expand Down Expand Up @@ -33,7 +34,7 @@ public FluentValidationOptions(params Assembly[] assemblies)
_validators = AssemblyScanner
.FindValidatorsInAssemblies(assemblies, includeInternalTypes: true)
.Where(x => x.InterfaceType.GenericTypeArguments.First() == OptionType)
.Select(x => (IValidator<TOptions>)Activator.CreateInstance(x.ValidatorType, nonPublic: true)!)
.Select(x => (IValidator<TOptions>)Activator.CreateInstance(x.ValidatorType, nonPublic: true)! ?? throw new UnreachableException())
.ToList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,24 @@ public interface IInfrastructureBuilder
IServiceCollection Build();
}

internal sealed class InfrastructureBuilder(IServiceCollection services, IConfiguration configuration, IHostEnvironment environment) :
internal sealed class InfrastructureBuilder :
IInfrastructureBuilder,
IPubSubInfrastructureChoice,
ISubscriptionInfrastructureOptions
{
private readonly IServiceCollection _services = services ?? throw new ArgumentNullException(nameof(services));
private readonly IConfiguration _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
private readonly IHostEnvironment _environment = environment ?? throw new ArgumentNullException(nameof(environment));
private readonly IServiceCollection _services;
private readonly IConfiguration _configuration;
private readonly IHostEnvironment _environment;
private readonly List<Action<InfrastructureBuilderContext>> _actions = [];
private readonly List<Action<IBusRegistrationConfigurator>> _busConfigurations = [];

public InfrastructureBuilder(IServiceCollection services, IConfiguration configuration, IHostEnvironment environment)
{
_services = services ?? throw new ArgumentNullException(nameof(services));
_configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
_environment = environment ?? throw new ArgumentNullException(nameof(environment));
}

public IServiceCollection Build()
{
var infrastructureSettings = RegisterAndValidateInfrastructureSettings();
Expand Down Expand Up @@ -66,7 +73,7 @@ public ISubscriptionInfrastructureOptions WithPubSubCapabilities(params Assembly
{
_busConfigurations.Add(x => x
.AddConsumers(consumerAssemblies
.DefaultIfEmpty(Assembly.GetEntryAssembly())
.DefaultIfEmpty(Assembly.GetEntryAssembly() ?? throw new InvalidOperationException("Could not determine the entry assembly."))
.ToArray()));
return AddAction(context => AddPubSubCapabilities(context, _busConfigurations));
}
Expand All @@ -82,9 +89,9 @@ private InfrastructureBuilder AddAction(Action<InfrastructureBuilderContext> act

private InfrastructureSettings RegisterAndValidateInfrastructureSettings()
{
var infrastructureConfigurationSection = configuration
var infrastructureConfigurationSection = _configuration
.GetSection(InfrastructureSettings.ConfigurationSectionName);
services.AddOptions<InfrastructureSettings>()
_services.AddOptions<InfrastructureSettings>()
.Bind(infrastructureConfigurationSection)
.ValidateFluently();
var settings = infrastructureConfigurationSection.Get<InfrastructureSettings>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ internal static void AddInfrastructure_Internal(InfrastructureBuilderContext bui
.AddSingleton<INotificationProcessingContextFactory, NotificationProcessingContextFactory>()

// HttpClient
.AddHttpClients(configuration.GetSection(InfrastructureSettings.ConfigurationSectionName))
.AddHttpClients(infrastructureSettings)

// Decorators
.Decorate(typeof(INotificationHandler<>), typeof(IdempotentNotificationHandler<>));
Expand Down Expand Up @@ -251,11 +251,11 @@ internal static void AddPubCapabilities(InfrastructureBuilderContext builderCont
}

private static IServiceCollection AddHttpClients(this IServiceCollection services,
IConfigurationSection infrastructureConfigurationSection)
InfrastructureSettings infrastructureSettings)
{
services.
AddMaskinportenHttpClient<ICloudEventBus, AltinnEventsClient, SettingsJwkClientDefinition>(
infrastructureConfigurationSection,
infrastructureSettings,
x => x.ClientSettings.ExhangeToAltinnToken = true)
.ConfigureHttpClient((services, client) =>
{
Expand All @@ -270,7 +270,7 @@ private static IServiceCollection AddHttpClients(this IServiceCollection service
.AddPolicyHandlerFromRegistry(PollyPolicy.DefaultHttpRetryPolicy);

services.AddMaskinportenHttpClient<IPartyNameRegistry, PartyNameRegistryClient, SettingsJwkClientDefinition>(
infrastructureConfigurationSection,
infrastructureSettings,
x => x.ClientSettings.ExhangeToAltinnToken = true)
.ConfigureHttpClient((services, client) =>
{
Expand All @@ -281,7 +281,7 @@ private static IServiceCollection AddHttpClients(this IServiceCollection service
.AddPolicyHandlerFromRegistry(PollyPolicy.DefaultHttpRetryPolicy);

services.AddMaskinportenHttpClient<IAltinnAuthorization, AltinnAuthorizationClient, SettingsJwkClientDefinition>(
infrastructureConfigurationSection,
infrastructureSettings,
x => x.ClientSettings.ExhangeToAltinnToken = true)
.ConfigureHttpClient((services, client) =>
{
Expand Down Expand Up @@ -347,14 +347,13 @@ private static IServiceCollection ConfigureFusionCache(this IServiceCollection s

private static IHttpClientBuilder AddMaskinportenHttpClient<TClient, TImplementation, TClientDefinition>(
this IServiceCollection services,
IConfiguration configuration,
InfrastructureSettings infrastructureSettings,
Action<TClientDefinition>? configureClientDefinition = null)
where TClient : class
where TImplementation : class, TClient
where TClientDefinition : class, IClientDefinition
{
var settings = configuration.Get<InfrastructureSettings>();
services.RegisterMaskinportenClientDefinition<TClientDefinition>(typeof(TClient)!.FullName, settings!.Maskinporten);
services.RegisterMaskinportenClientDefinition<TClientDefinition>(typeof(TClient).FullName, infrastructureSettings.Maskinporten);
return services
.AddHttpClient<TClient, TImplementation>()
.AddMaskinportenHttpMessageHandler<TClientDefinition, TClient>(configureClientDefinition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public InfrastructureSettingsValidator(
IValidator<AltinnPlatformSettings> altinnPlatformSettingsValidator,
IValidator<AltinnCdnPlatformSettings> altinnCdnPlatformSettingsValidator,
IValidator<MaskinportenSettings> maskinportenSettingsValidator,
IValidator<RedisSettings> redisSettingsValidator)
IValidator<RedisSettings> redisSettingsValidator,
IValidator<MassTransitSettings> massTransitSettingsValidator)
{
RuleFor(x => x.DialogDbConnectionString)
.NotEmpty();
Expand All @@ -69,17 +70,23 @@ public InfrastructureSettingsValidator(
RuleFor(x => x.Redis)
.NotEmpty()
.SetValidator(redisSettingsValidator);

RuleFor(x => x.MassTransit)
.SetValidator(massTransitSettingsValidator);
}

// This is here to be able to use the validator without having access to the service provider.
private InfrastructureSettingsValidator() : this(
new AltinnPlatformSettingsValidator(),
new AltinnCdnPlatformSettingsValidator(),
new MaskinportenSettingsValidator(),
new RedisSettingsValidator())
new RedisSettingsValidator(),
new MassTransitSettingsValidator())
{ }
}

internal sealed class MassTransitSettingsValidator : AbstractValidator<MassTransitSettings>;

internal sealed class AltinnPlatformSettingsValidator : AbstractValidator<AltinnPlatformSettings>
{
public AltinnPlatformSettingsValidator()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Diagnostics.CodeAnalysis;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace Digdir.Domain.Dialogporten.Infrastructure.Persistence.IdempotentNotifications;

Expand All @@ -19,11 +20,17 @@ internal sealed class NotificationProcessingContext : INotificationProcessingCon
private readonly Action<Guid> _onDispose;
private readonly Guid _eventId;

private ILogger<NotificationProcessingContext>? _logger;
private DialogDbContext? _db;
private IServiceScope? _serviceScope;
private bool _acknowledged;

public NotificationProcessingContext(IServiceScopeFactory serviceScopeFactory, Guid eventId, Action<Guid> onDispose)
public bool Disposed { get; private set; }

public NotificationProcessingContext(
IServiceScopeFactory serviceScopeFactory,
Guid eventId,
Action<Guid> onDispose)
{
_serviceScopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory));
_onDispose = onDispose ?? throw new ArgumentNullException(nameof(onDispose));
Expand All @@ -32,7 +39,7 @@ public NotificationProcessingContext(IServiceScopeFactory serviceScopeFactory, G

public async Task Ack(CancellationToken cancellationToken = default)
{
EnsureInitialized();
EnsureAlive();
var existingAckInDatabase = _db.ChangeTracker
.Entries<NotificationAcknowledgement>()
.Any(x => x.Entity.EventId == _eventId && x.State
Expand All @@ -51,7 +58,7 @@ await _db.NotificationAcknowledgements

public async Task Nack(CancellationToken cancellationToken = default)
{
EnsureInitialized();
EnsureAlive();
if (!_acknowledged && _db.ChangeTracker.HasChanges())
{
await _db.SaveChangesAsync(cancellationToken);
Expand All @@ -60,7 +67,7 @@ public async Task Nack(CancellationToken cancellationToken = default)

public async Task AckHandler(string handlerName, CancellationToken cancellationToken = default)
{
EnsureInitialized();
EnsureAlive();
await _db.NotificationAcknowledgements.AddAsync(new()
{
EventId = _eventId,
Expand All @@ -70,7 +77,7 @@ await _db.NotificationAcknowledgements.AddAsync(new()

public Task<bool> HandlerIsAcked(string handlerName, CancellationToken cancellationToken = default)
{
EnsureInitialized();
EnsureAlive();
var acknowledged = _db.NotificationAcknowledgements
.Local
.Any(x => x.EventId == _eventId && x.NotificationHandler == handlerName);
Expand All @@ -81,14 +88,22 @@ public async ValueTask DisposeAsync()
{
if (!_acknowledged)
{
await Nack();
try
{
await Nack();
}
catch (Exception e)
{
_logger?.LogError(e, "Failed to save changes to database.");
}
}

_initializeLock.Dispose();
_serviceScope?.Dispose();
_serviceScope = null;
_db = null;
_onDispose(_eventId);
Disposed = true;
}

internal async Task Initialize(bool isFirstAttempt = false, CancellationToken cancellationToken = default)
Expand All @@ -104,6 +119,7 @@ internal async Task Initialize(bool isFirstAttempt = false, CancellationToken ca

_serviceScope = _serviceScopeFactory.CreateScope();
_db = _serviceScope.ServiceProvider.GetRequiredService<DialogDbContext>();
_logger = _serviceScope.ServiceProvider.GetRequiredService<ILogger<NotificationProcessingContext>>();
if (!isFirstAttempt)
{
await _db.NotificationAcknowledgements
Expand All @@ -117,10 +133,11 @@ await _db.NotificationAcknowledgements
}
}

[MemberNotNull(nameof(_db), nameof(_serviceScope))]
private void EnsureInitialized()
[MemberNotNull(nameof(_db), nameof(_serviceScope), nameof(_logger))]
private void EnsureAlive()
{
if (_db is null || _serviceScope is null)
ObjectDisposedException.ThrowIf(Disposed, this);
if (_db is null || _serviceScope is null || _logger is null)
{
throw new InvalidOperationException("Transaction not initialized.");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using Digdir.Domain.Dialogporten.Domain.Common.EventPublisher;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace Digdir.Domain.Dialogporten.Infrastructure.Persistence.IdempotentNotifications;

Expand All @@ -10,34 +13,97 @@ public interface INotificationProcessingContextFactory
INotificationProcessingContext GetExistingContext(Guid eventId);
}

internal sealed class NotificationProcessingContextFactory : INotificationProcessingContextFactory
internal sealed class NotificationProcessingContextFactory : INotificationProcessingContextFactory, IDisposable
{
private readonly ConcurrentDictionary<Guid, NotificationProcessingContext> _contextByEventId = new();
private readonly ConcurrentDictionary<Guid, WeakReference<NotificationProcessingContext>> _contextByEventId = new();
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly ILogger<NotificationProcessingContextFactory> _logger;
private readonly PeriodicTimer _cleanupTimer = new(TimeSpan.FromMinutes(10));
private readonly CancellationTokenSource _cleanupCts = new();

public NotificationProcessingContextFactory(IServiceScopeFactory serviceScopeFactory)
public NotificationProcessingContextFactory(IServiceScopeFactory serviceScopeFactory, ILogger<NotificationProcessingContextFactory> logger)
{
_serviceScopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
Task.Run(ContextHousekeeping);
}

public async Task<INotificationProcessingContext> CreateContext(
IDomainEvent domainEvent,
bool isFirstAttempt = false,
CancellationToken cancellationToken = default)
{
var transaction = _contextByEventId.GetOrAdd(
key: domainEvent.EventId,
valueFactory: eventId => new(_serviceScopeFactory, eventId, onDispose: RemoveTransaction));
await transaction.Initialize(isFirstAttempt, cancellationToken);
return transaction;
var transaction = GetOrAddContext(domainEvent.EventId);
try
{
await transaction.Initialize(isFirstAttempt, cancellationToken);
return transaction;
}
catch (Exception)
{
RemoveContext(domainEvent.EventId);
throw;
}
}

public INotificationProcessingContext GetExistingContext(Guid eventId)
{
return _contextByEventId.TryGetValue(eventId, out var transaction)
? transaction
return _contextByEventId.TryGetValue(eventId, out var weakContext)
&& TryGetLiveContext(weakContext, out var context)
? context
: throw new InvalidOperationException("Notification context not found.");
}

private void RemoveTransaction(Guid eventId) => _contextByEventId.TryRemove(eventId, out _);
public void Dispose()
{
_cleanupCts.Cancel();
_cleanupCts.Dispose();
_cleanupTimer.Dispose();
}

private NotificationProcessingContext GetOrAddContext(Guid eventId)
{
var weakContext = _contextByEventId.AddOrUpdate(eventId,
addValueFactory: eventId => new(new(_serviceScopeFactory, eventId, onDispose: RemoveContext)),
// Should the context, for whatever reason, be garbage collected or
// disposed but still remain in the dictionary, we should recreate it.
updateValueFactory: (eventId, old) => TryGetLiveContext(old, out _) ? old
: new(new(_serviceScopeFactory, eventId, onDispose: RemoveContext)));

return TryGetLiveContext(weakContext, out var context) ? context
: throw new UnreachableException("The context should be alive at this point in time.");
}

private void RemoveContext(Guid eventId) => _contextByEventId.TryRemove(eventId, out _);

private async Task ContextHousekeeping()
{
try
{
while (await _cleanupTimer.WaitForNextTickAsync(_cleanupCts.Token))
{
foreach (var key in _contextByEventId.Keys)
{
if (_contextByEventId.TryGetValue(key, out var weakContext)
&& !TryGetLiveContext(weakContext, out _))
{
RemoveContext(key);
}
}
}
}
catch (OperationCanceledException)
{
// Ignore
}
catch (Exception e)
{
_logger.LogWarning(e, "An unhandled exception occurred in the notification processing context cleanup task. This may lead to memory leaks.");
}
}

private static bool TryGetLiveContext(
WeakReference<NotificationProcessingContext> weakContext,
[NotNullWhen(true)] out NotificationProcessingContext? context)
=> weakContext.TryGetTarget(out context) && !context.Disposed;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public IdempotentNotificationHandler(INotificationHandler<TNotification> decorat

public async Task Handle(TNotification notification, CancellationToken cancellationToken)
{
var handlerName = _decorated.GetType().FullName!;
var handlerName = _decorated.GetType().FullName ?? throw new InvalidOperationException("Could not determine the handler name.");
var transaction = _processingContextFactory.GetExistingContext(notification.EventId);
if (await transaction.HandlerIsAcked(handlerName, cancellationToken))
{
Expand Down
Loading

0 comments on commit 9386a3c

Please sign in to comment.