Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Add tests for updateTask
Browse files Browse the repository at this point in the history
  • Loading branch information
jxu-nflx committed Jun 3, 2022
1 parent 7b90c64 commit 824caaf
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.netflix.conductor.common.config.TestObjectMapperConfiguration;
import com.netflix.conductor.common.metadata.tasks.PollData;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.common.metadata.tasks.TaskType;
import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
Expand Down Expand Up @@ -2365,6 +2366,87 @@ public void testRetryOptionalSubWorkflow() {
assertEquals(WorkflowModel.Status.COMPLETED, workflow.getStatus());
}

@Test
public void testUpdateTaskWithCallbackAfterSeconds() {
String workflowId = "test-workflow-id";
WorkflowModel workflow = new WorkflowModel();
workflow.setWorkflowId(workflowId);
workflow.setStatus(WorkflowModel.Status.RUNNING);
workflow.setWorkflowDefinition(new WorkflowDef());

TaskModel simpleTask = new TaskModel();
simpleTask.setTaskType(TaskType.SIMPLE.name());
simpleTask.setReferenceTaskName("simpleTask");
simpleTask.setWorkflowInstanceId(workflowId);
simpleTask.setScheduledTime(System.currentTimeMillis());
simpleTask.setCallbackAfterSeconds(0);
simpleTask.setTaskId("simple-task-id");
simpleTask.setStatus(TaskModel.Status.IN_PROGRESS);

workflow.getTasks().addAll(Arrays.asList(simpleTask));
when(executionDAOFacade.getWorkflowModel(workflowId, true)).thenReturn(workflow);
when(executionDAOFacade.getTaskModel(simpleTask.getTaskId())).thenReturn(simpleTask);

TaskResult taskResult = new TaskResult();
taskResult.setWorkflowInstanceId(workflowId);
taskResult.setTaskId(simpleTask.getTaskId());
taskResult.setWorkerId("test-worker-id");
taskResult.log("not ready yet");
taskResult.setCallbackAfterSeconds(300);
taskResult.setStatus(TaskResult.Status.IN_PROGRESS);

workflowExecutor.updateTask(taskResult);
verify(queueDAO, times(1)).postpone(anyString(), anyString(), any(), any());
ArgumentCaptor<TaskModel> argumentCaptor = ArgumentCaptor.forClass(TaskModel.class);
verify(executionDAOFacade, times(1)).updateTask(argumentCaptor.capture());
assertEquals(TaskModel.Status.SCHEDULED, argumentCaptor.getAllValues().get(0).getStatus());
assertEquals(
taskResult.getCallbackAfterSeconds(),
argumentCaptor.getAllValues().get(0).getCallbackAfterSeconds());
assertEquals(
taskResult.getWorkflowInstanceId(),
argumentCaptor.getAllValues().get(0).getWorkerId());
}

@Test
public void testUpdateTaskWithOutCallbackAfterSeconds() {
String workflowId = "test-workflow-id";
WorkflowModel workflow = new WorkflowModel();
workflow.setWorkflowId(workflowId);
workflow.setStatus(WorkflowModel.Status.RUNNING);
workflow.setWorkflowDefinition(new WorkflowDef());

TaskModel simpleTask = new TaskModel();
simpleTask.setTaskType(TaskType.SIMPLE.name());
simpleTask.setReferenceTaskName("simpleTask");
simpleTask.setWorkflowInstanceId(workflowId);
simpleTask.setScheduledTime(System.currentTimeMillis());
simpleTask.setCallbackAfterSeconds(0);
simpleTask.setTaskId("simple-task-id");
simpleTask.setStatus(TaskModel.Status.IN_PROGRESS);

workflow.getTasks().addAll(Arrays.asList(simpleTask));
when(executionDAOFacade.getWorkflowModel(workflowId, true)).thenReturn(workflow);
when(executionDAOFacade.getTaskModel(simpleTask.getTaskId())).thenReturn(simpleTask);

TaskResult taskResult = new TaskResult();
taskResult.setWorkflowInstanceId(workflowId);
taskResult.setTaskId(simpleTask.getTaskId());
taskResult.setWorkerId("test-worker-id");
taskResult.log("not ready yet");
taskResult.setStatus(TaskResult.Status.IN_PROGRESS);

workflowExecutor.updateTask(taskResult);
verify(queueDAO, times(1)).postpone(anyString(), anyString(), any(), any());
ArgumentCaptor<TaskModel> argumentCaptor = ArgumentCaptor.forClass(TaskModel.class);
verify(executionDAOFacade, times(1)).updateTask(argumentCaptor.capture());
assertEquals(TaskModel.Status.SCHEDULED, argumentCaptor.getAllValues().get(0).getStatus());
assertEquals(0, argumentCaptor.getAllValues().get(0).getCallbackAfterSeconds());
assertEquals(
taskResult.getWorkflowInstanceId(),
argumentCaptor.getAllValues().get(0).getWorkerId());
}

private WorkflowModel generateSampleWorkflow() {
// setup
WorkflowModel workflow = new WorkflowModel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -951,4 +951,82 @@ class SimpleWorkflowSpec extends AbstractSpecification {
simpleWorkflowDefinition.restartable = true
metadataService.updateWorkflowDef(simpleWorkflowDefinition)
}

def "Test simple workflow when update task's result with call back after seconds"() {

given: "A new simple workflow is started"
def correlationId = 'integration_test_1'
def workflowInput = new HashMap()
workflowInput['param1'] = 'p1 value'
workflowInput['param2'] = 'p2 value'

when: "start a new workflow with the input"
def workflowInstanceId = workflowExecutor.startWorkflow(LINEAR_WORKFLOW_T1_T2, 1,
correlationId, workflowInput,
null, null, null)

then: "verify that the workflow is in running state and the task queue has an entry for the first task of the workflow"
workflowInstanceId
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 1
tasks[0].status == Task.Status.SCHEDULED
}
workflowExecutionService.getTaskQueueSizes(['integration_task_1']).get('integration_task_1') == 1

when: "the first task 'integration_task_1' is polled and then sent back with no callBack seconds"
def pollTaskTry1 = workflowExecutionService.poll('integration_task_1', 'task1.integration.worker')
pollTaskTry1.outputData['op'] = 'task1.in.progress'
pollTaskTry1.status = Task.Status.IN_PROGRESS
workflowExecutionService.updateTask(new TaskResult(pollTaskTry1))

then: "verify that the task is polled and acknowledged"
pollTaskTry1

and: "the input data of the data is as expected"
pollTaskTry1.inputData.containsKey('p1')
pollTaskTry1.inputData['p1'] == 'p1 value'
pollTaskTry1.inputData.containsKey('p2')
pollTaskTry1.inputData['p1'] == 'p1 value'

and: "the task gets put back into the queue of 'integration_task_1' immediately for future poll"
workflowExecutionService.getTaskQueueSizes(['integration_task_1']).get('integration_task_1') == 1

and: "The task in in SCHEDULED status with workerId reset"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 1
tasks[0].taskType == 'integration_task_1'
tasks[0].status == Task.Status.SCHEDULED
tasks[0].callbackAfterSeconds == 0
}

when: "the 'integration_task_1' task is polled again"
def pollTaskTry2 = workflowExecutionService.poll('integration_task_1', 'task1.integration.worker')
pollTaskTry2.outputData['op'] = 'task1.in.progress'
pollTaskTry2.status = Task.Status.IN_PROGRESS
pollTaskTry2.callbackAfterSeconds = 3600
workflowExecutionService.updateTask(new TaskResult(pollTaskTry2))

then: "verify that the task is polled and acknowledged"
pollTaskTry2

and: "the task gets put back into the queue of 'integration_task_1' with callbackAfterSeconds delay for future poll"
workflowExecutionService.getTaskQueueSizes(['integration_task_1']).get('integration_task_1') == 1

and: "The task in in SCHEDULED status with workerId reset"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 1
tasks[0].taskType == 'integration_task_1'
tasks[0].status == Task.Status.SCHEDULED
tasks[0].callbackAfterSeconds == pollTaskTry2.callbackAfterSeconds
}

when: "the 'integration_task_1' task is polled again"
def pollTaskTry3 = workflowExecutionService.poll('integration_task_1', 'task1.integration.worker')

then: "verify that there was no task polled"
!pollTaskTry3
}
}

0 comments on commit 824caaf

Please sign in to comment.