This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

re-throw TerminateWorkflowException if payload exceeds max threshold s… #3272

Merged · 1 commit · Oct 12, 2022
@@ -192,6 +192,8 @@ public <T> void verifyAndUpload(T entity, PayloadType payloadType) {
            }
        } catch (TransientException te) {
            throw te;
        } catch (TerminateWorkflowException twe) {

Collaborator:

I presume that this fails the workflow, but the task whose payload is large will not be marked as FAILED. Should we try to fail the respective task?

Contributor Author:


It will fail the task as well if the TerminateWorkflowException is due to the task exceeding the size limit:

    task.setStatus(TaskModel.Status.FAILED_WITH_TERMINAL_ERROR);

There is a test case added that verifies this as well.

            throw twe;
        } catch (Exception e) {
            LOGGER.error(
                    "Unable to upload payload to external storage for workflow: {}", workflowId, e);
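For context on the review thread above: the TerminateWorkflowException re-thrown here is raised earlier in verifyAndUpload, after the offending task has already been marked failed. A minimal sketch of what that failure helper plausibly looks like, assuming a name like failTask and building only on the task.setStatus line quoted in the reply (an illustration, not the verbatim source):

    // Hedged sketch, not verbatim Conductor source. The task is failed and its
    // oversized payload cleared before the exception is thrown, so re-throwing
    // in verifyAndUpload (instead of logging and swallowing it in the generic
    // catch block) preserves both the task failure and the workflow termination.
    private void failTask(TaskModel task, PayloadType payloadType, String errorMsg) {
        task.setReasonForIncompletion(errorMsg);
        task.setStatus(TaskModel.Status.FAILED_WITH_TERMINAL_ERROR);
        if (payloadType == PayloadType.TASK_INPUT) {
            task.setInputData(new HashMap<>());
        } else {
            task.setOutputData(new HashMap<>()); // why the spec below asserts outputData.isEmpty()
        }
        throw new TerminateWorkflowException(errorMsg);
    }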
@@ -852,6 +852,104 @@ class ExternalPayloadStorageSpec extends AbstractSpecification {
        }
    }

    def "Test fork join workflow exceed external storage limit should fail the task and workflow"() {

        given: "An existing fork join workflow definition"
        metadataService.getWorkflowDef(FORK_JOIN_WF, 1)

        and: "input required to start large payload workflow"
        def correlationId = 'fork_join_external_storage'
        String workflowInputPath = uploadInitialWorkflowInput()

        when: "the workflow is started"
        def workflowInstanceId = startWorkflow(FORK_JOIN_WF, 1, correlationId, null, workflowInputPath)

        then: "verify that the workflow is in a RUNNING state"
        with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
            status == Workflow.WorkflowStatus.RUNNING
            tasks.size() == 4
            tasks[0].status == Task.Status.COMPLETED
            tasks[0].taskType == 'FORK'
            tasks[1].status == Task.Status.SCHEDULED
            tasks[1].taskType == 'integration_task_1'
            tasks[2].status == Task.Status.SCHEDULED
            tasks[2].taskType == 'integration_task_2'
            tasks[3].status == Task.Status.IN_PROGRESS
            tasks[3].taskType == 'JOIN'
        }

        when: "the first task of the left fork is polled and completed"
        def polledAndAckTask = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.integration.worker')

        then: "verify that the 'integration_task_1' was polled and acknowledged"
        verifyPolledAndAcknowledgedTask(polledAndAckTask)

        and: "task is completed and the next task in the fork is scheduled"
        with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
            status == Workflow.WorkflowStatus.RUNNING
            tasks.size() == 5
            tasks[0].status == Task.Status.COMPLETED
            tasks[0].taskType == 'FORK'
            tasks[1].status == Task.Status.COMPLETED
            tasks[1].taskType == 'integration_task_1'
            tasks[2].status == Task.Status.SCHEDULED
            tasks[2].taskType == 'integration_task_2'
            tasks[3].status == Task.Status.IN_PROGRESS
            tasks[3].taskType == 'JOIN'
            tasks[4].status == Task.Status.SCHEDULED
            tasks[4].taskType == 'integration_task_3'
        }

        when: "the first task of the right fork is polled and completed with external payload storage"
        String taskOutputPath = "${UUID.randomUUID()}.json"
        mockExternalPayloadStorage.upload(taskOutputPath, mockExternalPayloadStorage.createLargePayload(500))
        def polledAndAckLargePayloadTask = workflowTestUtil.pollAndCompleteLargePayloadTask('integration_task_2', 'task2.integration.worker', taskOutputPath)

        then: "verify that the 'integration_task_2' was polled and acknowledged"
        verifyPolledAndAcknowledgedLargePayloadTask(polledAndAckLargePayloadTask)

        and: "task is completed and the workflow is in running state"
        with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
            status == Workflow.WorkflowStatus.RUNNING
            tasks.size() == 5
            tasks[0].status == Task.Status.COMPLETED
            tasks[0].taskType == 'FORK'
            tasks[1].status == Task.Status.COMPLETED
            tasks[1].taskType == 'integration_task_1'
            tasks[2].status == Task.Status.COMPLETED
            tasks[2].taskType == 'integration_task_2'
            tasks[3].status == Task.Status.IN_PROGRESS
            tasks[3].taskType == 'JOIN'
            tasks[4].status == Task.Status.SCHEDULED
            tasks[4].taskType == 'integration_task_3'
        }

        when: "the second task of the left fork is polled and completed with external payload storage"
        taskOutputPath = "${UUID.randomUUID()}.json"
        mockExternalPayloadStorage.upload(taskOutputPath, mockExternalPayloadStorage.createLargePayload(500))
        polledAndAckLargePayloadTask = workflowTestUtil.pollAndCompleteLargePayloadTask('integration_task_3', 'task3.integration.worker', taskOutputPath)

        then: "verify that the 'integration_task_3' was polled and acknowledged"
        verifyPolledAndAcknowledgedLargePayloadTask(polledAndAckLargePayloadTask)

        and: "task is completed and the join task is failed because of exceeding size limit"
        Thread.sleep(5000L)
        with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
            status == Workflow.WorkflowStatus.FAILED
            tasks.size() == 5
            tasks[0].status == Task.Status.COMPLETED
            tasks[0].taskType == 'FORK'
            tasks[1].status == Task.Status.COMPLETED
            tasks[1].taskType == 'integration_task_1'
            tasks[2].status == Task.Status.COMPLETED
            tasks[2].taskType == 'integration_task_2'
            tasks[2].outputData.isEmpty()
            tasks[3].status == Task.Status.FAILED_WITH_TERMINAL_ERROR
            tasks[3].taskType == 'JOIN'
            tasks[3].outputData.isEmpty()
        }
    }
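A note on why the JOIN is the task that trips the limit: each forked task's large output lives in external storage, but the JOIN materializes the outputs of all joined tasks into its own output map, keyed by task reference name (standard fork/join semantics, assumed here since this diff does not show that code). The combined inline payload is what exceeds the maximum threshold. A minimal standalone illustration:

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch only; the two stand-in maps represent the large payloads
    // produced by createLargePayload(500) for integration_task_2 and _3 above.
    public class JoinOutputSketch {
        public static void main(String[] args) {
            Map<String, Object> largeTask2Output = new HashMap<>();
            Map<String, Object> largeTask3Output = new HashMap<>();
            Map<String, Object> joinOutput = new HashMap<>();
            joinOutput.put("integration_task_2", largeTask2Output);
            joinOutput.put("integration_task_3", largeTask3Output);
            // Serializing joinOutput is what exceeds the max threshold: the JOIN is
            // failed with FAILED_WITH_TERMINAL_ERROR and the workflow ends FAILED.
            System.out.println(joinOutput.keySet());
        }
    }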

    private String uploadLargeTaskOutput() {
        String taskOutputPath = "${UUID.randomUUID()}.json"
        mockExternalPayloadStorage.upload(taskOutputPath, mockExternalPayloadStorage.readOutputDotJson(), 0)
@@ -168,4 +168,19 @@ public Map<String, Object> downloadPayload(String path) {
        }
        return new HashMap<>();
    }

    public Map<String, Object> createLargePayload(int repeat) {
        Map<String, Object> largePayload = new HashMap<>();
        try {
            InputStream inputStream = readOutputDotJson();
            Map<String, Object> payload = objectMapper.readValue(inputStream, Map.class);
            for (int i = 0; i < repeat; i++) {
                largePayload.put(String.valueOf(i), payload);
            }
        } catch (IOException e) {
            // Swallow the exception and return an empty map; any test that relies on
            // this payload being large will then fail, surfacing the problem.
        }
        return largePayload;
    }
}
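To get a rough feel for the size createLargePayload produces, here is a standalone sketch; the ~1 KB entry is an assumed stand-in for the real output.json, so the absolute numbers are illustrative only:

    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.util.HashMap;
    import java.util.Map;

    // Standalone sketch: estimate the serialized size of a createLargePayload-style
    // map, to sanity-check that it will exceed a configured threshold.
    public class LargePayloadSizeCheck {
        public static void main(String[] args) throws Exception {
            ObjectMapper objectMapper = new ObjectMapper();
            Map<String, Object> unit = new HashMap<>();
            unit.put("key", "x".repeat(1024)); // ~1 KB stand-in for output.json
            Map<String, Object> largePayload = new HashMap<>();
            for (int i = 0; i < 500; i++) { // same repeat count the spec passes in
                largePayload.put(String.valueOf(i), unit);
            }
            int bytes = objectMapper.writeValueAsBytes(largePayload).length;
            System.out.printf("serialized payload: ~%d KB%n", bytes / 1024);
        }
    }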