Skip to content

Commit

Permalink
continueAsNew Implementation (#13)
Browse files Browse the repository at this point in the history
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>
  • Loading branch information
macromania committed Jun 21, 2023
1 parent d1b64ba commit 34b8f82
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,33 @@ public void run(WorkflowContext ctx) {
ctx.getLogger().info("Activity returned: " + output.getOriginalMessage());


ctx.getLogger().info("Child-Workflow> Calling ChildWorkflow...");
var childWorkflowInput = "Hello ChildWorkflow!";
var childWorkflowOutput = ctx.callSubWorkflow(DemoSubWorkflow.class.getName(), childWorkflowInput,
String.class).await();
boolean shouldComplete = true;
ctx.getLogger().info("Waiting for event: 'RestartEvent'...");
try {
ctx.waitForExternalEvent("RestartEvent", Duration.ofSeconds(10)).await();
ctx.getLogger().info("Received RestartEvent");
ctx.getLogger().info("Restarting Workflow by calling continueAsNew...");
ctx.continueAsNew("TestInputRestart", false);
shouldComplete = false;
} catch (TaskCanceledException e) {
ctx.getLogger().warn("Restart Timed out");
ctx.getLogger().warn(e.getMessage());
}

ctx.getLogger().info("Child-Workflow> returned: " + childWorkflowOutput);
if (shouldComplete) {
ctx.getLogger().info("Child-Workflow> Calling ChildWorkflow...");
var childWorkflowInput = "Hello ChildWorkflow!";
var childWorkflowOutput =
ctx.callSubWorkflow(DemoSubWorkflow.class.getName(), childWorkflowInput, String.class).await();

ctx.getLogger().info("Child-Workflow> returned: " + childWorkflowOutput);

ctx.getLogger().info("Workflow finished");
ctx.complete("finished");

return;
}

ctx.getLogger().info("Workflow finished");
ctx.complete("finished");
ctx.getLogger().info("Workflow restarted");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,19 @@ public static void main(String[] args) throws InterruptedException {
System.out.println("Terminate this workflow instance manually before the timeout is reached");
client.terminateWorkflow(instanceToTerminateId, null);
System.out.println(separatorStr);

String restartingInstanceId = "restarting";
client.scheduleNewWorkflow(DemoWorkflow.class, null, restartingInstanceId);
System.out.printf("Started new workflow instance with ID: %s%n", restartingInstanceId);
System.out.println("Sleeping 30 seconds to restart the workflow");
TimeUnit.SECONDS.sleep(30);

System.out.println("**SendExternalMessage: RestartEvent**");
client.raiseEvent(restartingInstanceId, "RestartEvent", "RestartEventPayload");

System.out.println("Sleeping 30 seconds to terminate the eternal workflow");
TimeUnit.SECONDS.sleep(30);
client.terminateWorkflow(restartingInstanceId, null);
}

System.out.println("Exiting DemoWorkflowClient.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.slf4j.helpers.NOPLogger;

import javax.annotation.Nullable;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
Expand Down Expand Up @@ -145,6 +146,7 @@ public Task<Void> createTimer(Duration duration) {
return this.innerContext.createTimer(duration);
}


/**
* {@inheritDoc}
*/
Expand All @@ -157,9 +159,24 @@ public <T> T getInput(Class<T> targetType) {
*/
@Override
public <V> Task<V> callSubWorkflow(String name, @Nullable Object input, @Nullable String instanceID,
@Nullable TaskOptions options, Class<V> returnType) {
@Nullable TaskOptions options, Class<V> returnType) {

return this.innerContext.callSubOrchestrator(name, input, instanceID, options, returnType);
}

/**
* {@inheritDoc}
*/
@Override
public void continueAsNew(Object input) {
this.innerContext.continueAsNew(input);
}

/**
* {@inheritDoc}
*/
@Override
public void continueAsNew(Object input, boolean preserveUnprocessedEvents) {
this.innerContext.continueAsNew(input, preserveUnprocessedEvents);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.slf4j.Logger;

import javax.annotation.Nullable;

import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
Expand Down Expand Up @@ -269,10 +270,9 @@ default Task<Void> callActivity(String name, Object input, TaskOptions options)
* }</pre>
*
* @param tasks the list of {@code Task} objects
* @param <V> the return type of the {@code Task} objects
* @param <V> the return type of the {@code Task} objects
* @return the values of the completed {@code Task} objects in the same order as the source list
* @throws CompositeTaskFailedException if the specified {@code timeout} value expires before the event is received
*
*/
<V> Task<List<V>> allOf(List<Task<V>> tasks) throws CompositeTaskFailedException;

Expand Down Expand Up @@ -353,24 +353,25 @@ default Task<Void> createTimer(ZonedDateTime zonedDateTime) {
throw new UnsupportedOperationException("This method is not implemented.");
}


/**
* Gets the deserialized input of the current task orchestration.
*
* @param targetType the {@link Class} object associated with {@code V}
* @param <V> the expected type of the workflow input
* @param <V> the expected type of the workflow input
* @return the deserialized input as an object of type {@code V} or {@code null} if no input was provided.
*/
<V> V getInput(Class<V> targetType);

/**
* Asynchronously invokes another workflow as a sub-workflow and returns a {@link Task} that completes
* when the sub-workflow completes.
*
*
* <p>See {@link #callSubWorkflow(String, Object, String, TaskOptions, Class)} for a full description.
*
* @see #callSubWorkflow(String, Object, String, TaskOptions, Class)
* @param name the name of the workflow to invoke
* @return a new {@link Task} that completes when the sub-workflow completes or fails
* @see #callSubWorkflow(String, Object, String, TaskOptions, Class)
*/
default Task<Void> callSubWorkflow(String name) {
return this.callSubWorkflow(name, null);
Expand All @@ -379,10 +380,10 @@ default Task<Void> callSubWorkflow(String name) {
/**
* Asynchronously invokes another workflow as a sub-workflow and returns a {@link Task} that completes
* when the sub-workflow completes.
*
*
* <p>See {@link #callSubWorkflow(String, Object, String, TaskOptions, Class)} for a full description.
*
* @param name the name of the workflow to invoke
* @param name the name of the workflow to invoke
* @param input the serializable input to send to the sub-workflow
* @return a new {@link Task} that completes when the sub-workflow completes or fails
*/
Expand All @@ -393,13 +394,13 @@ default Task<Void> callSubWorkflow(String name, Object input) {
/**
* Asynchronously invokes another workflow as a sub-workflow and returns a {@link Task} that completes
* when the sub-workflow completes.
*
*
* <p>See {@link #callSubWorkflow(String, Object, String, TaskOptions, Class)} for a full description.
*
* @param name the name of the workflow to invoke
* @param input the serializable input to send to the sub-workflow
* @param name the name of the workflow to invoke
* @param input the serializable input to send to the sub-workflow
* @param returnType the expected class type of the sub-workflow output
* @param <V> the expected type of the sub-workflow output
* @param <V> the expected type of the sub-workflow output
* @return a new {@link Task} that completes when the sub-workflow completes or fails
*/
default <V> Task<V> callSubWorkflow(String name, Object input, Class<V> returnType) {
Expand All @@ -409,14 +410,14 @@ default <V> Task<V> callSubWorkflow(String name, Object input, Class<V> returnTy
/**
* Asynchronously invokes another workflow as a sub-workflow and returns a {@link Task} that completes
* when the sub-workflow completes.
*
*
* <p>See {@link #callSubWorkflow(String, Object, String, TaskOptions, Class)} for a full description.
*
* @param name the name of the workflow to invoke
* @param input the serializable input to send to the sub-workflow
* @param name the name of the workflow to invoke
* @param input the serializable input to send to the sub-workflow
* @param instanceID the unique ID of the sub-workflow
* @param returnType the expected class type of the sub-workflow output
* @param <V> the expected type of the sub-workflow output
* @param <V> the expected type of the sub-workflow output
* @return a new {@link Task} that completes when the sub-workflow completes or fails
*/
default <V> Task<V> callSubWorkflow(String name, Object input, String instanceID, Class<V> returnType) {
Expand All @@ -426,13 +427,13 @@ default <V> Task<V> callSubWorkflow(String name, Object input, String instanceID
/**
* Asynchronously invokes another workflow as a sub-workflow and returns a {@link Task} that completes
* when the sub-workflow completes.
*
*
* <p>See {@link #callSubWorkflow(String, Object, String, TaskOptions, Class)} for a full description.
*
* @param name the name of the workflow to invoke
* @param input the serializable input to send to the sub-workflow
* @param name the name of the workflow to invoke
* @param input the serializable input to send to the sub-workflow
* @param instanceID the unique ID of the sub-workflow
* @param options additional options that control the execution and processing of the activity
* @param options additional options that control the execution and processing of the activity
* @return a new {@link Task} that completes when the sub-workflow completes or fails
*/
default Task<Void> callSubWorkflow(String name, Object input, String instanceID, TaskOptions options) {
Expand All @@ -444,7 +445,7 @@ default Task<Void> callSubWorkflow(String name, Object input, String instanceID,
* when the sub-workflow completes. If the sub-workflow completes successfully, the returned
* {@code Task}'s value will be the activity's output. If the sub-workflow fails, the returned {@code Task}
* will complete exceptionally with a {@link TaskFailedException}.
*
*
* <p>A sub-workflow has its own instance ID, history, and status that is independent of the parent workflow
* that started it. There are many advantages to breaking down large orchestrations into sub-workflows:
* <ul>
Expand All @@ -461,22 +462,58 @@ default Task<Void> callSubWorkflow(String name, Object input, String instanceID,
* </ul>
* The disadvantage is that there is overhead associated with starting a sub-workflow and processing its
* output. This is typically only an issue for very small orchestrations.
*
*
* <p>Because sub-workflows are independent of their parents, terminating a parent orchestration does not affect
* any sub-workflows. sub-workflows must be terminated independently using their unique instance ID,
* which is specified using the {@code instanceID} parameter
*
* @param name the name of the workflow to invoke
* @param input the serializable input to send to the sub-workflow
* @param name the name of the workflow to invoke
* @param input the serializable input to send to the sub-workflow
* @param instanceID the unique ID of the sub-workflow
* @param options additional options that control the execution and processing of the activity
* @param options additional options that control the execution and processing of the activity
* @param returnType the expected class type of the sub-workflow output
* @param <V> the expected type of the sub-workflow output
* @param <V> the expected type of the sub-workflow output
* @return a new {@link Task} that completes when the sub-workflow completes or fails
*/
<V> Task<V> callSubWorkflow(String name,
@Nullable Object input,
@Nullable String instanceID,
@Nullable TaskOptions options,
Class<V> returnType);
<V> Task<V> callSubWorkflow(String name,
@Nullable Object input,
@Nullable String instanceID,
@Nullable TaskOptions options,
Class<V> returnType);

/**
* Restarts the orchestration with a new input and clears its history. See {@link #continueAsNew(Object, boolean)}
* for a full description.
*
* @param input the serializable input data to re-initialize the instance with
*/
default void continueAsNew(Object input) {
this.continueAsNew(input, true);
}

/**
* Restarts the orchestration with a new input and clears its history.
*
* <p>This method is primarily designed for eternal orchestrations, which are orchestrations that
* may not ever complete. It works by restarting the orchestration, providing it with a new input,
* and truncating the existing orchestration history. It allows an orchestration to continue
* running indefinitely without having its history grow unbounded. The benefits of periodically
* truncating history include decreased memory usage, decreased storage volumes, and shorter orchestrator
* replays when rebuilding state.
*
* <p>The results of any incomplete tasks will be discarded when an orchestrator calls {@code continueAsNew}.
* For example, if a timer is scheduled and then {@code continueAsNew} is called before the timer fires, the timer
* event will be discarded. The only exception to this is external events. By default, if an external event is
* received by an orchestration but not yet processed, the event is saved in the orchestration state unit it is
* received by a call to {@link #waitForExternalEvent}. These events will remain in memory
* even after an orchestrator restarts using {@code continueAsNew}. This behavior can be disabled by specifying
* {@code false} for the {@code preserveUnprocessedEvents} parameter value.
*
* <p>Orchestrator implementations should complete immediately after calling the{@code continueAsNew} method.
*
* @param input the serializable input data to re-initialize the instance with
* @param preserveUnprocessedEvents {@code true} to push unprocessed external events into the new orchestration
* history, otherwise {@code false}
*/
void continueAsNew(Object input, boolean preserveUnprocessedEvents);
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ public void getLoggerFirstTimeTest() {
verify(mockLogger, times(1)).info(expectedArg);
}

@Test
public void continueAsNewTest() {
String expectedInput = "TestInput";
context.continueAsNew(expectedInput);
verify(mockInnerContext, times(1)).continueAsNew(expectedInput);
}

@Test
public void allOfTest() {
Task<Void> t1 = mockInnerContext.callActivity("task1");
Expand Down

0 comments on commit 34b8f82

Please sign in to comment.