Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Enhance WAIT task to support time-based wait scenarios #2986

Merged
merged 14 commits into from
May 31, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,90 @@
*/
package com.netflix.conductor.core.execution.tasks;

import java.text.ParseException;
import java.time.Duration;
import java.util.Date;
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_WAIT;
import static com.netflix.conductor.model.TaskModel.Status.IN_PROGRESS;
import static com.netflix.conductor.core.utils.DateTimeUtils.parseDate;
import static com.netflix.conductor.core.utils.DateTimeUtils.parseDuration;
import static com.netflix.conductor.model.TaskModel.Status.*;

@Component(TASK_TYPE_WAIT)
public class Wait extends WorkflowSystemTask {

public static final String DURATION_INPUT = "duration";
public static final String UNTIL_INPUT = "until";

public Wait() {
super(TASK_TYPE_WAIT);
}

@Override
public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) {

String duration =
Optional.ofNullable(task.getInputData().get(DURATION_INPUT)).orElse("").toString();
String until =
Optional.ofNullable(task.getInputData().get(UNTIL_INPUT)).orElse("").toString();

if (StringUtils.isNotBlank(duration) && StringUtils.isNotBlank(until)) {
task.setReasonForIncompletion(
"Both 'duration' and 'until' specified. Please provide only one input");
task.setStatus(FAILED_WITH_TERMINAL_ERROR);
return;
}

if (StringUtils.isNotBlank(duration)) {

Duration timeDuration = parseDuration(duration);
long waitTimeout = System.currentTimeMillis() + (timeDuration.getSeconds() * 1000);
task.setWaitTimeout(waitTimeout);

long seconds = timeDuration.getSeconds();
task.setCallbackAfterSeconds(seconds);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The callbackAfterSeconds is moot since WAIT is a synchronous task and is not added to a queue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated and removed.

} else if (StringUtils.isNotBlank(until)) {
try {
Date expiryDate = parseDate(until);
long timeInMS = expiryDate.getTime();
long now = System.currentTimeMillis();
long seconds = (timeInMS - now) / 1000;
task.setWaitTimeout(timeInMS);

} catch (ParseException parseException) {
task.setReasonForIncompletion(
"Invalid/Unsupported Wait Until format. Provided: " + until);
task.setStatus(FAILED_WITH_TERMINAL_ERROR);
}
}
task.setStatus(IN_PROGRESS);
}

@Override
public void cancel(WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) {
task.setStatus(TaskModel.Status.CANCELED);
}

@Override
public boolean execute(
WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) {
long timeOut = task.getWaitTimeout();
if (timeOut == 0) {
return false;
}
if (System.currentTimeMillis() > timeOut) {
task.setStatus(COMPLETED);
return true;
}

return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.utils;

import java.text.ParseException;
import java.time.Duration;
import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang3.time.DateUtils;

public class DateTimeUtils {

private static final String[] patterns =
new String[] {"yyyy-MM-dd HH:mm", "yyyy-MM-dd HH:mm z", "yyyy-MM-dd"};

public static Duration parseDuration(String text) {
Matcher m =
Pattern.compile(
"\\s*(?:(\\d+)\\s*(?:days?|d))?"
+ "\\s*(?:(\\d+)\\s*(?:hours?|hrs?|h))?"
+ "\\s*(?:(\\d+)\\s*(?:minutes?|mins?|m))?"
+ "\\s*(?:(\\d+)\\s*(?:seconds?|secs?|s))?"
+ "\\s*",
Pattern.CASE_INSENSITIVE)
.matcher(text);
if (!m.matches()) throw new IllegalArgumentException("Not valid duration: " + text);

int days = (m.start(1) == -1 ? 0 : Integer.parseInt(m.group(1)));
int hours = (m.start(2) == -1 ? 0 : Integer.parseInt(m.group(2)));
int mins = (m.start(3) == -1 ? 0 : Integer.parseInt(m.group(3)));
int secs = (m.start(4) == -1 ? 0 : Integer.parseInt(m.group(4)));
return Duration.ofSeconds((days * 86400) + (hours * 60L + mins) * 60L + secs);
}

public static Date parseDate(String date) throws ParseException {
return DateUtils.parseDate(date, patterns);
}
}
16 changes: 16 additions & 0 deletions core/src/main/java/com/netflix/conductor/model/TaskModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ public boolean isRetriable() {

private String subWorkflowId;

// Timeout after which the wait task should be marked as completed
private long waitTimeout;

/**
* Used to note that a sub workflow associated with SUB_WORKFLOW task has an action performed on
* it directly.
Expand Down Expand Up @@ -557,6 +560,14 @@ public boolean isLoopOverTask() {
return iteration > 0;
}

public long getWaitTimeout() {
return waitTimeout;
}

public void setWaitTimeout(long waitTimeout) {
this.waitTimeout = waitTimeout;
}

/**
* @return the queueWaitTime
*/
Expand Down Expand Up @@ -675,6 +686,9 @@ public String toString() {
+ ", domain='"
+ domain
+ '\''
+ ", waitTimeout='"
+ waitTimeout
+ '\''
+ ", inputMessage="
+ inputMessage
+ ", outputMessage="
Expand Down Expand Up @@ -742,6 +756,7 @@ && getStatus() == taskModel.getStatus()
&& Objects.equals(getTaskId(), taskModel.getTaskId())
&& Objects.equals(getReasonForIncompletion(), taskModel.getReasonForIncompletion())
&& Objects.equals(getWorkerId(), taskModel.getWorkerId())
&& Objects.equals(getWaitTimeout(), taskModel.getWaitTimeout())
&& Objects.equals(getOutputData(), taskModel.getOutputData())
&& Objects.equals(getWorkflowTask(), taskModel.getWorkflowTask())
&& Objects.equals(getDomain(), taskModel.getDomain())
Expand Down Expand Up @@ -786,6 +801,7 @@ public int hashCode() {
getReasonForIncompletion(),
getCallbackAfterSeconds(),
getWorkerId(),
getWaitTimeout(),
getOutputData(),
getWorkflowTask(),
getDomain(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@
import javax.validation.ConstraintValidatorContext;
import javax.validation.Payload;

import org.apache.commons.lang3.StringUtils;

import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.tasks.TaskType;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.core.utils.DateTimeUtils;

import static com.netflix.conductor.core.execution.tasks.Terminate.getTerminationStatusParameter;
import static com.netflix.conductor.core.execution.tasks.Terminate.validateInputStatus;
import static com.netflix.conductor.core.execution.tasks.Wait.DURATION_INPUT;
import static com.netflix.conductor.core.execution.tasks.Wait.UNTIL_INPUT;

import static java.lang.annotation.ElementType.ANNOTATION_TYPE;
import static java.lang.annotation.ElementType.TYPE;
Expand Down Expand Up @@ -102,6 +107,9 @@ public boolean isValid(WorkflowTask workflowTask, ConstraintValidatorContext con
case TaskType.TASK_TYPE_JSON_JQ_TRANSFORM:
valid = isJSONJQTransformTaskValid(workflowTask, context);
break;
case TaskType.TASK_TYPE_WAIT:
valid = isWaitTaskValid(workflowTask, context);
break;
}

return valid;
Expand Down Expand Up @@ -245,6 +253,40 @@ private boolean isDynamicTaskValid(
return valid;
}

private boolean isWaitTaskValid(
WorkflowTask workflowTask, ConstraintValidatorContext context) {
boolean valid = true;
String duration =
Optional.ofNullable(workflowTask.getInputParameters().get(DURATION_INPUT))
.orElse("")
.toString();
String until =
Optional.ofNullable(workflowTask.getInputParameters().get(UNTIL_INPUT))
.orElse("")
.toString();

if (StringUtils.isNotBlank(duration) && StringUtils.isNotBlank(until)) {
String message =
"Both 'duration' and 'until' specified. Please provide only one input";
context.buildConstraintViolationWithTemplate(message).addConstraintViolation();
valid = false;
}

try {
if (StringUtils.isNotBlank(duration)) {
DateTimeUtils.parseDuration(duration);
} else if (StringUtils.isNotBlank(until)) {
DateTimeUtils.parseDate(until);
}
} catch (Exception e) {
String message = "Wait time specified is invalid. The duration must be in ";
context.buildConstraintViolationWithTemplate(message).addConstraintViolation();
valid = false;
}

return valid;
}

private boolean isDynamicForkJoinValid(
WorkflowTask workflowTask, ConstraintValidatorContext context) {
boolean valid = true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution.tasks;

import java.text.ParseException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;

import org.apache.commons.lang3.time.DateUtils;
import org.junit.Test;

import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

import static org.junit.Assert.*;

public class TestWait {

private final Wait wait = new Wait();

@Test
public void testWaitForever() {

TaskModel task = new TaskModel();
task.setStatus(TaskModel.Status.SCHEDULED);
WorkflowModel model = new WorkflowModel();

wait.start(model, task, null);
assertEquals(TaskModel.Status.IN_PROGRESS, task.getStatus());
assertTrue(task.getOutputData().isEmpty());
}

@Test
public void testWaitUntil() throws ParseException {
String dateFormat = "yyyy-MM-dd HH:mm";

WorkflowModel model = new WorkflowModel();

TaskModel task = new TaskModel();
task.setStatus(TaskModel.Status.SCHEDULED);

DateTimeFormatter formatter = DateTimeFormatter.ofPattern(dateFormat);
LocalDateTime now = LocalDateTime.now();
String formatted = formatter.format(now);
System.out.println(formatted);

task.getInputData().put(Wait.UNTIL_INPUT, formatted);
Date parsed = DateUtils.parseDate(formatted, dateFormat);

wait.start(model, task, null);
assertEquals(TaskModel.Status.IN_PROGRESS, task.getStatus());
assertEquals(parsed.getTime(), task.getWaitTimeout());

// Execute runs when checking if the task has completed
boolean updated = wait.execute(model, task, null);
assertTrue(updated);
assertEquals(TaskModel.Status.COMPLETED, task.getStatus());
}

@Test
public void testWaitDuration() throws ParseException {
WorkflowModel model = new WorkflowModel();

TaskModel task = new TaskModel();
task.setStatus(TaskModel.Status.SCHEDULED);

task.getInputData().put(Wait.DURATION_INPUT, "1s");
wait.start(model, task, null);
long now = System.currentTimeMillis();

assertEquals(TaskModel.Status.IN_PROGRESS, task.getStatus());
assertEquals(now + 1000, task.getWaitTimeout());

try {
Thread.sleep(2_000);
} catch (InterruptedException e) {
}

// Execute runs when checking if the task has completed
boolean updated = wait.execute(model, task, null);
assertTrue(updated);
assertEquals(TaskModel.Status.COMPLETED, task.getStatus());
}

@Test
public void testInvalidWaitConfig() throws ParseException {
WorkflowModel model = new WorkflowModel();

TaskModel task = new TaskModel();
task.setStatus(TaskModel.Status.SCHEDULED);

task.getInputData().put(Wait.DURATION_INPUT, "1s");
task.getInputData().put(Wait.UNTIL_INPUT, "2022-12-12");
wait.start(model, task, null);
assertEquals(TaskModel.Status.FAILED_WITH_TERMINAL_ERROR, task.getStatus());
assertTrue(!task.getReasonForIncompletion().isEmpty());
}
}
Loading