Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
do not synchronously eval workflow during update task for select crit…
Browse files Browse the repository at this point in the history
…eria (#3146)

* do not synchronously eval workflow under select criteria

* expedite lazy evaluation with higher priority
  • Loading branch information
apanicker-nflx authored Aug 5, 2022
1 parent 8459632 commit 0b85b70
Show file tree
Hide file tree
Showing 10 changed files with 183 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,7 @@
import com.netflix.conductor.core.WorkflowContext;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.exception.ConflictException;
import com.netflix.conductor.core.exception.NonTransientException;
import com.netflix.conductor.core.exception.NotFoundException;
import com.netflix.conductor.core.exception.TerminateWorkflowException;
import com.netflix.conductor.core.exception.TransientException;
import com.netflix.conductor.core.exception.*;
import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry;
import com.netflix.conductor.core.execution.tasks.Terminate;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
Expand All @@ -64,7 +60,7 @@
public class WorkflowExecutor {

private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowExecutor.class);
private static final int PARENT_WF_PRIORITY = 10;
private static final int EXPEDITED_PRIORITY = 10;

private final MetadataDAO metadataDAO;
private final QueueDAO queueDAO;
Expand Down Expand Up @@ -640,7 +636,7 @@ private void updateAndPushParents(WorkflowModel workflow, String operation) {
parentWorkflow.setStatus(WorkflowModel.Status.RUNNING);
parentWorkflow.setLastRetriedTime(System.currentTimeMillis());
executionDAOFacade.updateWorkflow(parentWorkflow);
pushParentWorkflow(parentWorkflowId);
expediteLazyWorkflowEvaluation(parentWorkflowId);

workflow = parentWorkflow;
}
Expand Down Expand Up @@ -878,7 +874,7 @@ WorkflowModel completeWorkflow(WorkflowModel workflow) {
workflow.toShortString(),
workflow.getParentWorkflowId(),
workflow.getParentWorkflowTaskId());
pushParentWorkflow(workflow.getParentWorkflowId());
expediteLazyWorkflowEvaluation(workflow.getParentWorkflowId());
}

executionLockService.releaseLock(workflow.getWorkflowId());
Expand Down Expand Up @@ -965,7 +961,7 @@ public WorkflowModel terminateWorkflow(
workflow.toShortString(),
workflow.getParentWorkflowId(),
workflow.getParentWorkflowTaskId());
pushParentWorkflow(workflow.getParentWorkflowId());
expediteLazyWorkflowEvaluation(workflow.getParentWorkflowId());
}

if (!StringUtils.isBlank(failureWorkflow)) {
Expand Down Expand Up @@ -1193,7 +1189,52 @@ public void updateTask(TaskResult taskResult) {
task.getTaskDefName(), lastDuration, false, task.getStatus());
}

_decide(workflowId);
// sync evaluate workflow only if the task is not within a forked branch
if (isLazyEvaluateWorkflow(workflowInstance.getWorkflowDefinition(), task)) {
expediteLazyWorkflowEvaluation(workflowId);
} else {
_decide(workflowId);
}
}

/**
* Determines if a workflow can be lazily evaluated, if it meets any of these criteria
*
* <ul>
* <li>The task is NOT a loop task within DO_WHILE
* <li>The task is one of the intermediate tasks in a branch within a FORK_JOIN
* <li>The task is forked from a FORK_JOIN_DYNAMIC
* </ul>
*
* @param workflowDef The workflow definition of the workflow for which evaluation decision is
* to be made
* @param task The task which is attempting to trigger the evaluation
* @return true if workflow can be lazily evaluated, false otherwise
*/
@VisibleForTesting
boolean isLazyEvaluateWorkflow(WorkflowDef workflowDef, TaskModel task) {
if (task.isLoopOverTask()) {
return false;
}

String taskRefName = task.getReferenceTaskName();
List<WorkflowTask> workflowTasks = workflowDef.collectTasks();

List<WorkflowTask> forkTasks =
workflowTasks.stream()
.filter(t -> t.getType().equals(TaskType.FORK_JOIN.name()))
.collect(Collectors.toList());

List<WorkflowTask> joinTasks =
workflowTasks.stream()
.filter(t -> t.getType().equals(TaskType.JOIN.name()))
.collect(Collectors.toList());

if (forkTasks.stream().anyMatch(fork -> fork.has(taskRefName))) {
return joinTasks.stream().anyMatch(join -> join.getJoinOn().contains(taskRefName));
}

return workflowTasks.stream().noneMatch(t -> t.getTaskReferenceName().equals(taskRefName));
}

public TaskModel getTask(String taskId) {
Expand Down Expand Up @@ -1966,14 +2007,18 @@ private void executeSubworkflowTaskAndSyncData(
subWorkflowSystemTask.execute(subWorkflow, subWorkflowTask, this);
}

/** Pushes parent workflow id into the decider queue with a priority. */
private void pushParentWorkflow(String parentWorkflowId) {
if (queueDAO.containsMessage(DECIDER_QUEUE, parentWorkflowId)) {
queueDAO.postpone(DECIDER_QUEUE, parentWorkflowId, PARENT_WF_PRIORITY, 0);
/**
* Pushes workflow id into the decider queue with a higher priority to expedite evaluation.
*
* @param workflowId The workflow to be evaluated at higher priority
*/
private void expediteLazyWorkflowEvaluation(String workflowId) {
if (queueDAO.containsMessage(DECIDER_QUEUE, workflowId)) {
queueDAO.postpone(DECIDER_QUEUE, workflowId, EXPEDITED_PRIORITY, 0);
} else {
queueDAO.push(DECIDER_QUEUE, parentWorkflowId, PARENT_WF_PRIORITY, 0);
queueDAO.push(DECIDER_QUEUE, workflowId, EXPEDITED_PRIORITY, 0);
}

LOGGER.info("Pushed parent workflow {} to {}", parentWorkflowId, DECIDER_QUEUE);
LOGGER.info("Pushed workflow {} to {} for expedited evaluation", workflowId, DECIDER_QUEUE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ public static void recordGauge(String name, long count) {
gauge(classQualifier, name, count);
}

public static void recordCounter(String name, long count, String... additionalTags) {
getCounter(classQualifier, name, additionalTags).increment(count);
}

public static void recordQueueWaitTime(String taskType, long queueWaitTime) {
getTimer(classQualifier, "task_queue_wait", "taskType", taskType)
.record(queueWaitTime, TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2446,6 +2446,75 @@ public void testUpdateTaskWithOutCallbackAfterSeconds() {
argumentCaptor.getAllValues().get(0).getWorkflowInstanceId());
}

@Test
public void testIsLazyEvaluateWorkflow() {
// setup
WorkflowDef workflowDef = new WorkflowDef();
workflowDef.setName("lazyEvaluate");
workflowDef.setVersion(1);

WorkflowTask simpleTask = new WorkflowTask();
simpleTask.setType(SIMPLE.name());
simpleTask.setName("simple");
simpleTask.setTaskReferenceName("simple");

WorkflowTask forkTask = new WorkflowTask();
forkTask.setType(FORK_JOIN.name());
forkTask.setName("fork");
forkTask.setTaskReferenceName("fork");

WorkflowTask branchTask1 = new WorkflowTask();
branchTask1.setType(SIMPLE.name());
branchTask1.setName("branchTask1");
branchTask1.setTaskReferenceName("branchTask1");

WorkflowTask branchTask2 = new WorkflowTask();
branchTask2.setType(SIMPLE.name());
branchTask2.setName("branchTask2");
branchTask2.setTaskReferenceName("branchTask2");

forkTask.getForkTasks().add(Arrays.asList(branchTask1, branchTask2));

WorkflowTask joinTask = new WorkflowTask();
joinTask.setType(JOIN.name());
joinTask.setName("join");
joinTask.setTaskReferenceName("join");
joinTask.setJoinOn(List.of("branchTask2"));

WorkflowTask doWhile = new WorkflowTask();
doWhile.setType(DO_WHILE.name());
doWhile.setName("doWhile");
doWhile.setTaskReferenceName("doWhile");

WorkflowTask loopTask = new WorkflowTask();
loopTask.setType(SIMPLE.name());
loopTask.setName("loopTask");
loopTask.setTaskReferenceName("loopTask");

doWhile.setLoopOver(List.of(loopTask));

workflowDef.getTasks().addAll(List.of(simpleTask, forkTask, joinTask, doWhile));

TaskModel task = new TaskModel();

// when:
task.setReferenceTaskName("dynamic");
assertTrue(workflowExecutor.isLazyEvaluateWorkflow(workflowDef, task));

task.setReferenceTaskName("branchTask1");
assertFalse(workflowExecutor.isLazyEvaluateWorkflow(workflowDef, task));

task.setReferenceTaskName("branchTask2");
assertTrue(workflowExecutor.isLazyEvaluateWorkflow(workflowDef, task));

task.setReferenceTaskName("simple");
assertFalse(workflowExecutor.isLazyEvaluateWorkflow(workflowDef, task));

task.setReferenceTaskName("loopTask__1");
task.setIteration(1);
assertFalse(workflowExecutor.isLazyEvaluateWorkflow(workflowDef, task));
}

private WorkflowModel generateSampleWorkflow() {
// setup
WorkflowModel workflow = new WorkflowModel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ class DecisionTaskSpec extends AbstractSpecification {
when: "the task 'integration_task_20' is polled and completed"
def polledAndCompletedTask20Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_20', 'task1.integration.worker')

and: "the workflow is evaluated"
sweep(workflowInstanceId)

then: "verify that the task is completed and acknowledged"
verifyPolledAndAcknowledgedTask(polledAndCompletedTask20Try1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,9 @@ class DoWhileSpec extends AbstractSpecification {
when: "Polling and completing third task"
Tuple polledAndCompletedTask2 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'integration.test.worker')

and: "the workflow is evaluated"
sweep(workflowInstanceId)

then: "Verify that the task was polled and acknowledged and workflow is in completed state"
verifyPolledAndAcknowledgedTask(polledAndCompletedTask2)
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ class DynamicForkJoinSpec extends AbstractSpecification {
def pollAndCompleteTask3Try = workflowTestUtil.pollAndCompleteTask('integration_task_3', 'task3.worker',
['ok1': 'ov1'])

and: "workflow is evaluated by the reconciler"
sweep(workflowInstanceId)

then: "verify that the tasks were polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask2Try, ['k1': 'v1'])
workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask3Try, ['k2': 'v2'])
Expand Down Expand Up @@ -194,6 +197,9 @@ class DynamicForkJoinSpec extends AbstractSpecification {
when: "Poll and fail 'integration_task_2'"
def pollAndCompleteTask2Try = workflowTestUtil.pollAndFailTask('integration_task_2', 'task2.worker', 'it is a failure..')

and: "workflow is evaluated by the reconciler"
sweep(workflowInstanceId)

then: "verify that the tasks were polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask2Try, ['k1': 'v1'])

Expand Down Expand Up @@ -275,6 +281,9 @@ class DynamicForkJoinSpec extends AbstractSpecification {
when: "Poll and fail 'integration_task_2'"
def pollAndCompleteTask2Try1 = workflowTestUtil.pollAndFailTask('integration_task_2', 'task2.worker', 'it is a failure..')

and: "workflow is evaluated by the reconciler"
sweep(workflowInstanceId)

then: "verify that the tasks were polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask2Try1, ['k1': 'v1'])

Expand Down Expand Up @@ -319,6 +328,9 @@ class DynamicForkJoinSpec extends AbstractSpecification {
def pollAndCompleteTask3Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_3', 'task3.worker',
['ok1': 'ov1'])

and: "workflow is evaluated by the reconciler"
sweep(workflowInstanceId)

then: "verify that the tasks were polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask2Try2, ['k1': 'v1'])
workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask3Try1, ['k2': 'v2'])
Expand Down Expand Up @@ -727,6 +739,9 @@ class DynamicForkJoinSpec extends AbstractSpecification {
def pollAndCompleteTask2Try = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.worker')
def pollAndCompleteTask3Try = workflowTestUtil.pollAndCompleteTask('integration_task_3', 'task3.worker')

and: "workflow is evaluated by the reconciler"
sweep(workflowInstanceId)

then: "verify that the tasks were polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask2Try, ['k1': 'v1'])
workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask3Try, ['k2': 'v2'])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,9 @@ class ExternalPayloadStorageSpec extends AbstractSpecification {
when: "the second task of the left fork is polled and completed with external payload storage"
polledAndAckLargePayloadTask = workflowTestUtil.pollAndCompleteLargePayloadTask('integration_task_3', 'task3.integration.worker', taskOutputPath)

and: "the workflow is evaluated"
sweep(workflowInstanceId)

then: "verify that the 'integration_task_3' was polled and acknowledged"
verifyPolledAndAcknowledgedLargePayloadTask(polledAndAckLargePayloadTask)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ class ForkJoinSpec extends AbstractSpecification {
when: "The other node of the fork is completed by completing 'integration_task_2'"
def polledAndAckTask2Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task1.worker')

and: "workflow is evaluated"
sweep(workflowInstanceId)

then: "verify that the 'integration_task_2' was polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask2Try1)

Expand Down Expand Up @@ -215,6 +218,9 @@ class ForkJoinSpec extends AbstractSpecification {
def polledAndAckTask2Try1 = workflowTestUtil.pollAndFailTask('integration_task_2',
'task1.worker', 'Failed....')

and: "workflow is evaluated"
sweep(workflowInstanceId)

then: "verify that the 'integration_task_2' was polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask2Try1)

Expand Down Expand Up @@ -282,6 +288,9 @@ class ForkJoinSpec extends AbstractSpecification {
def polledAndAckTask2Try1 = workflowTestUtil.pollAndFailTask('integration_task_0_RT_2',
'task1.worker', 'Failed....')

and: "workflow is evaluated"
sweep(workflowInstanceId)

then: "verify that the 'integration_task_0_RT_1' was polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask2Try1)

Expand Down Expand Up @@ -326,10 +335,12 @@ class ForkJoinSpec extends AbstractSpecification {
then: "verify that the 'integration_task_3' was polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask3Try1)


when: "The other node of the fork is completed by completing 'integration_task_0_RT_2'"
def polledAndAckTask2Try2 = workflowTestUtil.pollAndCompleteTask('integration_task_0_RT_2', 'task1.worker')

and: "workflow is evaluated"
sweep(workflowInstanceId)

then: "verify that the 'integration_task_2' was polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask2Try2)

Expand Down Expand Up @@ -399,7 +410,6 @@ class ForkJoinSpec extends AbstractSpecification {
def polledAndAckTask12Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_12', 'task12.worker')
def polledAndAckTask13Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_13', 'task13.worker')


then: "verify that tasks 'integration_task_11', 'integration_task_12' and 'integration_task_13' were polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask11Try1)
workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask12Try1)
Expand Down Expand Up @@ -486,6 +496,9 @@ class ForkJoinSpec extends AbstractSpecification {
when: "Poll and Complete tasks: 'integration_task_20'"
def polledAndAckTask20Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_20', 'task20.worker')

and: "workflow is evaluated"
sweep(workflowInstanceId)

then: "verify that tasks 'integration_task_20'polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask20Try1)

Expand Down Expand Up @@ -700,6 +713,9 @@ class ForkJoinSpec extends AbstractSpecification {
when: "poll and complete the 'integration_task_20'"
def polledAndAckTask20Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_20', 'task20.worker')

and: "workflow is evaluated"
sweep(workflowInstanceId)

then: "verify that the task was polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask20Try1)

Expand Down Expand Up @@ -1022,6 +1038,9 @@ class ForkJoinSpec extends AbstractSpecification {
when: "the simple task is polled and completed"
def polledAndCompletedSimpleTask = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.worker')

and: "workflow is evaluated"
sweep(workflowInstanceId)

then: "verify that the task was polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndCompletedSimpleTask)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ class SwitchTaskSpec extends AbstractSpecification {
when: "the task 'integration_task_20' is polled and completed"
def polledAndCompletedTask20Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_20', 'task1.integration.worker')

and: "the workflow is evaluated"
sweep(workflowInstanceId)

then: "verify that the task is completed and acknowledged"
verifyPolledAndAcknowledgedTask(polledAndCompletedTask20Try1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
"taskReferenceName": "thejoin",
"type": "JOIN",
"joinOn": [
"basicJavaA",
"test_terminate_subworkflow",
"basicJavaB"
]
}
Expand Down

0 comments on commit 0b85b70

Please sign in to comment.