Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ActivityCompletionClient cancellation and failure by WorkflowExecution #930

Merged
merged 1 commit into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ public void fail(Throwable failure) {
}
} else {
RespondActivityTaskFailedByIDRequest request = new RespondActivityTaskFailedByIDRequest();
request.setActivityID(activityId);
request.setReason(failure.getClass().getName());
request.setDetails(dataConverter.toData(failure));
request.setDomain(domain);
Expand Down Expand Up @@ -212,6 +213,7 @@ public void reportCancellation(Object details) {
}
} else {
RespondActivityTaskCanceledByIDRequest request = new RespondActivityTaskCanceledByIDRequest();
request.setActivityID(activityId);
request.setDetails(dataConverter.toData(details));
request.setDomain(domain);
request.setWorkflowID(execution.getWorkflowId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ public void evaluate() throws Throwable {
};
}

public <T> T newWorkflowStub(Class<T> tClass) {
return getWorkflowClient().newWorkflowStub(tClass, workflowOptionsBuilder().build());
}

public TracingWorkflowInterceptorFactory getTracer() {
return context.getTracer();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this correct?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, I didn't see the "modifiications copyright" down below, I guess it is from there. the licence URL looks sus, but I guess it is what it is.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cadence java client is a fork of https://github.com/aws/aws-swf-flow-library 😄

*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.workflow;

import com.google.common.base.Preconditions;
import com.uber.cadence.WorkflowExecution;
import com.uber.cadence.activity.Activity;
import com.uber.cadence.activity.ActivityMethod;
import com.uber.cadence.activity.ActivityOptions;
import com.uber.cadence.activity.ActivityTask;
import com.uber.cadence.client.ActivityCompletionClient;
import com.uber.cadence.testUtils.CadenceTestRule;
import java.time.Duration;
import java.util.concurrent.CancellationException;
import org.junit.Rule;
import org.junit.Test;

public class ManualActivityCompletionWorkflowTest {
@Rule
public CadenceTestRule testRule =
CadenceTestRule.builder()
.withActivities(new ManualCompletionActivitiesImpl())
.withWorkflowTypes(ManualCompletionWorkflowImpl.class)
.startWorkersAutomatically()
.build();

@Test
public void testManualActivityCompletion() {
testRule.newWorkflowStub(ManualCompletionWorkflow.class).run();
}

public interface ManualCompletionActivities {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nonblocking / question: So, forgive my utter ignorance of testing in Java, but why declare an interface here?

@ActivityMethod
String asyncActivity();

@ActivityMethod
void completeAsyncActivity(String result);

@ActivityMethod
void completeAsyncActivityById(String result);

@ActivityMethod
void failAsyncActivity(String details);

@ActivityMethod
void failAsyncActivityById(String details);

@ActivityMethod
void cancelAsyncActivity(String details);

@ActivityMethod
void cancelAsyncActivityById(String details);
}

private class ManualCompletionActivitiesImpl implements ManualCompletionActivities {
private ActivityTask openTask;

@Override
public synchronized String asyncActivity() {
openTask = Activity.getTask();

Activity.doNotCompleteOnReturn();
return null;
}

@Override
public synchronized void completeAsyncActivity(String details) {
Preconditions.checkState(openTask != null);
getClient().complete(openTask.getTaskToken(), details);
}

@Override
public synchronized void completeAsyncActivityById(String details) {
Preconditions.checkState(openTask != null);
getClient().complete(getCurrentWorkflow(), openTask.getActivityId(), details);
}

@Override
public synchronized void failAsyncActivity(String details) {
Preconditions.checkState(openTask != null);
getClient()
.completeExceptionally(openTask.getTaskToken(), new ExceptionWithDetaills(details));
}

@Override
public synchronized void failAsyncActivityById(String details) {
Preconditions.checkState(openTask != null);
getClient()
.completeExceptionally(
getCurrentWorkflow(), openTask.getActivityId(), new ExceptionWithDetaills(details));
}

@Override
public synchronized void cancelAsyncActivity(String details) {
Preconditions.checkState(openTask != null);
getClient().reportCancellation(openTask.getTaskToken(), details);
}

@Override
public synchronized void cancelAsyncActivityById(String details) {
Preconditions.checkState(openTask != null);
getClient().reportCancellation(getCurrentWorkflow(), openTask.getActivityId(), details);
}

private WorkflowExecution getCurrentWorkflow() {
return Activity.getWorkflowExecution();
}

private ActivityCompletionClient getClient() {
return testRule.getWorkflowClient().newActivityCompletionClient();
}
}

public interface ManualCompletionWorkflow {
@WorkflowMethod
void run();
}

public static class ManualCompletionWorkflowImpl implements ManualCompletionWorkflow {
private final ManualCompletionActivities activities =
Workflow.newActivityStub(
ManualCompletionActivities.class,
new ActivityOptions.Builder()
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
.build());

@Override
public void run() {
Promise<String> result = Async.function(activities::asyncActivity);
activities.completeAsyncActivity("1");
expectSuccess("1", result);
expectFailure(() -> activities.completeAsyncActivity("again"));

result = Async.function(activities::asyncActivity);
activities.completeAsyncActivityById("2");
expectSuccess("2", result);
expectFailure(() -> activities.completeAsyncActivityById("again"));

result = Async.function(activities::asyncActivity);
activities.failAsyncActivity("3");
expectFailureWithDetails(result, "3");
expectFailure(() -> activities.failAsyncActivity("again"));

result = Async.function(activities::asyncActivity);
activities.failAsyncActivityById("4");
expectFailureWithDetails(result, "4");
expectFailure(() -> activities.failAsyncActivityById("again"));

// Need to request cancellation, then the activity can respond with the cancel
CompletablePromise<String> completablePromise = Workflow.newPromise();
CancellationScope scope =
Workflow.newCancellationScope(
() -> {
completablePromise.completeFrom(Async.function(activities::asyncActivity));
});
result = completablePromise;
scope.run();
// Need to force a separate decision, otherwise the activity gets skipped since it's started
// and cancelled
// in the same decision
Workflow.sleep(1);
scope.cancel();
activities.cancelAsyncActivity("5");
expectCancelled(result);

// Need to request cancellation, then the activity can respond with the cancel
CompletablePromise<String> completablePromise2 = Workflow.newPromise();
scope =
Workflow.newCancellationScope(
() -> {
completablePromise2.completeFrom(Async.function(activities::asyncActivity));
});
scope.run();
// Need to force a separate decision, otherwise the activity gets skipped since it's started
// and cancelled in the same decision
Workflow.sleep(1);
scope.cancel();
result = completablePromise2;
activities.cancelAsyncActivityById("6");
expectCancelled(result);
}

private void expectCancelled(Promise<String> promise) {
try {
promise.get();
throw new IllegalStateException("expected activity failure");
} catch (CancellationException e) {
// good
}
}

private void expectFailureWithDetails(Promise<String> promise, String expectedDetails) {
try {
promise.get();
throw new IllegalStateException("expected activity failure");
} catch (ActivityFailureException e) {
if (!(e.getCause() instanceof ExceptionWithDetaills)) {
throw new IllegalStateException(
"Didn't receive correct cause, instead found this:", e.getCause());
}
String details = ((ExceptionWithDetaills) e.getCause()).details;
if (!expectedDetails.equals(details)) {
throw new IllegalStateException(
"Expected: '" + expectedDetails + "', got: '" + details + "'");
}
}
}

private void expectFailure(Runnable runnable) {
try {
runnable.run();
throw new IllegalStateException("expected activity failure");
} catch (ActivityFailureException e) {
// good
}
}

private void expectSuccess(String expected, Promise<String> actual) {
if (!expected.equals(actual.get())) {
throw new IllegalStateException("Expected: '" + expected + "', got: '" + actual + "'");
}
}
}

public static class ExceptionWithDetaills extends RuntimeException {
public String details;

public ExceptionWithDetaills() {}

public ExceptionWithDetaills(String details) {
this.details = details;
}
}
}