diff --git a/core/src/main/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtils.java b/core/src/main/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtils.java index 68c1c81ff2..faf191d916 100644 --- a/core/src/main/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtils.java +++ b/core/src/main/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtils.java @@ -192,6 +192,8 @@ public void verifyAndUpload(T entity, PayloadType payloadType) { } } catch (TransientException te) { throw te; + } catch (TerminateWorkflowException twe) { + throw twe; } catch (Exception e) { LOGGER.error( "Unable to upload payload to external storage for workflow: {}", workflowId, e); diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ExternalPayloadStorageSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ExternalPayloadStorageSpec.groovy index dfa5394720..599adad48a 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ExternalPayloadStorageSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ExternalPayloadStorageSpec.groovy @@ -852,6 +852,104 @@ class ExternalPayloadStorageSpec extends AbstractSpecification { } } + def "Test fork join workflow exceed external storage limit should fail the task and workflow"() { + + given: "An existing fork join workflow definition" + metadataService.getWorkflowDef(FORK_JOIN_WF, 1) + + and: "input required to start large payload workflow" + def correlationId = 'fork_join_external_storage' + String workflowInputPath = uploadInitialWorkflowInput() + + when: "the workflow is started" + def workflowInstanceId = startWorkflow(FORK_JOIN_WF, 1, correlationId, null, workflowInputPath) + + then: "verify that the workflow is in a RUNNING state" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 4 + tasks[0].status == Task.Status.COMPLETED + tasks[0].taskType == 'FORK' + tasks[1].status == Task.Status.SCHEDULED + tasks[1].taskType == 'integration_task_1' + tasks[2].status == Task.Status.SCHEDULED + tasks[2].taskType == 'integration_task_2' + tasks[3].status == Task.Status.IN_PROGRESS + tasks[3].taskType == 'JOIN' + } + + when: "the first task of the left fork is polled and completed" + def polledAndAckTask = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.integration.worker') + + then: "verify that the 'integration_task_1' was polled and acknowledged" + verifyPolledAndAcknowledgedTask(polledAndAckTask) + + and: "task is completed and the next task in the fork is scheduled" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 5 + tasks[0].status == Task.Status.COMPLETED + tasks[0].taskType == 'FORK' + tasks[1].status == Task.Status.COMPLETED + tasks[1].taskType == 'integration_task_1' + tasks[2].status == Task.Status.SCHEDULED + tasks[2].taskType == 'integration_task_2' + tasks[3].status == Task.Status.IN_PROGRESS + tasks[3].taskType == 'JOIN' + tasks[4].status == Task.Status.SCHEDULED + tasks[4].taskType == 'integration_task_3' + } + + when: "the first task of the right fork is polled and completed with external payload storage" + String taskOutputPath = "${UUID.randomUUID()}.json" + mockExternalPayloadStorage.upload(taskOutputPath, mockExternalPayloadStorage.createLargePayload(500)) + def polledAndAckLargePayloadTask = workflowTestUtil.pollAndCompleteLargePayloadTask('integration_task_2', 'task2.integration.worker', taskOutputPath) + + then: "verify that the 'integration_task_2' was polled and acknowledged" + verifyPolledAndAcknowledgedLargePayloadTask(polledAndAckLargePayloadTask) + + and: "task is completed and the workflow is in running state" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 5 + tasks[0].status == Task.Status.COMPLETED + tasks[0].taskType == 'FORK' + tasks[1].status == Task.Status.COMPLETED + tasks[1].taskType == 'integration_task_1' + tasks[2].status == Task.Status.COMPLETED + tasks[2].taskType == 'integration_task_2' + tasks[3].status == Task.Status.IN_PROGRESS + tasks[3].taskType == 'JOIN' + tasks[4].status == Task.Status.SCHEDULED + tasks[4].taskType == 'integration_task_3' + } + + when: "the second task of the left fork is polled and completed with external payload storage" + taskOutputPath = "${UUID.randomUUID()}.json" + mockExternalPayloadStorage.upload(taskOutputPath, mockExternalPayloadStorage.createLargePayload(500)) + polledAndAckLargePayloadTask = workflowTestUtil.pollAndCompleteLargePayloadTask('integration_task_3', 'task3.integration.worker', taskOutputPath) + + then: "verify that the 'integration_task_3' was polled and acknowledged" + verifyPolledAndAcknowledgedLargePayloadTask(polledAndAckLargePayloadTask) + + and: "task is completed and the join task is failed because of exceeding size limit" + Thread.sleep(5000L) + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.FAILED + tasks.size() == 5 + tasks[0].status == Task.Status.COMPLETED + tasks[0].taskType == 'FORK' + tasks[1].status == Task.Status.COMPLETED + tasks[1].taskType == 'integration_task_1' + tasks[2].status == Task.Status.COMPLETED + tasks[2].taskType == 'integration_task_2' + tasks[2].outputData.isEmpty() + tasks[3].status == Task.Status.FAILED_WITH_TERMINAL_ERROR + tasks[3].taskType == 'JOIN' + tasks[3].outputData.isEmpty() + } + } + private String uploadLargeTaskOutput() { String taskOutputPath = "${UUID.randomUUID()}.json" mockExternalPayloadStorage.upload(taskOutputPath, mockExternalPayloadStorage.readOutputDotJson(), 0) diff --git a/test-harness/src/test/java/com/netflix/conductor/test/utils/MockExternalPayloadStorage.java b/test-harness/src/test/java/com/netflix/conductor/test/utils/MockExternalPayloadStorage.java index e004c24369..f93baf6d25 100644 --- a/test-harness/src/test/java/com/netflix/conductor/test/utils/MockExternalPayloadStorage.java +++ b/test-harness/src/test/java/com/netflix/conductor/test/utils/MockExternalPayloadStorage.java @@ -168,4 +168,19 @@ public Map downloadPayload(String path) { } return new HashMap<>(); } + + public Map createLargePayload(int repeat) { + Map largePayload = new HashMap<>(); + try { + InputStream inputStream = readOutputDotJson(); + Map payload = objectMapper.readValue(inputStream, Map.class); + for (int i = 0; i < repeat; i++) { + largePayload.put(String.valueOf(i), payload); + } + } catch (IOException e) { + // just handle this exception here and return empty map so that test will fail in case + // this exception is thrown + } + return largePayload; + } }