diff --git a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflow.java b/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflow.java index 9c45ae1c9..917d9298d 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflow.java +++ b/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflow.java @@ -13,11 +13,15 @@ package io.dapr.examples.workflows; +import com.microsoft.durabletask.CompositeTaskFailedException; +import com.microsoft.durabletask.Task; import com.microsoft.durabletask.TaskCanceledException; import io.dapr.workflows.runtime.Workflow; import io.dapr.workflows.runtime.WorkflowContext; import java.time.Duration; +import java.util.Arrays; +import java.util.List; /** * Implementation of the DemoWorkflow for the server side. @@ -45,6 +49,40 @@ public void run(WorkflowContext ctx) { ctx.getLogger().warn(e.getMessage()); } + ctx.getLogger().info("Parallel Execution - Waiting for all tasks to finish..."); + try { + Task t1 = ctx.waitForExternalEvent("event1", Duration.ofSeconds(5), String.class); + Task t2 = ctx.waitForExternalEvent("event2", Duration.ofSeconds(5), String.class); + Task t3 = ctx.waitForExternalEvent("event3", Duration.ofSeconds(5), String.class); + + List results = ctx.allOf(Arrays.asList(t1, t2, t3)).await(); + results.forEach(t -> ctx.getLogger().info("finished task: " + t)); + ctx.getLogger().info("All tasks finished!"); + + } catch (CompositeTaskFailedException e) { + ctx.getLogger().warn(e.getMessage()); + List exceptions = e.getExceptions(); + exceptions.forEach(ex -> ctx.getLogger().warn(ex.getMessage())); + } + + ctx.getLogger().info("Parallel Execution - Waiting for any task to finish..."); + try { + Task e1 = ctx.waitForExternalEvent("e1", Duration.ofSeconds(5), String.class); + Task e2 = ctx.waitForExternalEvent("e2", Duration.ofSeconds(5), String.class); + Task e3 = ctx.waitForExternalEvent("e3", Duration.ofSeconds(5), String.class); + Task timeoutTask = ctx.createTimer(Duration.ofSeconds(1)); + + Task winner = ctx.anyOf(Arrays.asList(e1, e2, e3, timeoutTask)).await(); + if (winner == timeoutTask) { + ctx.getLogger().info("All tasks timed out!"); + } else { + ctx.getLogger().info("One of the tasks finished!"); + } + } catch (TaskCanceledException e) { + ctx.getLogger().warn("Timed out"); + ctx.getLogger().warn(e.getMessage()); + } + ctx.getLogger().info("Calling Activity..."); var input = new DemoActivityInput("Hello Activity!"); var output = ctx.callActivity(DemoWorkflowActivity.class.getName(), input, DemoActivityOutput.class).await(); @@ -53,7 +91,6 @@ public void run(WorkflowContext ctx) { ctx.getLogger().info("Activity returned: " + output.getNewMessage()); ctx.getLogger().info("Activity returned: " + output.getOriginalMessage()); - ctx.getLogger().info("Workflow finished"); ctx.complete("finished"); } diff --git a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java b/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java index 758300b55..e29616a03 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java +++ b/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java @@ -59,6 +59,19 @@ public static void main(String[] args) throws InterruptedException { System.out.println("**SendExternalMessage**"); client.raiseEvent(instanceId, "TestEvent", "TestEventPayload"); + System.out.println(separatorStr); + System.out.println("** Registering parallel Events to be captured by allOf(t1,t2,t3) **"); + client.raiseEvent(instanceId, "event1", "TestEvent 1 Payload"); + client.raiseEvent(instanceId, "event2", "TestEvent 2 Payload"); + client.raiseEvent(instanceId, "event3", "TestEvent 3 Payload"); + System.out.printf("Events raised for workflow with instanceId: %s\n", instanceId); + + System.out.println(separatorStr); + System.out.println("** Registering Event to be captured by anyOf(t1,t2,t3) **"); + client.raiseEvent(instanceId, "e2", "event 2 Payload"); + System.out.printf("Event raised for workflow with instanceId: %s\n", instanceId); + + System.out.println(separatorStr); System.out.println("**WaitForInstanceCompletion**"); try { diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DaprWorkflowContextImpl.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DaprWorkflowContextImpl.java index c0c0fc3f6..e3c70d7f0 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DaprWorkflowContextImpl.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DaprWorkflowContextImpl.java @@ -13,6 +13,7 @@ package io.dapr.workflows.runtime; +import com.microsoft.durabletask.CompositeTaskFailedException; import com.microsoft.durabletask.Task; import com.microsoft.durabletask.TaskCanceledException; import com.microsoft.durabletask.TaskOptions; @@ -23,6 +24,7 @@ import java.time.Duration; import java.time.Instant; +import java.util.List; /** * Dapr workflow context implementation. @@ -120,4 +122,25 @@ public boolean getIsReplaying() { public Task callActivity(String name, Object input, TaskOptions options, Class returnType) { return this.innerContext.callActivity(name, input, options, returnType); } + + /** + * {@inheritDoc} + */ + public Task> allOf(List> tasks) throws CompositeTaskFailedException { + return this.innerContext.allOf(tasks); + } + + /** + * {@inheritDoc} + */ + public Task> anyOf(List> tasks) { + return this.innerContext.anyOf(tasks); + } + + /** + * {@inheritDoc} + */ + public Task createTimer(Duration duration) { + return this.innerContext.createTimer(duration); + } } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowContext.java index b12816551..f793b8fae 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowContext.java @@ -13,6 +13,7 @@ package io.dapr.workflows.runtime; +import com.microsoft.durabletask.CompositeTaskFailedException; import com.microsoft.durabletask.Task; import com.microsoft.durabletask.TaskCanceledException; import com.microsoft.durabletask.TaskFailedException; @@ -21,6 +22,9 @@ import java.time.Duration; import java.time.Instant; +import java.time.ZonedDateTime; +import java.util.Arrays; +import java.util.List; /** * Context object used by workflow implementations to perform actions such as scheduling activities, @@ -234,4 +238,117 @@ default Task callActivity(String name, Object input, TaskOptions options) * @return {@code true} if the workflow is replaying, otherwise {@code false} */ boolean getIsReplaying(); + + /** + * Returns a new {@code Task} that is completed when all the given {@code Task}s complete. If any of the given + * {@code Task}s complete with an exception, the returned {@code Task} will also complete with an + * {@link CompositeTaskFailedException} containing details of the first encountered failure. + * The value of the returned {@code Task} is an ordered list of the return values of the given tasks. + * If no tasks are provided, returns a {@code Task} completed with value + * {@code null}. + * + *

This method is useful for awaiting the completion of a set of independent tasks before continuing to the next + * step in the orchestration, as in the following example: + *

{@code
+   * Task t1 = ctx.callActivity("MyActivity", String.class);
+   * Task t2 = ctx.callActivity("MyActivity", String.class);
+   * Task t3 = ctx.callActivity("MyActivity", String.class);
+   *
+   * List orderedResults = ctx.allOf(List.of(t1, t2, t3)).await();
+   * }
+ * + *

Exceptions in any of the given tasks results in an unchecked {@link CompositeTaskFailedException}. + * This exception can be inspected to obtain failure details of individual {@link Task}s. + *

{@code
+   * try {
+   *     List orderedResults = ctx.allOf(List.of(t1, t2, t3)).await();
+   * } catch (CompositeTaskFailedException e) {
+   *     List exceptions = e.getExceptions()
+   * }
+   * }
+ * + * @param tasks the list of {@code Task} objects + * @param the return type of the {@code Task} objects + * @return the values of the completed {@code Task} objects in the same order as the source list + * @throws CompositeTaskFailedException if the specified {@code timeout} value expires before the event is received + * + */ + Task> allOf(List> tasks) throws CompositeTaskFailedException; + + /** + * Returns a new {@code Task} that is completed when any of the tasks in {@code tasks} completes. + * See {@link #anyOf(Task[])} for more detailed information. + * + * @param tasks the list of {@code Task} objects + * @return a new {@code Task} that is completed when any of the given {@code Task}s complete + * @see #anyOf(Task[]) + */ + Task> anyOf(List> tasks); + + /** + * Returns a new {@code Task} that is completed when any of the given {@code Task}s complete. The value of the + * new {@code Task} is a reference to the completed {@code Task} object. If no tasks are provided, returns a + * {@code Task} that never completes. + * + *

This method is useful for waiting on multiple concurrent tasks and performing a task-specific operation when the + * first task completes, as in the following example: + *

{@code
+   * Task event1 = ctx.waitForExternalEvent("Event1");
+   * Task event2 = ctx.waitForExternalEvent("Event2");
+   * Task event3 = ctx.waitForExternalEvent("Event3");
+   *
+   * Task winner = ctx.anyOf(event1, event2, event3).await();
+   * if (winner == event1) {
+   *     // ...
+   * } else if (winner == event2) {
+   *     // ...
+   * } else if (winner == event3) {
+   *     // ...
+   * }
+   * }
+ * The {@code anyOf} method can also be used for implementing long-running timeouts, as in the following example: + *
{@code
+   * Task activityTask = ctx.callActivity("SlowActivity");
+   * Task timeoutTask = ctx.createTimer(Duration.ofMinutes(30));
+   *
+   * Task winner = ctx.anyOf(activityTask, timeoutTask).await();
+   * if (winner == activityTask) {
+   *     // completion case
+   * } else {
+   *     // timeout case
+   * }
+   * }
+ * + * @param tasks the list of {@code Task} objects + * @return a new {@code Task} that is completed when any of the given {@code Task}s complete + */ + default Task> anyOf(Task... tasks) { + return this.anyOf(Arrays.asList(tasks)); + } + + /** + * Creates a durable timer that expires after the specified delay. + * + *

Specifying a long delay (for example, a delay of a few days or more) may result in the creation of multiple, + * internally-managed durable timers. The orchestration code doesn't need to be aware of this behavior. However, + * it may be visible in framework logs and the stored history state. + * + * @param duration the amount of time before the timer should expire + * @return a new {@code Task} that completes after the specified delay + */ + Task createTimer(Duration duration); + + /** + * Creates a durable timer that expires after the specified timestamp with specific zone. + * + *

Specifying a long delay (for example, a delay of a few days or more) may result in the creation of multiple, + * internally-managed timers. The workflow code doesn't need to be aware of this behavior. However, + * it may be visible in framework logs and the stored history state. + * + * @param zonedDateTime timestamp with specific zone when the timer should expire + * @return a new {@code Task} that completes after the specified delay + */ + default Task createTimer(ZonedDateTime zonedDateTime) { + throw new UnsupportedOperationException("This method is not implemented."); + } } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/DaprWorkflowContextImplTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/DaprWorkflowContextImplTest.java index a65fb7164..9de8a9740 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/DaprWorkflowContextImplTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/DaprWorkflowContextImplTest.java @@ -13,12 +13,15 @@ package io.dapr.workflows.runtime; +import com.microsoft.durabletask.Task; import com.microsoft.durabletask.TaskOrchestrationContext; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import java.time.Duration; +import java.util.Arrays; +import java.util.List; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; @@ -90,7 +93,7 @@ public void completeTest() { } @Test - public void getIsReplaying() { + public void getIsReplayingTest() { context.getIsReplaying(); verify(mockInnerContext, times(1)).getIsReplaying(); } @@ -118,4 +121,33 @@ public void getLoggerFirstTimeTest() { verify(mockLogger, times(1)).info(expectedArg); } + + @Test + public void allOfTest() { + Task t1 = mockInnerContext.callActivity("task1"); + Task t2 = mockInnerContext.callActivity("task2"); + List> taskList = Arrays.asList(t1, t2); + context.allOf(taskList); + verify(mockInnerContext, times(1)).allOf(taskList); + } + + @Test + public void anyOfTest() { + Task t1 = mockInnerContext.callActivity("task1"); + Task t2 = mockInnerContext.callActivity("task2"); + Task t3 = mockInnerContext.callActivity("task3"); + List> taskList = Arrays.asList(t1, t2); + + context.anyOf(taskList); + verify(mockInnerContext, times(1)).anyOf(taskList); + + context.anyOf(t1, t2, t3); + verify(mockInnerContext, times(1)).anyOf(Arrays.asList(t1, t2, t3)); + } + + @Test + public void createTimerTest() { + context.createTimer(Duration.ofSeconds(10)); + verify(mockInnerContext, times(1)).createTimer(Duration.ofSeconds(10)); + } }