Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Configurable Exponential Backoff in Pull Delivery #150

Merged
merged 5 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public async Task GivenTwoEventsReceived_WhenHandleThrowUnhandledException_ThenE
public async Task GivenMultipleEventsReceivedWithCustomRetryDelays_WhenHandleThrowUnhandledException_ThenEventsAreReleasedWithDelay()
{
// Given
var client = this.GivenClient(options: GenerateOptions(retryDelays: [1, 2, 4, 8]));
var client = this.GivenClient(options: GenerateOptions(retryDelays: [TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(4), TimeSpan.FromSeconds(8)]));
var events = this.GivenEventsForClient(client, GenerateEvent(), GenerateEvent(deliveryCount: 3), GenerateEvent(deliveryCount: 4), GenerateEvent(deliveryCount: 5));
this.GivenClientFailsHandlingEvents(client);

Expand Down Expand Up @@ -152,8 +152,8 @@ private EventPullerClient GivenClient(string clientName = "client", EventPropaga
A.CallTo(() => client.AcknowledgeCloudEventsAsync(A<string>._, A<string>._, A<IEnumerable<string>>._, A<CancellationToken>._))
.Invokes(x => this.OnEventCompleted(eventHandlingResults.AcknowledgedEvents, x.GetArgument<IEnumerable<string>>(2)!));

A.CallTo(() => client.ReleaseCloudEventsAsync(A<string>._, A<string>._, A<IEnumerable<string>>._, A<int>._, A<CancellationToken>._))
.Invokes(x => this.OnEventCompleted(eventHandlingResults.ReleasedEvents, x.GetArgument<IEnumerable<string>>(2)!.Select(y => (y, x.GetArgument<int>(3)))));
A.CallTo(() => client.ReleaseCloudEventsAsync(A<string>._, A<string>._, A<IEnumerable<string>>._, A<TimeSpan>._, A<CancellationToken>._))
.Invokes(x => this.OnEventCompleted(eventHandlingResults.ReleasedEvents, x.GetArgument<IEnumerable<string>>(2)!.Select(y => (y, x.GetArgument<TimeSpan>(3)))));

A.CallTo(() => client.RejectCloudEventsAsync(A<string>._, A<string>._, A<IEnumerable<string>>._, A<CancellationToken>._))
.Invokes(x => this.OnEventCompleted(eventHandlingResults.RejectedEvents, x.GetArgument<IEnumerable<string>>(2)!));
Expand Down Expand Up @@ -241,7 +241,7 @@ private void ThenClientReleasedEvents(EventPullerClient client, params EventBund
{
if (client.Options.RetryDelays == null)
{
Assert.Contains(client.EventHandlingResult.ReleasedEvents, x => x.LockToken == eventBundle.LockToken && x.ReleaseDelay == (int)Math.Pow(2, eventBundle.DeliveryCount - 1));
Assert.Contains(client.EventHandlingResult.ReleasedEvents, x => x.LockToken == eventBundle.LockToken && x.ReleaseDelay == TimeSpan.FromSeconds((int)Math.Pow(2, eventBundle.DeliveryCount - 1)));
}
else
{
Expand All @@ -256,7 +256,7 @@ private void ThenClientRejectedEvents(EventPullerClient client, params EventBund
Assert.True(events.All(x => client.EventHandlingResult.RejectedEvents.Contains(x.LockToken)));
}

private static EventPropagationSubscriptionOptions GenerateOptions(string id = "id", int[]? retryDelays = null)
private static EventPropagationSubscriptionOptions GenerateOptions(string id = "id", TimeSpan[]? retryDelays = null)
{
return new EventPropagationSubscriptionOptions
{
Expand Down Expand Up @@ -289,5 +289,5 @@ protected virtual void Dispose(bool disposing)

internal sealed record EventPullerClient(string Name, IEventGridClientAdapter Client, ICloudEventHandler EventHandler, EventPropagationSubscriptionOptions Options, EventHandlingResult EventHandlingResult);

internal sealed record EventHandlingResult(List<string> AcknowledgedEvents, List<(string LockToken, int ReleaseDelay)> ReleasedEvents, List<string> RejectedEvents);
internal sealed record EventHandlingResult(List<string> AcknowledgedEvents, List<(string LockToken, TimeSpan ReleaseDelay)> ReleasedEvents, List<string> RejectedEvents);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ public Task AcknowledgeCloudEventsAsync(string topicName, string eventSubscripti
return this._adaptee.AcknowledgeCloudEventsAsync(topicName, eventSubscriptionName, new AcknowledgeOptions(lockTokens), cancellationToken);
}

public Task ReleaseCloudEventsAsync(string topicName, string eventSubscriptionName, IEnumerable<string> lockTokens, int releaseDelayInSeconds, CancellationToken cancellationToken)
public Task ReleaseCloudEventsAsync(string topicName, string eventSubscriptionName, IEnumerable<string> lockTokens, TimeSpan releaseDelay, CancellationToken cancellationToken)
{
return this._adaptee.ReleaseCloudEventsAsync(topicName, eventSubscriptionName, new ReleaseOptions(lockTokens), releaseDelayInSeconds, cancellationToken);
return this._adaptee.ReleaseCloudEventsAsync(topicName, eventSubscriptionName, new ReleaseOptions(lockTokens), releaseDelay.Seconds, cancellationToken);
}

public Task RejectCloudEventsAsync(string topicName, string eventSubscriptionName, IEnumerable<string> lockTokens, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ internal interface IEventGridClientAdapter

Task AcknowledgeCloudEventsAsync(string topicName, string eventSubscriptionName, IEnumerable<string> lockTokens, CancellationToken cancellationToken);

Task ReleaseCloudEventsAsync(string topicName, string eventSubscriptionName, IEnumerable<string> lockTokens, int releaseDelayInSeconds, CancellationToken cancellationToken);
Task ReleaseCloudEventsAsync(string topicName, string eventSubscriptionName, IEnumerable<string> lockTokens, TimeSpan releaseDelay, CancellationToken cancellationToken);

Task RejectCloudEventsAsync(string topicName, string eventSubscriptionName, IEnumerable<string> lockTokens, CancellationToken cancellationToken);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ public class EventPropagationSubscriptionOptions

public int MaxDegreeOfParallelism { get; set; } = 1;

public IReadOnlyCollection<int>? RetryDelays { get; set; }
public IReadOnlyCollection<TimeSpan>? RetryDelays { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ private class EventGridSubscriptionEventPuller
private readonly ILogger<EventPullerService> _logger;
private readonly EventGridTopicSubscription _eventGridTopicSubscription;

private readonly Dictionary<int, TimeSpan>? _retryDelays;
private readonly TimeSpan? _defaultDelay;

private readonly Channel<EventBundle> _acknowledgeEventChannel = Channel.CreateBounded<EventBundle>(OutputChannelSize);
private readonly Channel<EventBundle> _releaseEventChannel = Channel.CreateBounded<EventBundle>(OutputChannelSize);
private readonly Channel<EventBundle> _rejectEventChannel = Channel.CreateBounded<EventBundle>(OutputChannelSize);
Expand All @@ -63,6 +66,15 @@ public EventGridSubscriptionEventPuller(
this._serviceScopeFactory = serviceScopeFactory;
this._logger = logger;
this._eventGridTopicSubscription = eventGridTopicSubscription;

if (eventGridTopicSubscription.RetryDelays != null)
{
this._retryDelays = eventGridTopicSubscription.RetryDelays
meziantou marked this conversation as resolved.
Show resolved Hide resolved
.Select((x, index) => new { DeliveryCount = index + 1, Delay = x })
.ToDictionary(x => x.DeliveryCount, x => x.Delay);

this._defaultDelay = eventGridTopicSubscription.RetryDelays.Last();
Le-Merle marked this conversation as resolved.
Show resolved Hide resolved
}
}

public Task StartReceivingEventsAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -212,16 +224,9 @@ private ValueTask WaitForAvailableHandler(CancellationToken cancellationToken)
}
}

private int GetReleaseDelay(int deliveryCount)
private TimeSpan GetReleaseDelay(int deliveryCount)
{
if (this._eventGridTopicSubscription.RetryDelays == null)
{
return (int)Math.Min(Math.Pow(2, deliveryCount - 1), int.MaxValue);
}

return deliveryCount - 1 < this._eventGridTopicSubscription.RetryDelays.Count
? this._eventGridTopicSubscription.RetryDelays[deliveryCount - 1]
: this._eventGridTopicSubscription.RetryDelays.Last();
return this._retryDelays?.GetValueOrDefault(deliveryCount, this._defaultDelay!.Value) ?? TimeSpan.FromSeconds((int)Math.Min(Math.Pow(2, deliveryCount - 1), int.MaxValue));
}

private static IEnumerable<EventBundle> ReadCurrentContent(Channel<EventBundle> channel)
Expand All @@ -241,5 +246,5 @@ private static IEnumerable<EventBundle> ReadCurrentContent(Channel<EventBundle>
}
}

private record EventGridTopicSubscription(string TopicName, string SubscriptionName, int MaxHandlerDop, List<int>? RetryDelays, IEventGridClientAdapter Client);
private record EventGridTopicSubscription(string TopicName, string SubscriptionName, int MaxHandlerDop, List<TimeSpan>? RetryDelays, IEventGridClientAdapter Client);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Workleap.DomainEventPropagation.EventPropagationSubscriptionOptions.TopicName.ge
Workleap.DomainEventPropagation.EventPropagationSubscriptionOptions.TopicName.set -> void
Workleap.DomainEventPropagation.EventPropagationSubscriptionOptions.MaxDegreeOfParallelism.get -> int
Workleap.DomainEventPropagation.EventPropagationSubscriptionOptions.MaxDegreeOfParallelism.set -> void
Workleap.DomainEventPropagation.EventPropagationSubscriptionOptions.RetryDelays.get -> System.Collections.Generic.IReadOnlyCollection<int>?
Workleap.DomainEventPropagation.EventPropagationSubscriptionOptions.RetryDelays.get -> System.Collections.Generic.IReadOnlyCollection<System.TimeSpan>?
Workleap.DomainEventPropagation.EventPropagationSubscriptionOptions.RetryDelays.set -> void
Workleap.DomainEventPropagation.EventPropagationSubscriptionOptionsValidator
Workleap.DomainEventPropagation.EventPropagationSubscriptionOptionsValidator.EventPropagationSubscriptionOptionsValidator() -> void
Expand Down