diff --git a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflow.java b/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflow.java index 954c8c7e1..167aa3e04 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflow.java +++ b/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflow.java @@ -92,14 +92,33 @@ public void run(WorkflowContext ctx) { ctx.getLogger().info("Activity returned: " + output.getOriginalMessage()); - ctx.getLogger().info("Child-Workflow> Calling ChildWorkflow..."); - var childWorkflowInput = "Hello ChildWorkflow!"; - var childWorkflowOutput = ctx.callSubWorkflow(DemoSubWorkflow.class.getName(), childWorkflowInput, - String.class).await(); + boolean shouldComplete = true; + ctx.getLogger().info("Waiting for event: 'RestartEvent'..."); + try { + ctx.waitForExternalEvent("RestartEvent", Duration.ofSeconds(10)).await(); + ctx.getLogger().info("Received RestartEvent"); + ctx.getLogger().info("Restarting Workflow by calling continueAsNew..."); + ctx.continueAsNew("TestInputRestart", false); + shouldComplete = false; + } catch (TaskCanceledException e) { + ctx.getLogger().warn("Restart Timed out"); + ctx.getLogger().warn(e.getMessage()); + } - ctx.getLogger().info("Child-Workflow> returned: " + childWorkflowOutput); + if (shouldComplete) { + ctx.getLogger().info("Child-Workflow> Calling ChildWorkflow..."); + var childWorkflowInput = "Hello ChildWorkflow!"; + var childWorkflowOutput = + ctx.callSubWorkflow(DemoSubWorkflow.class.getName(), childWorkflowInput, String.class).await(); + + ctx.getLogger().info("Child-Workflow> returned: " + childWorkflowOutput); + + ctx.getLogger().info("Workflow finished"); + ctx.complete("finished"); + + return; + } - ctx.getLogger().info("Workflow finished"); - ctx.complete("finished"); + ctx.getLogger().info("Workflow restarted"); } } diff --git a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java b/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java index e29616a03..b21566939 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java +++ b/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java @@ -104,6 +104,19 @@ public static void main(String[] args) throws InterruptedException { System.out.println("Terminate this workflow instance manually before the timeout is reached"); client.terminateWorkflow(instanceToTerminateId, null); System.out.println(separatorStr); + + String restartingInstanceId = "restarting"; + client.scheduleNewWorkflow(DemoWorkflow.class, null, restartingInstanceId); + System.out.printf("Started new workflow instance with ID: %s%n", restartingInstanceId); + System.out.println("Sleeping 30 seconds to restart the workflow"); + TimeUnit.SECONDS.sleep(30); + + System.out.println("**SendExternalMessage: RestartEvent**"); + client.raiseEvent(restartingInstanceId, "RestartEvent", "RestartEventPayload"); + + System.out.println("Sleeping 30 seconds to terminate the eternal workflow"); + TimeUnit.SECONDS.sleep(30); + client.terminateWorkflow(restartingInstanceId, null); } System.out.println("Exiting DemoWorkflowClient."); diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DaprWorkflowContextImpl.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DaprWorkflowContextImpl.java index 10ed8436a..289ba7d2a 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DaprWorkflowContextImpl.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DaprWorkflowContextImpl.java @@ -23,6 +23,7 @@ import org.slf4j.helpers.NOPLogger; import javax.annotation.Nullable; + import java.time.Duration; import java.time.Instant; import java.util.List; @@ -145,6 +146,7 @@ public Task createTimer(Duration duration) { return this.innerContext.createTimer(duration); } + /** * {@inheritDoc} */ @@ -157,9 +159,24 @@ public T getInput(Class targetType) { */ @Override public Task callSubWorkflow(String name, @Nullable Object input, @Nullable String instanceID, - @Nullable TaskOptions options, Class returnType) { + @Nullable TaskOptions options, Class returnType) { return this.innerContext.callSubOrchestrator(name, input, instanceID, options, returnType); } + /** + * {@inheritDoc} + */ + @Override + public void continueAsNew(Object input) { + this.innerContext.continueAsNew(input); + } + + /** + * {@inheritDoc} + */ + @Override + public void continueAsNew(Object input, boolean preserveUnprocessedEvents) { + this.innerContext.continueAsNew(input, preserveUnprocessedEvents); + } } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowContext.java index 22e6c954b..c8b90d6ce 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowContext.java @@ -21,6 +21,7 @@ import org.slf4j.Logger; import javax.annotation.Nullable; + import java.time.Duration; import java.time.Instant; import java.time.ZonedDateTime; @@ -269,10 +270,9 @@ default Task callActivity(String name, Object input, TaskOptions options) * } * * @param tasks the list of {@code Task} objects - * @param the return type of the {@code Task} objects + * @param the return type of the {@code Task} objects * @return the values of the completed {@code Task} objects in the same order as the source list * @throws CompositeTaskFailedException if the specified {@code timeout} value expires before the event is received - * */ Task> allOf(List> tasks) throws CompositeTaskFailedException; @@ -353,11 +353,12 @@ default Task createTimer(ZonedDateTime zonedDateTime) { throw new UnsupportedOperationException("This method is not implemented."); } + /** * Gets the deserialized input of the current task orchestration. * * @param targetType the {@link Class} object associated with {@code V} - * @param the expected type of the workflow input + * @param the expected type of the workflow input * @return the deserialized input as an object of type {@code V} or {@code null} if no input was provided. */ V getInput(Class targetType); @@ -365,12 +366,12 @@ default Task createTimer(ZonedDateTime zonedDateTime) { /** * Asynchronously invokes another workflow as a sub-workflow and returns a {@link Task} that completes * when the sub-workflow completes. - * + * *

See {@link #callSubWorkflow(String, Object, String, TaskOptions, Class)} for a full description. * - * @see #callSubWorkflow(String, Object, String, TaskOptions, Class) * @param name the name of the workflow to invoke * @return a new {@link Task} that completes when the sub-workflow completes or fails + * @see #callSubWorkflow(String, Object, String, TaskOptions, Class) */ default Task callSubWorkflow(String name) { return this.callSubWorkflow(name, null); @@ -379,10 +380,10 @@ default Task callSubWorkflow(String name) { /** * Asynchronously invokes another workflow as a sub-workflow and returns a {@link Task} that completes * when the sub-workflow completes. - * + * *

See {@link #callSubWorkflow(String, Object, String, TaskOptions, Class)} for a full description. * - * @param name the name of the workflow to invoke + * @param name the name of the workflow to invoke * @param input the serializable input to send to the sub-workflow * @return a new {@link Task} that completes when the sub-workflow completes or fails */ @@ -393,13 +394,13 @@ default Task callSubWorkflow(String name, Object input) { /** * Asynchronously invokes another workflow as a sub-workflow and returns a {@link Task} that completes * when the sub-workflow completes. - * + * *

See {@link #callSubWorkflow(String, Object, String, TaskOptions, Class)} for a full description. * - * @param name the name of the workflow to invoke - * @param input the serializable input to send to the sub-workflow + * @param name the name of the workflow to invoke + * @param input the serializable input to send to the sub-workflow * @param returnType the expected class type of the sub-workflow output - * @param the expected type of the sub-workflow output + * @param the expected type of the sub-workflow output * @return a new {@link Task} that completes when the sub-workflow completes or fails */ default Task callSubWorkflow(String name, Object input, Class returnType) { @@ -409,14 +410,14 @@ default Task callSubWorkflow(String name, Object input, Class returnTy /** * Asynchronously invokes another workflow as a sub-workflow and returns a {@link Task} that completes * when the sub-workflow completes. - * + * *

See {@link #callSubWorkflow(String, Object, String, TaskOptions, Class)} for a full description. * - * @param name the name of the workflow to invoke - * @param input the serializable input to send to the sub-workflow + * @param name the name of the workflow to invoke + * @param input the serializable input to send to the sub-workflow * @param instanceID the unique ID of the sub-workflow * @param returnType the expected class type of the sub-workflow output - * @param the expected type of the sub-workflow output + * @param the expected type of the sub-workflow output * @return a new {@link Task} that completes when the sub-workflow completes or fails */ default Task callSubWorkflow(String name, Object input, String instanceID, Class returnType) { @@ -426,13 +427,13 @@ default Task callSubWorkflow(String name, Object input, String instanceID /** * Asynchronously invokes another workflow as a sub-workflow and returns a {@link Task} that completes * when the sub-workflow completes. - * + * *

See {@link #callSubWorkflow(String, Object, String, TaskOptions, Class)} for a full description. * - * @param name the name of the workflow to invoke - * @param input the serializable input to send to the sub-workflow + * @param name the name of the workflow to invoke + * @param input the serializable input to send to the sub-workflow * @param instanceID the unique ID of the sub-workflow - * @param options additional options that control the execution and processing of the activity + * @param options additional options that control the execution and processing of the activity * @return a new {@link Task} that completes when the sub-workflow completes or fails */ default Task callSubWorkflow(String name, Object input, String instanceID, TaskOptions options) { @@ -444,7 +445,7 @@ default Task callSubWorkflow(String name, Object input, String instanceID, * when the sub-workflow completes. If the sub-workflow completes successfully, the returned * {@code Task}'s value will be the activity's output. If the sub-workflow fails, the returned {@code Task} * will complete exceptionally with a {@link TaskFailedException}. - * + * *

A sub-workflow has its own instance ID, history, and status that is independent of the parent workflow * that started it. There are many advantages to breaking down large orchestrations into sub-workflows: *

    @@ -461,22 +462,58 @@ default Task callSubWorkflow(String name, Object input, String instanceID, *
* The disadvantage is that there is overhead associated with starting a sub-workflow and processing its * output. This is typically only an issue for very small orchestrations. - * + * *

Because sub-workflows are independent of their parents, terminating a parent orchestration does not affect * any sub-workflows. sub-workflows must be terminated independently using their unique instance ID, * which is specified using the {@code instanceID} parameter * - * @param name the name of the workflow to invoke - * @param input the serializable input to send to the sub-workflow + * @param name the name of the workflow to invoke + * @param input the serializable input to send to the sub-workflow * @param instanceID the unique ID of the sub-workflow - * @param options additional options that control the execution and processing of the activity + * @param options additional options that control the execution and processing of the activity * @param returnType the expected class type of the sub-workflow output - * @param the expected type of the sub-workflow output + * @param the expected type of the sub-workflow output * @return a new {@link Task} that completes when the sub-workflow completes or fails */ - Task callSubWorkflow(String name, - @Nullable Object input, - @Nullable String instanceID, - @Nullable TaskOptions options, - Class returnType); + Task callSubWorkflow(String name, + @Nullable Object input, + @Nullable String instanceID, + @Nullable TaskOptions options, + Class returnType); + + /** + * Restarts the orchestration with a new input and clears its history. See {@link #continueAsNew(Object, boolean)} + * for a full description. + * + * @param input the serializable input data to re-initialize the instance with + */ + default void continueAsNew(Object input) { + this.continueAsNew(input, true); + } + + /** + * Restarts the orchestration with a new input and clears its history. + * + *

This method is primarily designed for eternal orchestrations, which are orchestrations that + * may not ever complete. It works by restarting the orchestration, providing it with a new input, + * and truncating the existing orchestration history. It allows an orchestration to continue + * running indefinitely without having its history grow unbounded. The benefits of periodically + * truncating history include decreased memory usage, decreased storage volumes, and shorter orchestrator + * replays when rebuilding state. + * + *

The results of any incomplete tasks will be discarded when an orchestrator calls {@code continueAsNew}. + * For example, if a timer is scheduled and then {@code continueAsNew} is called before the timer fires, the timer + * event will be discarded. The only exception to this is external events. By default, if an external event is + * received by an orchestration but not yet processed, the event is saved in the orchestration state unit it is + * received by a call to {@link #waitForExternalEvent}. These events will remain in memory + * even after an orchestrator restarts using {@code continueAsNew}. This behavior can be disabled by specifying + * {@code false} for the {@code preserveUnprocessedEvents} parameter value. + * + *

Orchestrator implementations should complete immediately after calling the{@code continueAsNew} method. + * + * @param input the serializable input data to re-initialize the instance with + * @param preserveUnprocessedEvents {@code true} to push unprocessed external events into the new orchestration + * history, otherwise {@code false} + */ + void continueAsNew(Object input, boolean preserveUnprocessedEvents); } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/DaprWorkflowContextImplTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/DaprWorkflowContextImplTest.java index 6a3f147b4..a63ba7f70 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/DaprWorkflowContextImplTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/DaprWorkflowContextImplTest.java @@ -122,6 +122,13 @@ public void getLoggerFirstTimeTest() { verify(mockLogger, times(1)).info(expectedArg); } + @Test + public void continueAsNewTest() { + String expectedInput = "TestInput"; + context.continueAsNew(expectedInput); + verify(mockInnerContext, times(1)).continueAsNew(expectedInput); + } + @Test public void allOfTest() { Task t1 = mockInnerContext.callActivity("task1");