Skip to content

Commit

Permalink
Implementing allOf, anyOf, createTimer methods (#11)
Browse files Browse the repository at this point in the history
Co-authored-by: Julio Rezende <jsilvarezend@microsoft.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>
Co-authored-by: Julio Rezende <jsilvarezend@microsoft.com>
  • Loading branch information
2 people authored and macromania committed Jun 21, 2023
1 parent 730d22d commit 4bba4a4
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@

package io.dapr.examples.workflows;

import com.microsoft.durabletask.CompositeTaskFailedException;
import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskCanceledException;
import io.dapr.workflows.runtime.Workflow;
import io.dapr.workflows.runtime.WorkflowContext;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

/**
* Implementation of the DemoWorkflow for the server side.
Expand Down Expand Up @@ -45,6 +49,40 @@ public void run(WorkflowContext ctx) {
ctx.getLogger().warn(e.getMessage());
}

ctx.getLogger().info("Parallel Execution - Waiting for all tasks to finish...");
try {
Task<String> t1 = ctx.waitForExternalEvent("event1", Duration.ofSeconds(5), String.class);
Task<String> t2 = ctx.waitForExternalEvent("event2", Duration.ofSeconds(5), String.class);
Task<String> t3 = ctx.waitForExternalEvent("event3", Duration.ofSeconds(5), String.class);

List<String> results = ctx.allOf(Arrays.asList(t1, t2, t3)).await();
results.forEach(t -> ctx.getLogger().info("finished task: " + t));
ctx.getLogger().info("All tasks finished!");

} catch (CompositeTaskFailedException e) {
ctx.getLogger().warn(e.getMessage());
List<Exception> exceptions = e.getExceptions();
exceptions.forEach(ex -> ctx.getLogger().warn(ex.getMessage()));
}

ctx.getLogger().info("Parallel Execution - Waiting for any task to finish...");
try {
Task<String> e1 = ctx.waitForExternalEvent("e1", Duration.ofSeconds(5), String.class);
Task<String> e2 = ctx.waitForExternalEvent("e2", Duration.ofSeconds(5), String.class);
Task<String> e3 = ctx.waitForExternalEvent("e3", Duration.ofSeconds(5), String.class);
Task<Void> timeoutTask = ctx.createTimer(Duration.ofSeconds(1));

Task<?> winner = ctx.anyOf(Arrays.asList(e1, e2, e3, timeoutTask)).await();
if (winner == timeoutTask) {
ctx.getLogger().info("All tasks timed out!");
} else {
ctx.getLogger().info("One of the tasks finished!");
}
} catch (TaskCanceledException e) {
ctx.getLogger().warn("Timed out");
ctx.getLogger().warn(e.getMessage());
}

ctx.getLogger().info("Calling Activity...");
var input = new DemoActivityInput("Hello Activity!");
var output = ctx.callActivity(DemoWorkflowActivity.class.getName(), input, DemoActivityOutput.class).await();
Expand All @@ -53,7 +91,6 @@ public void run(WorkflowContext ctx) {
ctx.getLogger().info("Activity returned: " + output.getNewMessage());
ctx.getLogger().info("Activity returned: " + output.getOriginalMessage());


ctx.getLogger().info("Workflow finished");
ctx.complete("finished");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,19 @@ public static void main(String[] args) throws InterruptedException {
System.out.println("**SendExternalMessage**");
client.raiseEvent(instanceId, "TestEvent", "TestEventPayload");

System.out.println(separatorStr);
System.out.println("** Registering parallel Events to be captured by allOf(t1,t2,t3) **");
client.raiseEvent(instanceId, "event1", "TestEvent 1 Payload");
client.raiseEvent(instanceId, "event2", "TestEvent 2 Payload");
client.raiseEvent(instanceId, "event3", "TestEvent 3 Payload");
System.out.printf("Events raised for workflow with instanceId: %s\n", instanceId);

System.out.println(separatorStr);
System.out.println("** Registering Event to be captured by anyOf(t1,t2,t3) **");
client.raiseEvent(instanceId, "e2", "event 2 Payload");
System.out.printf("Event raised for workflow with instanceId: %s\n", instanceId);


System.out.println(separatorStr);
System.out.println("**WaitForInstanceCompletion**");
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.dapr.workflows.runtime;

import com.microsoft.durabletask.CompositeTaskFailedException;
import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskCanceledException;
import com.microsoft.durabletask.TaskOptions;
Expand All @@ -23,6 +24,7 @@

import java.time.Duration;
import java.time.Instant;
import java.util.List;

/**
* Dapr workflow context implementation.
Expand Down Expand Up @@ -120,4 +122,25 @@ public boolean getIsReplaying() {
public <V> Task<V> callActivity(String name, Object input, TaskOptions options, Class<V> returnType) {
return this.innerContext.callActivity(name, input, options, returnType);
}

/**
* {@inheritDoc}
*/
public <V> Task<List<V>> allOf(List<Task<V>> tasks) throws CompositeTaskFailedException {
return this.innerContext.allOf(tasks);
}

/**
* {@inheritDoc}
*/
public Task<Task<?>> anyOf(List<Task<?>> tasks) {
return this.innerContext.anyOf(tasks);
}

/**
* {@inheritDoc}
*/
public Task<Void> createTimer(Duration duration) {
return this.innerContext.createTimer(duration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.dapr.workflows.runtime;

import com.microsoft.durabletask.CompositeTaskFailedException;
import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskCanceledException;
import com.microsoft.durabletask.TaskFailedException;
Expand All @@ -21,6 +22,9 @@

import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.List;

/**
* Context object used by workflow implementations to perform actions such as scheduling activities,
Expand Down Expand Up @@ -234,4 +238,117 @@ default Task<Void> callActivity(String name, Object input, TaskOptions options)
* @return {@code true} if the workflow is replaying, otherwise {@code false}
*/
boolean getIsReplaying();

/**
* Returns a new {@code Task} that is completed when all the given {@code Task}s complete. If any of the given
* {@code Task}s complete with an exception, the returned {@code Task} will also complete with an
* {@link CompositeTaskFailedException} containing details of the first encountered failure.
* The value of the returned {@code Task} is an ordered list of the return values of the given tasks.
* If no tasks are provided, returns a {@code Task} completed with value
* {@code null}.
*
* <p>This method is useful for awaiting the completion of a set of independent tasks before continuing to the next
* step in the orchestration, as in the following example:
* <pre>{@code
* Task<String> t1 = ctx.callActivity("MyActivity", String.class);
* Task<String> t2 = ctx.callActivity("MyActivity", String.class);
* Task<String> t3 = ctx.callActivity("MyActivity", String.class);
*
* List<String> orderedResults = ctx.allOf(List.of(t1, t2, t3)).await();
* }</pre>
*
* <p>Exceptions in any of the given tasks results in an unchecked {@link CompositeTaskFailedException}.
* This exception can be inspected to obtain failure details of individual {@link Task}s.
* <pre>{@code
* try {
* List<String> orderedResults = ctx.allOf(List.of(t1, t2, t3)).await();
* } catch (CompositeTaskFailedException e) {
* List<Exception> exceptions = e.getExceptions()
* }
* }</pre>
*
* @param tasks the list of {@code Task} objects
* @param <V> the return type of the {@code Task} objects
* @return the values of the completed {@code Task} objects in the same order as the source list
* @throws CompositeTaskFailedException if the specified {@code timeout} value expires before the event is received
*
*/
<V> Task<List<V>> allOf(List<Task<V>> tasks) throws CompositeTaskFailedException;

/**
* Returns a new {@code Task} that is completed when any of the tasks in {@code tasks} completes.
* See {@link #anyOf(Task[])} for more detailed information.
*
* @param tasks the list of {@code Task} objects
* @return a new {@code Task} that is completed when any of the given {@code Task}s complete
* @see #anyOf(Task[])
*/
Task<Task<?>> anyOf(List<Task<?>> tasks);

/**
* Returns a new {@code Task} that is completed when any of the given {@code Task}s complete. The value of the
* new {@code Task} is a reference to the completed {@code Task} object. If no tasks are provided, returns a
* {@code Task} that never completes.
*
* <p>This method is useful for waiting on multiple concurrent tasks and performing a task-specific operation when the
* first task completes, as in the following example:
* <pre>{@code
* Task<Void> event1 = ctx.waitForExternalEvent("Event1");
* Task<Void> event2 = ctx.waitForExternalEvent("Event2");
* Task<Void> event3 = ctx.waitForExternalEvent("Event3");
*
* Task<?> winner = ctx.anyOf(event1, event2, event3).await();
* if (winner == event1) {
* // ...
* } else if (winner == event2) {
* // ...
* } else if (winner == event3) {
* // ...
* }
* }</pre>
* The {@code anyOf} method can also be used for implementing long-running timeouts, as in the following example:
* <pre>{@code
* Task<Void> activityTask = ctx.callActivity("SlowActivity");
* Task<Void> timeoutTask = ctx.createTimer(Duration.ofMinutes(30));
*
* Task<?> winner = ctx.anyOf(activityTask, timeoutTask).await();
* if (winner == activityTask) {
* // completion case
* } else {
* // timeout case
* }
* }</pre>
*
* @param tasks the list of {@code Task} objects
* @return a new {@code Task} that is completed when any of the given {@code Task}s complete
*/
default Task<Task<?>> anyOf(Task<?>... tasks) {
return this.anyOf(Arrays.asList(tasks));
}

/**
* Creates a durable timer that expires after the specified delay.
*
* <p>Specifying a long delay (for example, a delay of a few days or more) may result in the creation of multiple,
* internally-managed durable timers. The orchestration code doesn't need to be aware of this behavior. However,
* it may be visible in framework logs and the stored history state.
*
* @param duration the amount of time before the timer should expire
* @return a new {@code Task} that completes after the specified delay
*/
Task<Void> createTimer(Duration duration);

/**
* Creates a durable timer that expires after the specified timestamp with specific zone.
*
* <p>Specifying a long delay (for example, a delay of a few days or more) may result in the creation of multiple,
* internally-managed timers. The workflow code doesn't need to be aware of this behavior. However,
* it may be visible in framework logs and the stored history state.
*
* @param zonedDateTime timestamp with specific zone when the timer should expire
* @return a new {@code Task} that completes after the specified delay
*/
default Task<Void> createTimer(ZonedDateTime zonedDateTime) {
throw new UnsupportedOperationException("This method is not implemented.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@

package io.dapr.workflows.runtime;

import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskOrchestrationContext;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -90,7 +93,7 @@ public void completeTest() {
}

@Test
public void getIsReplaying() {
public void getIsReplayingTest() {
context.getIsReplaying();
verify(mockInnerContext, times(1)).getIsReplaying();
}
Expand Down Expand Up @@ -118,4 +121,33 @@ public void getLoggerFirstTimeTest() {

verify(mockLogger, times(1)).info(expectedArg);
}

@Test
public void allOfTest() {
Task<Void> t1 = mockInnerContext.callActivity("task1");
Task<Void> t2 = mockInnerContext.callActivity("task2");
List<Task<Void>> taskList = Arrays.asList(t1, t2);
context.allOf(taskList);
verify(mockInnerContext, times(1)).allOf(taskList);
}

@Test
public void anyOfTest() {
Task<Void> t1 = mockInnerContext.callActivity("task1");
Task<Void> t2 = mockInnerContext.callActivity("task2");
Task<Void> t3 = mockInnerContext.callActivity("task3");
List<Task<?>> taskList = Arrays.asList(t1, t2);

context.anyOf(taskList);
verify(mockInnerContext, times(1)).anyOf(taskList);

context.anyOf(t1, t2, t3);
verify(mockInnerContext, times(1)).anyOf(Arrays.asList(t1, t2, t3));
}

@Test
public void createTimerTest() {
context.createTimer(Duration.ofSeconds(10));
verify(mockInnerContext, times(1)).createTimer(Duration.ofSeconds(10));
}
}

0 comments on commit 4bba4a4

Please sign in to comment.