Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Rewind APIs #352

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/Client/Core/DurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,32 @@ public virtual Task ResumeInstanceAsync(string instanceId, CancellationToken can
public abstract Task ResumeInstanceAsync(
string instanceId, string? reason = null, CancellationToken cancellation = default);

/// <summary>
/// Rewinds the specified orchestration instance to a previous, non-failed state.
/// </summary>
/// <remarks>
/// <para>
/// Only orchestrations in a failed state can be rewound. Attempting to rewind an orchestration in a non-failed
/// state may result in either a no-op or a failure depending on the backend implementation.
/// </para><para>
/// Rewind works by rewriting an orchestration's history to remove the most recent failure records, and then
/// re-executing the orchestration with the modified history. This effectively "rewinds" the orchestration to a
/// previous "good" state, allowing it to re-execute the logic that caused the original failure.
/// </para><para>
/// Rewinding an orchestration is intended to be used in cases where a failure is caused by a transient issue that
/// has since been resolved. It is not intended to be used as a general-purpose retry mechanism.
/// </para>
/// </remarks>
/// <param name="instanceId">The instance ID of the orchestration to rewind.</param>
/// <param name="reason">The optional rewind reason, which is recorded in the orchestration history.</param>
/// <param name="cancellation">
/// A <see cref="CancellationToken"/> that can be used to cancel the rewind API call. Note that cancelling this
/// token does not cancel the rewind operation once it has been successfully enqueued.
/// </param>
/// <returns>A task that completes when the rewind operation has been successfully enqueued.</returns>
/// <exception cref="NotSupportedException">
/// Thrown by client implementations that do not support rewinding orchestrations.
/// </exception>
public abstract Task RewindInstanceAsync(
string instanceId, string? reason = null, CancellationToken cancellation = default);

/// <inheritdoc cref="GetInstanceAsync(string, bool, CancellationToken)"/>
public virtual Task<OrchestrationMetadata?> GetInstanceAsync(
string instanceId, CancellationToken cancellation)
Expand Down
22 changes: 22 additions & 0 deletions src/Client/Grpc/GrpcDurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,28 @@ public override async Task ResumeInstanceAsync(
}
}

/// <inheritdoc/>
public override async Task RewindInstanceAsync(
    string instanceId, string? reason = null, CancellationToken cancellation = default)
{
    // Reject a missing or empty instance ID before any RPC is attempted.
    if (string.IsNullOrEmpty(instanceId))
    {
        throw new ArgumentNullException(nameof(instanceId));
    }

    var request = new P.RewindInstanceRequest
    {
        InstanceId = instanceId,
        Reason = reason,
    };

    try
    {
        await this.sidecarClient.RewindInstanceAsync(request, cancellationToken: cancellation);
    }
    catch (RpcException rpcError) when (rpcError.StatusCode == StatusCode.Cancelled)
    {
        // Translate a cancelled gRPC call into the standard .NET cancellation exception,
        // keeping the original RPC error as the inner exception.
        string message = $"The {nameof(this.RewindInstanceAsync)} operation was canceled.";
        throw new OperationCanceledException(message, rpcError, cancellation);
    }
}

/// <inheritdoc/>
public override async Task<OrchestrationMetadata?> GetInstancesAsync(
string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,16 @@ public override Task TerminateInstanceAsync(
return this.Client.ForceTerminateTaskOrchestrationAsync(instanceId, reason);
}

/// <inheritdoc/>
public override Task RewindInstanceAsync(
    string instanceId, string? reason = null, CancellationToken cancellation = default)
    // Rewind is currently exposed only by specific backend implementations (for example, Azure Storage);
    // no IOrchestrationXXXClient interface offers it at the time of writing. Once DurableTask.Core gains
    // an interface with rewind, this method can be wired up to it.
    => throw new NotSupportedException("Rewind is not supported by the current client.");

/// <inheritdoc/>
public override async Task<OrchestrationMetadata> WaitForInstanceCompletionAsync(
string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default)
Expand Down Expand Up @@ -219,7 +229,7 @@ public override async Task<OrchestrationMetadata> WaitForInstanceStartAsync(
}
}

[return: NotNullIfNotNull("state")]
[return: NotNullIfNotNull(nameof(state))]
OrchestrationMetadata? ToMetadata(Core.OrchestrationState? state, bool getInputsAndOutputs)
{
if (state is null)
Expand Down
268 changes: 268 additions & 0 deletions test/Grpc.IntegrationTests/OrchestrationErrorHandling.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License.

using System.Runtime.Serialization;
using DurableTask.Core;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Worker;
using Xunit.Abstractions;
Expand Down Expand Up @@ -589,6 +590,273 @@ static void ValidateInnermostFailureDetailsChain(TaskFailureDetails? failureDeta
ValidateInnermostFailureDetailsChain(metadata.FailureDetails.InnerFailure.InnerFailure);
}

/// <summary>
/// Tests the behavior of a failed orchestration when it is rewound and re-executed.
/// </summary>
/// <remarks>
/// The single activity throws while a shared flag is set. The test asserts the orchestration fails, clears the
/// flag, rewinds the instance, and asserts that it then reaches the completed state.
/// </remarks>
/// <returns>A task that completes when the test has finished.</returns>
[Fact]
public async Task RewindSingleFailedActivity()
{
    // Captured by the activity lambda below; flipping it simulates fixing the underlying problem.
    bool isBusted = true;

    TaskName orchestratorName = "BustedOrchestration";
    TaskName activityName = "BustedActivity";

    await using HostTestLifetime server = await this.StartWorkerAsync(b =>
    {
        b.AddTasks(tasks =>
            tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
            {
                await ctx.CallActivityAsync(activityName);
            })
            .AddActivityFunc(activityName, (TaskActivityContext context) =>
            {
                if (isBusted)
                {
                    throw new Exception("Kah-BOOOOOM!!!");
                }
            }));
    });

    DurableTaskClient client = server.Client;

    // Start the orchestration and wait for it to fail.
    string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
    OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
    Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus);

    // Simulate "fixing" the original problem by setting the flag to false.
    isBusted = false;

    // Rewind the orchestration to put it back into a running state. It should complete successfully this time.
    // The timeout token bounds the rewind call the same way it bounds the waits.
    await client.RewindInstanceAsync(instanceId, "Rewind failed orchestration", this.TimeoutToken);
    metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
    Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
}

/// <summary>
/// Tests the behavior of a failed orchestration when it is rewound and re-executed multiple times.
/// </summary>
/// <remarks>
/// The orchestration calls two activities in sequence, each guarded by its own failure flag. The flags are
/// cleared one at a time with a rewind after each, so the test observes two distinct failures followed by a
/// successful completion. Completion counters verify that each activity's success path ran exactly once.
/// </remarks>
/// <returns>A task that completes when the test has finished.</returns>
[Fact]
public async Task RewindMultipleFailedActivities_Serial()
{
    // Captured by the activity lambdas below; each flag gates one activity's failure.
    bool isBusted1 = true;
    bool isBusted2 = true;

    TaskName orchestratorName = "BustedOrchestration";
    TaskName activityName1 = "BustedActivity1";
    TaskName activityName2 = "BustedActivity2";

    // Incremented only on an activity's success path.
    int activity1CompletionCount = 0;
    int activity2CompletionCount = 0;

    await using HostTestLifetime server = await this.StartWorkerAsync(b =>
    {
        b.AddTasks(tasks =>
            tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
            {
                // Take the result of the first activity and pass it to the second activity
                int result = await ctx.CallActivityAsync<int>(activityName1);
                return await ctx.CallActivityAsync<int>(activityName2, input: result);
            })
            .AddActivityFunc(activityName1, (TaskActivityContext context) =>
            {
                if (isBusted1)
                {
                    throw new Exception("Failure1");
                }

                activity1CompletionCount++;
                return 1;
            })
            .AddActivityFunc(activityName2, (TaskActivityContext context, int input) =>
            {
                if (isBusted2)
                {
                    throw new Exception("Failure2");
                }

                activity2CompletionCount++;
                return input + 1;
            }));
    });

    DurableTaskClient client = server.Client;

    // Start the orchestration and wait for it to fail with the first activity's error.
    string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
    OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
    Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus);
    Assert.Equal("Failure1", metadata.FailureDetails?.ErrorMessage);

    // Simulate "fixing" just the first problem by setting the first flag to false.
    isBusted1 = false;

    // Rewind the orchestration. It should fail again, but this time with a different error message.
    // The timeout token bounds the rewind call the same way it bounds the waits.
    await client.RewindInstanceAsync(instanceId, "Rewind failed orchestration", this.TimeoutToken);
    metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
    Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus);
    Assert.Equal("Failure2", metadata.FailureDetails?.ErrorMessage);

    // Simulate "fixing" the second problem by setting the second flag to false.
    isBusted2 = false;

    // Rewind the orchestration again to put it back into a running state. It should now complete successfully.
    await client.RewindInstanceAsync(instanceId, "Rewind failed orchestration", this.TimeoutToken);
    metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
    Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
    Assert.Equal(2, metadata.ReadOutputAs<int>());

    // Confirm that each activity completed exactly once (i.e. successful activity calls aren't rewound).
    Assert.Equal(1, activity1CompletionCount);
    Assert.Equal(1, activity2CompletionCount);
}

/// <summary>
/// Tests the behavior of a failed orchestration when multiple failures occur as part of a fan-out/fan-in, and is
/// subsequently rewound and re-executed.
/// </summary>
/// <remarks>
/// The orchestration schedules the same activity ten times and awaits all of the calls. Every call throws while
/// the shared flag is set. After the flag is cleared and the instance is rewound, the orchestration is expected
/// to complete with the output "Done".
/// </remarks>
/// <returns>A task that completes when the test has finished.</returns>
[Fact]
public async Task RewindMultipleFailedActivities_Parallel()
{
    // Captured by the activity lambda below; flipping it simulates fixing the underlying problem.
    bool isBusted = true;

    TaskName orchestratorName = "BustedOrchestration";
    TaskName activityName = "BustedActivity";

    await using HostTestLifetime server = await this.StartWorkerAsync(b =>
    {
        b.AddTasks(tasks =>
            tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
            {
                // Run the activity function multiple times in parallel.
                // Named distinctly from the enclosing lambda's "tasks" registry parameter to avoid shadowing it.
                IList<Task> activityTasks = Enumerable.Range(0, 10)
                    .Select(_ => ctx.CallActivityAsync(activityName))
                    .ToList();

                // Wait for all the activity functions to complete
                await Task.WhenAll(activityTasks);
                return "Done";
            })
            .AddActivityFunc(activityName, (TaskActivityContext context) =>
            {
                if (isBusted)
                {
                    throw new Exception("Kah-BOOOOOM!!!");
                }
            }));
    });

    DurableTaskClient client = server.Client;

    // Start the orchestration and wait for it to fail.
    string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
    OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
    Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus);

    // Simulate "fixing" the original problem by setting the flag to false.
    isBusted = false;

    // Rewind the orchestration to put it back into a running state. It should complete successfully this time.
    // The timeout token bounds the rewind call the same way it bounds the waits.
    await client.RewindInstanceAsync(instanceId, "Rewind failed orchestration", this.TimeoutToken);
    metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
    Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
    Assert.Equal("Done", metadata.ReadOutputAs<string>());
}

/// <summary>
/// Tests rewinding an orchestration that failed due to a failed sub-orchestration. The sub-orchestration is fixed
/// and the parent orchestration is rewound to allow the entire chain to complete successfully.
/// </summary>
/// <remarks>
/// The sub-orchestrator itself throws while the shared flag is set; no activity is involved. Only the parent
/// instance is rewound by the test.
/// </remarks>
/// <returns>A task that completes when the test has finished.</returns>
[Fact]
public async Task RewindFailedSubOrchestration()
{
    // Captured by the sub-orchestrator lambda below; flipping it simulates fixing the underlying problem.
    bool isBusted = true;

    TaskName orchestratorName = "BustedOrchestrator";
    TaskName subOrchestratorName = "BustedSubOrchestrator";

    await using HostTestLifetime server = await this.StartWorkerAsync(b =>
    {
        b.AddTasks(tasks => tasks
            .AddOrchestratorFunc(orchestratorName, async ctx =>
            {
                await ctx.CallSubOrchestratorAsync(subOrchestratorName);
            })
            .AddOrchestratorFunc(subOrchestratorName, ctx =>
            {
                if (isBusted)
                {
                    throw new Exception("Kah-BOOOOOM!!!");
                }
            }));
    });

    DurableTaskClient client = server.Client;

    // Start the orchestration and wait for it to fail.
    string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
    OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
    Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus);

    // Simulate "fixing" the original problem by setting the flag to false.
    isBusted = false;

    // Rewind the orchestration to put it back into a running state. It should complete successfully this time.
    // The timeout token bounds the rewind call the same way it bounds the waits.
    await client.RewindInstanceAsync(instanceId, "Rewind failed orchestration", this.TimeoutToken);
    metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
    Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
}

/// <summary>
/// Tests rewinding an orchestration that failed due to a failed sub-orchestration, which itself failed due to an
/// activity. The entire orchestration chain is expected to fail, and rewinding the parent orchestration should
/// allow the entire chain to complete successfully.
/// </summary>
/// <remarks>
/// The parent calls a sub-orchestrator, which calls a single activity that throws while the shared flag is set.
/// Only the parent instance is rewound by the test.
/// </remarks>
/// <returns>A task that completes when the test has finished.</returns>
[Fact]
public async Task RewindFailedSubOrchestrationWithActivity()
{
    // Captured by the activity lambda below; flipping it simulates fixing the underlying problem.
    bool isBusted = true;

    TaskName orchestratorName = "BustedOrchestrator";
    TaskName subOrchestratorName = "BustedSubOrchestrator";
    TaskName activityName = "BustedActivity";

    await using HostTestLifetime server = await this.StartWorkerAsync(b =>
    {
        b.AddTasks(tasks => tasks
            .AddOrchestratorFunc(orchestratorName, async ctx =>
            {
                await ctx.CallSubOrchestratorAsync(subOrchestratorName);
            })
            .AddOrchestratorFunc(subOrchestratorName, async ctx =>
            {
                await ctx.CallActivityAsync(activityName);
            })
            .AddActivityFunc(activityName, (TaskActivityContext _) =>
            {
                if (isBusted)
                {
                    throw new Exception("Kah-BOOOOOM!!!");
                }
            }));
    });

    DurableTaskClient client = server.Client;

    // Start the orchestration and wait for it to fail.
    string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
    OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
    Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus);

    // Simulate "fixing" the original problem by setting the flag to false.
    isBusted = false;

    // Rewind the orchestration to put it back into a running state. It should complete successfully this time.
    // The timeout token bounds the rewind call the same way it bounds the waits.
    await client.RewindInstanceAsync(instanceId, "Rewind failed orchestration", this.TimeoutToken);
    metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
    Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
}

static Exception MakeException(Type exceptionType, string message)
{
// We assume the contructor of the exception type takes a single string argument
Expand Down
Loading