This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Fix rerunWorkflow places synchronous system tasks in the queue #2494

Merged: 3 commits, Oct 6, 2021
Changes from 1 commit
@@ -30,6 +30,7 @@
import org.springframework.stereotype.Component;

import java.util.ArrayList;
+ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -58,7 +59,7 @@ public JsonJqTransform(ObjectMapper objectMapper) {
@Override
public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
final Map<String, Object> taskInput = task.getInputData();
- final Map<String, Object> taskOutput = task.getOutputData();
+ final Map<String, Object> taskOutput = task.getOutputData() != null ? task.getOutputData() : new HashMap<>();
Collaborator: getOutputData() does not return null. Can you please explain the need for a null check here?

Contributor (author): The outputData field is reset to null here: https://github.com/Netflix/conductor/blob/main/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java#L1642. I guess we could update that line to just clear the map instead of setting it to null, but I feel there might be other cases I'd miss if I changed that, so I added the null check for outputData in the system task instead. This is the only synchronous system task that needs the null check.

Collaborator: It makes sense to clear the outputData or set it to an empty HashMap during rerun. If you look at Task, the outputData field defaults to an empty map, so it's natural for the rest of the code to expect a non-null value from Task.getOutputData(). DoWhile also expects non-null output data and would fail if null were returned.

Also, the fewer nulls we introduce, the less chance of an unexpected NullPointerException.

Contributor (author): Updated it to clear the outputData instead of setting it to null.
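
The agreed direction, as a minimal sketch (the helper class and method names below are made up for illustration; the PR itself changes WorkflowExecutor directly):

import java.util.HashMap;
import java.util.Map;

import com.netflix.conductor.common.metadata.tasks.Task;

final class RerunOutputReset {
    // Reset a task's output for rerun while preserving the non-null contract of
    // Task.getOutputData(): clear the existing map instead of setting the field to null.
    static void resetOutputForRerun(Task task) {
        Map<String, Object> output = task.getOutputData();
        if (output != null) {
            output.clear();
        } else {
            task.setOutputData(new HashMap<>());
        }
    }
}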


final String queryExpression = (String) taskInput.get(QUERY_EXPRESSION_PARAMETER);

@@ -87,6 +88,7 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
taskOutput.put(OUTPUT_RESULT, result.get(0));
taskOutput.put(OUTPUT_RESULT_LIST, result);
}
+ task.setOutputData(taskOutput);
} catch (final Exception e) {
LOGGER.error("Error executing task: {} in workflow: {}", task.getTaskId(), workflow.getWorkflowId(), e);
task.setStatus(Task.Status.FAILED);
@@ -1648,12 +1648,19 @@ private boolean rerunWF(String workflowId, String taskId, Map<String, Object> ta
rerunFromTask.setStatus(IN_PROGRESS);
rerunFromTask.setStartTime(System.currentTimeMillis());
} else {
- // Set the task to rerun as SCHEDULED
- rerunFromTask.setStatus(SCHEDULED);
if (taskInput != null) {
rerunFromTask.setInputData(taskInput);
}
- addTaskToQueue(rerunFromTask);
+ if (systemTaskRegistry.isSystemTask(rerunFromTask.getTaskType()) &&
+ !systemTaskRegistry.get(rerunFromTask.getTaskType()).isAsync()) {
+ // Start the synchronized system task directly
+ deciderService.populateTaskData(rerunFromTask);
+ systemTaskRegistry.get(rerunFromTask.getTaskType()).start(workflow, rerunFromTask, this);
+ } else {
+ // Set the task to rerun as SCHEDULED
+ rerunFromTask.setStatus(SCHEDULED);
+ addTaskToQueue(rerunFromTask);
+ }
}
executionDAOFacade.updateTask(rerunFromTask);
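
For context: only asynchronous system tasks are polled from the task queue by system-task workers; a synchronous task such as JSON_JQ_TRANSFORM has no poller, so queueing it on rerun can leave it sitting in SCHEDULED. A minimal sketch of such a synchronous task, modeled on the WorkflowSystemTask API visible in this diff and in the test stub further down (the INLINE_ECHO type and the package names are assumptions for illustration):

import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;

// Hypothetical synchronous system task: isAsync() == false tells the executor to run
// start() inline (as the rerun fix above now does) instead of waiting for a queue
// worker to pick the task up.
public class InlineEchoTask extends WorkflowSystemTask {

    public InlineEchoTask() {
        super("INLINE_ECHO"); // made-up task type, for illustration only
    }

    @Override
    public boolean isAsync() {
        return false; // synchronous: never polled from the task queue
    }

    @Override
    public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
        task.getOutputData().put("echo", task.getInputData().get("value"));
        task.setStatus(Task.Status.COMPLETED);
    }
}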

@@ -101,6 +101,7 @@
import static com.netflix.conductor.common.metadata.tasks.TaskType.SIMPLE;
import static com.netflix.conductor.common.metadata.tasks.TaskType.SUB_WORKFLOW;
import static com.netflix.conductor.common.metadata.tasks.TaskType.SWITCH;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JSON_JQ_TRANSFORM;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_LAMBDA;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_SUB_WORKFLOW;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_WAIT;
@@ -180,6 +181,21 @@ public WorkflowSystemTask http2() {
return new WorkflowSystemTaskStub("HTTP2");
}

@Bean(TASK_TYPE_JSON_JQ_TRANSFORM)
public WorkflowSystemTask jsonBean() {
return new WorkflowSystemTaskStub("JSON_JQ_TRANSFORM") {
@Override
public boolean isAsync() {
return false;
}

@Override
public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
task.setStatus(Task.Status.COMPLETED);
}
};
}

@Bean
public SystemTaskRegistry systemTaskRegistry(Set<WorkflowSystemTask> tasks) {
return new SystemTaskRegistry(tasks);
@@ -1324,6 +1340,59 @@ public void testRerunWorkflowWithTaskId() {
assertEquals(new HashSet<>(), workflow.getFailedReferenceTaskNames());
}

@Test
public void testRerunWorkflowWithSyncSystemTaskId() {
Collaborator: 👍🏼

//setup
String workflowId = IDGenerator.generate();

Task task1 = new Task();
task1.setTaskType(TaskType.SIMPLE.name());
task1.setTaskDefName("task1");
task1.setReferenceTaskName("task1_ref");
task1.setWorkflowInstanceId(workflowId);
task1.setScheduledTime(System.currentTimeMillis());
task1.setTaskId(IDGenerator.generate());
task1.setStatus(Status.COMPLETED);
task1.setWorkflowTask(new WorkflowTask());
task1.setOutputData(new HashMap<>());

Task task2 = new Task();
task2.setTaskType(TaskType.JSON_JQ_TRANSFORM.name());
task2.setReferenceTaskName("task2_ref");
task2.setWorkflowInstanceId(workflowId);
task2.setScheduledTime(System.currentTimeMillis());
task2.setTaskId("system-task-id");
task2.setStatus(Status.FAILED);

Workflow workflow = new Workflow();
workflow.setWorkflowId(workflowId);
WorkflowDef workflowDef = new WorkflowDef();
workflowDef.setName("workflow");
workflowDef.setVersion(1);
workflow.setWorkflowDefinition(workflowDef);
workflow.setOwnerApp("junit_testRerunWorkflowId");
workflow.setStatus(WorkflowStatus.FAILED);
workflow.setReasonForIncompletion("task2 failed");
workflow.setFailedReferenceTaskNames(new HashSet<String>() {{
add("task2_ref");
}});
workflow.getTasks().addAll(Arrays.asList(task1, task2));
//end of setup

//when:
when(executionDAOFacade.getWorkflowById(workflow.getWorkflowId(), true)).thenReturn(workflow);
RerunWorkflowRequest rerunWorkflowRequest = new RerunWorkflowRequest();
rerunWorkflowRequest.setReRunFromWorkflowId(workflow.getWorkflowId());
rerunWorkflowRequest.setReRunFromTaskId(task2.getTaskId());
workflowExecutor.rerun(rerunWorkflowRequest);

//then:
assertEquals(Status.COMPLETED, task2.getStatus());
assertEquals(Workflow.WorkflowStatus.RUNNING, workflow.getStatus());
assertNull(workflow.getReasonForIncompletion());
assertEquals(new HashSet<>(), workflow.getFailedReferenceTaskNames());
}

@Test
public void testRerunSubWorkflowWithTaskId() {
//setup
@@ -13,6 +13,7 @@
package com.netflix.conductor.test.integration

import com.netflix.conductor.common.metadata.tasks.Task
import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest
import com.netflix.conductor.common.run.Workflow
import com.netflix.conductor.test.base.AbstractSpecification
import spock.lang.Shared
@@ -84,4 +85,66 @@ class JsonJQTransformSpec extends AbstractSpecification {
tasks[0].reasonForIncompletion as String == "Cannot index string with string \"array\""
}
}

/**
 * Given the following invalid input JSON
 * {
 *   "in1": "a",
 *   "in2": "b"
 * }
 * using the same query from the success test, jq will try to get in1['array']
 * and fail since 'in1' is a string.
 *
 * Re-running the failed system task with the following valid input JSON fixes the workflow
 * {
 *   "in1": { "array": [ "a", "b" ] },
 *   "in2": { "array": [ "c", "d" ] }
 * }
 * and the workflow task is expected to transform it to the following result:
 * {
 *   out: [ "a", "b", "c", "d" ]
 * }
 */
def "Test rerun workflow with failed json jq transform task"() {
given: "workflow input"
def invalidInput = new HashMap()
invalidInput['in1'] = "a"
invalidInput['in2'] = "b"

def validInput = new HashMap()
def input = new HashMap()
input['in1'] = new HashMap()
input['in1']['array'] = ["a", "b"]
input['in2'] = new HashMap()
input['in2']['array'] = ["c", "d"]
validInput['input'] = input
validInput['queryExpression'] = '.input as $_ | { out: ($_.in1.array + $_.in2.array) }'

when: "workflow which has the json jq transform task started"
def workflowInstanceId = workflowExecutor.startWorkflow(JSON_JQ_TRANSFORM_WF, 1,
'', invalidInput, null, null, null)

then: "verify that the workflow and task failed with expected error"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.FAILED
tasks.size() == 1
tasks[0].status == Task.Status.FAILED
tasks[0].taskType == 'JSON_JQ_TRANSFORM'
tasks[0].reasonForIncompletion as String == "Cannot index string with string \"array\""
Collaborator: The as String may not be necessary. Also, using single quotes (') ensures that Groovy uses a String and not a GString.

Contributor (author): Thanks, updated!
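
For reference, a quick Groovy illustration of the point above (not part of the spec):

// Single-quoted literals are always java.lang.String; double-quoted literals
// become a GString only when they interpolate values.
assert 'Cannot index string with string "array"' instanceof String
assert "result: ${1 + 1}" instanceof GString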

}

when: "workflow which has the json jq transform task reran"
def reRunWorkflowRequest = new RerunWorkflowRequest()
reRunWorkflowRequest.reRunFromWorkflowId = workflowInstanceId
def reRunTaskId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).tasks[0].taskId
reRunWorkflowRequest.reRunFromTaskId = reRunTaskId
reRunWorkflowRequest.taskInput = validInput

workflowExecutor.rerun(reRunWorkflowRequest)

then: "verify that the workflow and task are completed with expected output"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.COMPLETED
tasks.size() == 1
tasks[0].status == Task.Status.COMPLETED
tasks[0].taskType == 'JSON_JQ_TRANSFORM'
tasks[0].outputData as String == "[result:[out:[a, b, c, d]], resultList:[[out:[a, b, c, d]]]]"
Collaborator: Same as above.

Contributor (author): Updated as well. The as String is necessary for outputData, though, since it's a map, so I kept it but used single quotes for the match string instead.

Collaborator: I missed that it's a map here. In that case, it makes sense to assert the keys of the map rather than converting it to a String and asserting on that. The assertion will fail if the toString() logic in Map changes; it's unlikely, but possible.

Contributor (author): I think we can rely on Java's implementation of toString() for the Map. It's kind of cumbersome to write a loop through the map to assert each key/value pair. What do you think?

Collaborator: Verifying the contents of a Map using its toString() is not ideal. It's also harder to understand and refactor. The goal of the test is not to verify that jackson-jq is doing its work, so you could:

  1. write a simpler jq expression that is easy to verify, or
  2. remove the outputData assertion altogether, or
  3. assert that output.result != null and output.resultList != null

Contributor (author): Makes sense; I didn't realize the outputData result is a JsonNode. Yeah, the purpose of the test isn't that kind of check. Refactored to only do basic assertions.
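
A minimal sketch of option 3 for the assertion above, using the result and resultList keys that JSON_JQ_TRANSFORM writes (see the taskOutput.put calls in the first file of this diff); illustrative only, not the committed change:

// Assert presence of the jq output keys instead of comparing the map's toString():
tasks[0].outputData['result'] != null
tasks[0].outputData['resultList'] != null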

}
}
}