Skip to content

Commit

Permalink
Hedging strategy also deep-copies context for primary execution (#1754)
Browse files Browse the repository at this point in the history
  • Loading branch information
martintmk authored Oct 30, 2023
1 parent 680705e commit 2cbe4f0
Show file tree
Hide file tree
Showing 12 changed files with 77 additions and 148 deletions.
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
```
BenchmarkDotNet v0.13.7, Windows 11 (10.0.22621.2283/22H2/2022Update/SunValley2) (Hyper-V)
BenchmarkDotNet v0.13.9+228a464e8be6c580ad9408e98f18813f6407fb5a, Windows 11 (10.0.22621.2428/22H2/2022Update/SunValley2) (Hyper-V)
Intel Xeon Platinum 8370C CPU 2.80GHz, 1 CPU, 16 logical and 8 physical cores
.NET SDK 7.0.401
[Host] : .NET 7.0.11 (7.0.1123.42427), X64 RyuJIT AVX2
.NET SDK 7.0.403
[Host] : .NET 7.0.13 (7.0.1323.51816), X64 RyuJIT AVX2
Job=MediumRun Toolchain=InProcessEmitToolchain IterationCount=15
LaunchCount=2 WarmupCount=10
```
| Method | Mean | Error | StdDev | Ratio | RatioSD | Gen0 | Allocated | Alloc Ratio |
|---------------------------- |---------:|----------:|----------:|------:|--------:|-------:|----------:|------------:|
| Hedging_Primary | 1.432 μs | 0.0042 μs | 0.0061 μs | 1.00 | 0.00 | - | 40 B | 1.00 |
| Hedging_Secondary | 2.253 μs | 0.0051 μs | 0.0075 μs | 1.57 | 0.01 | 0.0038 | 184 B | 4.60 |
| Hedging_Primary_AsyncWork | 3.903 μs | 0.0260 μs | 0.0381 μs | 2.73 | 0.03 | 0.0610 | 1636 B | 40.90 |
| Hedging_Secondary_AsyncWork | 4.936 μs | 0.0424 μs | 0.0595 μs | 3.45 | 0.05 | 0.0687 | 1838 B | 45.95 |
| Method | Mean | Error | StdDev | Ratio | RatioSD | Gen0 | Allocated | Alloc Ratio |
|---------------------------- |----------:|----------:|----------:|------:|--------:|-------:|----------:|------------:|
| Hedging_Primary | 1.284 μs | 0.0047 μs | 0.0066 μs | 1.00 | 0.00 | - | 40 B | 1.00 |
| Hedging_Secondary | 2.007 μs | 0.0043 μs | 0.0064 μs | 1.56 | 0.01 | 0.0038 | 184 B | 4.60 |
| Hedging_Primary_AsyncWork | 33.379 μs | 1.9491 μs | 2.9173 μs | 26.43 | 1.90 | 0.6104 | 15299 B | 382.48 |
| Hedging_Secondary_AsyncWork | 33.735 μs | 0.2915 μs | 0.4273 μs | 26.28 | 0.43 | 0.6104 | 15431 B | 385.77 |
3 changes: 1 addition & 2 deletions docs/strategies/hedging.md
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,7 @@ The hedging strategy supports the concurrent execution and cancellation of multi

Here's the flow:

- The strategy gets the primary context and creates a snapshot for deep cloning.
- For each hedged action execution, the hedging strategy makes a deep copy of the original context. The deep copy has its own cancellation token designated for that execution. Note that the first execution (primary) uses the original resilience context, albeit with a cloned set of resilience properties.
- The strategy gets the primary context and preserves it for deep-cloning.
- After the strategy has an accepted result from a hedged action, the resilience context from the action is merged back into the primary context.
- All ongoing hedged actions are cancelled and discarded. The hedging strategy awaits the propagation of cancellation.

Expand Down
3 changes: 0 additions & 3 deletions src/Polly.Core/Hedging/Controller/ContextSnapshot.cs

This file was deleted.

27 changes: 8 additions & 19 deletions src/Polly.Core/Hedging/Controller/HedgingExecutionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ internal sealed class HedgingExecutionContext<T> : IAsyncDisposable
private readonly TimeProvider _timeProvider;
private readonly int _maxAttempts;
private readonly Action<HedgingExecutionContext<T>> _onReset;
private readonly ResilienceProperties _replacedProperties = new();

public HedgingExecutionContext(
ObjectPool<TaskExecution<T>> executionPool,
Expand All @@ -30,22 +29,17 @@ public HedgingExecutionContext(
_onReset = onReset;
}

internal void Initialize(ResilienceContext context)
{
Snapshot = new ContextSnapshot(context, context.Properties, context.CancellationToken);
_replacedProperties.Replace(Snapshot.OriginalProperties);
Snapshot.Context.Properties = _replacedProperties;
}
internal void Initialize(ResilienceContext context) => PrimaryContext = context;

public int LoadedTasks => _tasks.Count;

public ContextSnapshot Snapshot { get; private set; }
public ResilienceContext? PrimaryContext { get; private set; }

public bool IsInitialized => Snapshot.Context != null;
public bool IsInitialized => PrimaryContext != null;

public IReadOnlyList<TaskExecution<T>> Tasks => _tasks;

private bool ContinueOnCapturedContext => Snapshot.Context.ContinueOnCapturedContext;
private bool ContinueOnCapturedContext => PrimaryContext!.ContinueOnCapturedContext;

public async ValueTask<ExecutionInfo<T>> LoadExecutionAsync<TState>(
Func<ResilienceContext, TState, ValueTask<Outcome<T>>> primaryCallback,
Expand All @@ -65,7 +59,7 @@ public async ValueTask<ExecutionInfo<T>> LoadExecutionAsync<TState>(

var execution = _executionPool.Get();

if (await execution.InitializeAsync(type, Snapshot, primaryCallback, state, LoadedTasks).ConfigureAwait(ContinueOnCapturedContext))
if (await execution.InitializeAsync(type, PrimaryContext!, primaryCallback, state, LoadedTasks).ConfigureAwait(ContinueOnCapturedContext))
{
// we were able to start a new execution, register it
_tasks.Add(execution);
Expand Down Expand Up @@ -126,7 +120,7 @@ public async ValueTask DisposeAsync()
return TryRemoveExecutedTask();
}

using var delayTaskCancellation = CancellationTokenSource.CreateLinkedTokenSource(Snapshot.Context.CancellationToken);
using var delayTaskCancellation = CancellationTokenSource.CreateLinkedTokenSource(PrimaryContext!.CancellationToken);

var delayTask = _timeProvider.Delay(hedgingDelay, delayTaskCancellation.Token);
Task<Task> whenAnyHedgedTask = WaitForTaskCompetitionAsync();
Expand Down Expand Up @@ -192,10 +186,6 @@ static async Task<Task> AwaitTask(TaskExecution<T> task, bool continueOnCaptured

private void UpdateOriginalContext()
{
var originalContext = Snapshot.Context;
originalContext.CancellationToken = Snapshot.OriginalCancellationToken;
originalContext.Properties = Snapshot.OriginalProperties;

if (LoadedTasks == 0)
{
return;
Expand All @@ -205,17 +195,16 @@ private void UpdateOriginalContext()

if (Tasks.FirstOrDefault(static t => t.IsAccepted) is TaskExecution<T> acceptedExecution)
{
originalContext.Properties.Replace(acceptedExecution.Properties);
PrimaryContext!.Properties.AddOrReplaceProperties(acceptedExecution.Context.Properties);
}
}

private void Reset()
{
_replacedProperties.Clear();
_tasks.Clear();

_executingTasks.Clear();
Snapshot = default;
PrimaryContext = null;

_onReset(this);
}
Expand Down
36 changes: 7 additions & 29 deletions src/Polly.Core/Hedging/Controller/TaskExecution.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ public TaskExecution(HedgingHandler<T> handler, CancellationTokenSourcePool canc

public bool IsAccepted { get; private set; }

public ResilienceProperties Properties { get; } = new();

public ResilienceContext Context => _activeContext ?? throw new InvalidOperationException("TaskExecution is not initialized.");

public HedgedTaskType Type { get; set; }
Expand Down Expand Up @@ -89,7 +87,7 @@ public void Cancel()

public async ValueTask<bool> InitializeAsync<TState>(
HedgedTaskType type,
ContextSnapshot snapshot,
ResilienceContext primaryContext,
Func<ResilienceContext, TState, ValueTask<Outcome<T>>> primaryCallback,
TState state,
int attemptNumber)
Expand All @@ -98,22 +96,21 @@ public async ValueTask<bool> InitializeAsync<TState>(
Type = type;
_cancellationSource = _cancellationTokenSourcePool.Get(System.Threading.Timeout.InfiniteTimeSpan);
_startExecutionTimestamp = _timeProvider.GetTimestamp();
Properties.Replace(snapshot.OriginalProperties);
_activeContext = _cachedContext;
_activeContext.InitializeFrom(primaryContext, _cancellationSource!.Token);

if (snapshot.OriginalCancellationToken.CanBeCanceled)
if (primaryContext.CancellationToken.CanBeCanceled)
{
_cancellationRegistration = snapshot.OriginalCancellationToken.Register(o => ((CancellationTokenSource)o!).Cancel(), _cancellationSource);
_cancellationRegistration = primaryContext.CancellationToken.Register(o => ((CancellationTokenSource)o!).Cancel(), _cancellationSource);
}

PrepareContext(ref snapshot);

if (type == HedgedTaskType.Secondary)
{
Func<ValueTask<Outcome<T>>>? action = null;

try
{
action = _handler.GenerateAction(CreateArguments(primaryCallback, snapshot.Context, state, attemptNumber));
action = _handler.GenerateAction(CreateArguments(primaryCallback, primaryContext, state, attemptNumber));
if (action == null)
{
await ResetAsync().ConfigureAwait(false);
Expand All @@ -127,7 +124,7 @@ public async ValueTask<bool> InitializeAsync<TState>(
return true;
}

await HandleOnHedgingAsync(snapshot.Context, attemptNumber - 1).ConfigureAwait(Context.ContinueOnCapturedContext);
await HandleOnHedgingAsync(primaryContext, attemptNumber - 1).ConfigureAwait(Context.ContinueOnCapturedContext);

ExecutionTaskSafe = ExecuteSecondaryActionAsync(action);
}
Expand Down Expand Up @@ -193,7 +190,6 @@ public async ValueTask ResetAsync()
IsAccepted = false;
Outcome = default;
IsHandled = false;
Properties.Clear();
OnReset = null;
AttemptNumber = 0;
_activeContext = null;
Expand Down Expand Up @@ -246,22 +242,4 @@ private async Task UpdateOutcomeAsync(Outcome<T> outcome)
IsHandled = await _handler.ShouldHandle(args).ConfigureAwait(Context.ContinueOnCapturedContext);
TelemetryUtil.ReportExecutionAttempt(_telemetry, Context, outcome, AttemptNumber, ExecutionTime, IsHandled);
}

private void PrepareContext(ref ContextSnapshot snapshot)
{
if (Type == HedgedTaskType.Primary)
{
// now just replace the properties
_activeContext = snapshot.Context;
}
else
{
// secondary hedged tasks get their own unique context
_activeContext = _cachedContext;
_activeContext.InitializeFrom(snapshot.Context);
}

_activeContext.Properties = Properties;
_activeContext.CancellationToken = _cancellationSource!.Token;
}
}
6 changes: 4 additions & 2 deletions src/Polly.Core/ResilienceContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,17 @@ internal ResilienceContext()
/// <summary>
/// Gets the custom properties attached to the context.
/// </summary>
public ResilienceProperties Properties { get; internal set; } = new();
public ResilienceProperties Properties { get; } = new();

internal void InitializeFrom(ResilienceContext context)
internal void InitializeFrom(ResilienceContext context, CancellationToken cancellationToken)
{
OperationKey = context.OperationKey;
ResultType = context.ResultType;
IsSynchronous = context.IsSynchronous;
CancellationToken = context.CancellationToken;
ContinueOnCapturedContext = context.ContinueOnCapturedContext;
CancellationToken = cancellationToken;
Properties.AddOrReplaceProperties(context.Properties);
}

[ExcludeFromCodeCoverage]
Expand Down
6 changes: 1 addition & 5 deletions src/Polly.Core/ResilienceProperties.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,8 @@ public void Set<TValue>(ResiliencePropertyKey<TValue> key, TValue value)
Options[key.Key] = value;
}

internal void Replace(ResilienceProperties other)
internal void AddOrReplaceProperties(ResilienceProperties other)
{
Clear();

// try to avoid enumerator allocation
if (other.Options is Dictionary<string, object?> otherOptions)
{
Expand All @@ -78,7 +76,5 @@ internal void Replace(ResilienceProperties other)
}
}
}

internal void Clear() => Options.Clear();
}

Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void Ctor_Ok()
var context = Create();

context.LoadedTasks.Should().Be(0);
context.Snapshot.Context.Should().BeNull();
context.PrimaryContext.Should().BeNull();

context.Should().NotBeNull();
}
Expand All @@ -67,11 +67,7 @@ public void Initialize_Ok()

context.Initialize(_resilienceContext);

context.Snapshot.Context.Should().Be(_resilienceContext);
context.Snapshot.Context.Properties.Should().NotBeSameAs(props);
context.Snapshot.OriginalProperties.Should().BeSameAs(props);
context.Snapshot.OriginalCancellationToken.Should().Be(_cts.Token);
context.Snapshot.Context.Properties.Options.Should().HaveCount(1);
context.PrimaryContext.Should().Be(_resilienceContext);
context.IsInitialized.Should().BeTrue();
}

Expand Down Expand Up @@ -409,7 +405,7 @@ public async Task Complete_EnsureCleaned()
await context.DisposeAsync();

context.LoadedTasks.Should().Be(0);
context.Snapshot.Context.Should().BeNull();
context.PrimaryContext!.Should().BeNull();

_onReset.WaitOne(AssertTimeout);
_resets.Count.Should().Be(1);
Expand Down
Loading

0 comments on commit 2cbe4f0

Please sign in to comment.