Skip to content

Commit

Permalink
task lease extend (Netflix#3186)
Browse files Browse the repository at this point in the history
* Add method to extend lease for task

* Client to do extend lease for long running tasks automatically

* Fix format

* Refactor
  • Loading branch information
jxu-nflx authored Aug 19, 2022
1 parent ae10047 commit dd7c338
Show file tree
Hide file tree
Showing 10 changed files with 214 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ class TaskPollExecutor {
private static final String OVERRIDE_DISCOVERY = "pollOutOfDiscovery";
private static final String ALL_WORKERS = "all";

private static final int LEASE_EXTEND_RETRY_COUNT = 3;
private static final double LEASE_EXTEND_DURATION_FACTOR = 0.8;
private ScheduledExecutorService leaseExtendExecutorService;
Map<String /* ID of the task*/, ScheduledFuture<?>> leaseExtendMap = new HashMap<>();

TaskPollExecutor(
EurekaClient eurekaClient,
TaskClient taskClient,
Expand Down Expand Up @@ -97,6 +102,15 @@ class TaskPollExecutor {
.uncaughtExceptionHandler(uncaughtExceptionHandler)
.build());
ThreadPoolMonitor.attach(REGISTRY, (ThreadPoolExecutor) executorService, workerNamePrefix);

LOGGER.info("Initialized the task lease extend executor");
leaseExtendExecutorService =
Executors.newSingleThreadScheduledExecutor(
new BasicThreadFactory.Builder()
.namingPattern("workflow-lease-extend-%d")
.daemon(true)
.uncaughtExceptionHandler(uncaughtExceptionHandler)
.build());
}

void pollAndExecute(Worker worker) {
Expand Down Expand Up @@ -161,6 +175,20 @@ void pollAndExecute(Worker worker) {
CompletableFuture.supplyAsync(
() -> processTask(task, worker, pollingSemaphore), executorService);

if (task.getResponseTimeoutSeconds() > 0 && worker.leaseExtendEnabled()) {
ScheduledFuture<?> leaseExtendFuture =
leaseExtendExecutorService.scheduleWithFixedDelay(
extendLease(task, taskCompletableFuture),
Math.round(
task.getResponseTimeoutSeconds()
* LEASE_EXTEND_DURATION_FACTOR),
Math.round(
task.getResponseTimeoutSeconds()
* LEASE_EXTEND_DURATION_FACTOR),
TimeUnit.SECONDS);
leaseExtendMap.put(task.getTaskId(), leaseExtendFuture);
}

taskCompletableFuture.whenComplete(this::finalizeTask);
} else {
// no task was returned in the poll, release the permit
Expand All @@ -175,7 +203,13 @@ void pollAndExecute(Worker worker) {
}
}

void shutdownExecutorService(ExecutorService executorService, int timeout) {
void shutdown(int timeout) {
shutdownAndAwaitTermination(executorService, timeout);
shutdownAndAwaitTermination(leaseExtendExecutorService, timeout);
leaseExtendMap.clear();
}

void shutdownAndAwaitTermination(ExecutorService executorService, int timeout) {
try {
executorService.shutdown();
if (executorService.awaitTermination(timeout, TimeUnit.SECONDS)) {
Expand Down Expand Up @@ -272,6 +306,12 @@ private void finalizeTask(Task task, Throwable throwable) {
task.getTaskId(),
task.getTaskDefName(),
task.getStatus());
String taskId = task.getTaskId();
ScheduledFuture<?> leaseExtendFuture = leaseExtendMap.get(taskId);
if (leaseExtendFuture != null) {
leaseExtendFuture.cancel(true);
leaseExtendMap.remove(taskId);
}
}
}

Expand Down Expand Up @@ -357,4 +397,32 @@ private PollingSemaphore getPollingSemaphore(String taskType) {
return pollingSemaphoreMap.get(ALL_WORKERS);
}
}

private Runnable extendLease(Task task, CompletableFuture<Task> taskCompletableFuture) {
return () -> {
if (taskCompletableFuture.isDone()) {
LOGGER.warn(
"Task processing for {} completed, but its lease extend was not cancelled",
task.getTaskId());
return;
}
LOGGER.info("Attempting to extend lease for {}", task.getTaskId());
try {
TaskResult result = new TaskResult(task);
result.setExtendLease(true);
retryOperation(
(TaskResult taskResult) -> {
taskClient.updateTask(taskResult);
return null;
},
LEASE_EXTEND_RETRY_COUNT,
result,
"extend lease");
MetricsContainer.incrementTaskLeaseExtendCount(task.getTaskDefName(), 1);
} catch (Exception e) {
MetricsContainer.incrementTaskLeaseExtendErrorCount(task.getTaskDefName(), e);
LOGGER.error("Failed to extend lease for {}", task.getTaskId(), e);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ public synchronized void init() {
* shutdown of your worker, during process termination.
*/
public void shutdown() {
taskPollExecutor.shutdownExecutorService(
taskPollExecutor.shutdownAndAwaitTermination(
scheduledExecutorService, shutdownGracePeriodSeconds);
taskPollExecutor.shutdown(shutdownGracePeriodSeconds);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class MetricsContainer {
private static final String TASK_ACK_FAILED = "task_ack_failed";
private static final String TASK_ACK_ERROR = "task_ack_error";
private static final String TASK_UPDATE_ERROR = "task_update_error";
private static final String TASK_LEASE_EXTEND_ERROR = "task_lease_extend_error";
private static final String TASK_LEASE_EXTEND_COUNTER = "task_lease_extend_counter";
private static final String TASK_POLL_COUNTER = "task_poll_counter";
private static final String TASK_EXECUTE_TIME = "task_execute_time";
private static final String TASK_POLL_TIME = "task_poll_time";
Expand Down Expand Up @@ -156,6 +158,19 @@ public static void incrementTaskUpdateErrorCount(String taskType, Throwable t) {
TASK_UPDATE_ERROR, TASK_TYPE, taskType, EXCEPTION, t.getClass().getSimpleName());
}

public static void incrementTaskLeaseExtendErrorCount(String taskType, Throwable t) {
incrementCount(
TASK_LEASE_EXTEND_ERROR,
TASK_TYPE,
taskType,
EXCEPTION,
t.getClass().getSimpleName());
}

public static void incrementTaskLeaseExtendCount(String taskType, int taskCount) {
getCounter(TASK_LEASE_EXTEND_COUNTER, TASK_TYPE, taskType).increment(taskCount);
}

public static void incrementTaskPollCount(String taskType, int taskCount) {
getCounter(TASK_POLL_COUNTER, TASK_TYPE, taskType).increment(taskCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ default int getPollingInterval() {
return PropertyFactory.getInteger(getTaskDefName(), "pollInterval", 1000);
}

default boolean leaseExtendEnabled() {
return PropertyFactory.getBoolean(getTaskDefName(), "leaseExtendEnabled", false);
}

static Worker create(String taskType, Function<Task, TaskResult> executor) {
return new Worker() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.discovery.EurekaClient;

import static com.netflix.conductor.common.metadata.tasks.TaskResult.Status.IN_PROGRESS;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

Expand Down Expand Up @@ -526,6 +528,45 @@ public void testTaskThreadCount() throws InterruptedException {
verify(taskClient).pollTask(TEST_TASK_DEF_NAME, workerName, null);
}

@Test
public void testTaskLeaseExtend() throws InterruptedException {
Task task = testTask();
task.setResponseTimeoutSeconds(1);

Worker worker = mock(Worker.class);
when(worker.getPollingInterval()).thenReturn(3000);
when(worker.getTaskDefName()).thenReturn("test");
when(worker.execute(any())).thenReturn(new TaskResult(task));
when(worker.leaseExtendEnabled()).thenReturn(true);

TaskClient taskClient = Mockito.mock(TaskClient.class);
when(taskClient.pollTask(any(), any(), any())).thenReturn(task);

TaskResult result = new TaskResult(task);
result.getLogs().add(new TaskExecLog("lease extend"));
result.setExtendLease(true);

TaskPollExecutor taskPollExecutor =
new TaskPollExecutor(
null, taskClient, 1, 1, new HashMap<>(), "test-worker-", new HashMap<>());
CountDownLatch latch = new CountDownLatch(1);
doAnswer(
invocation -> {
assertTrue(
taskPollExecutor.leaseExtendMap.containsKey(task.getTaskId()));
latch.countDown();
return null;
})
.when(taskClient)
.updateTask(any());

Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(
() -> taskPollExecutor.pollAndExecute(worker), 0, 5, TimeUnit.SECONDS);

latch.await();
}

private Task testTask() {
Task task = new Task();
task.setTaskId(UUID.randomUUID().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public enum Status {

private String subWorkflowId;

private boolean extendLease;

public TaskResult(Task task) {
this.workflowInstanceId = task.getWorkflowInstanceId();
this.taskId = task.getTaskId();
Expand Down Expand Up @@ -254,6 +256,14 @@ public void setSubWorkflowId(String subWorkflowId) {
this.subWorkflowId = subWorkflowId;
}

public boolean isExtendLease() {
return extendLease;
}

public void setExtendLease(boolean extendLease) {
this.extendLease = extendLease;
}

@Override
public String toString() {
return "TaskResult{"
Expand Down Expand Up @@ -285,6 +295,9 @@ public String toString() {
+ ", subWorkflowId='"
+ subWorkflowId
+ '\''
+ ", extendLease='"
+ extendLease
+ '\''
+ '}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,11 @@ public void removeTask(String taskId) {
executionDAO.removeTask(taskId);
}

public void extendLease(TaskModel taskModel) {
taskModel.setUpdateTime(System.currentTimeMillis());
executionDAO.updateTask(taskModel);
}

public List<PollData> getTaskPollData(String taskName) {
return pollDataDAO.getPollData(taskName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,9 @@ public WorkflowModel terminateWorkflow(
public void updateTask(TaskResult taskResult) {
if (taskResult == null) {
throw new IllegalArgumentException("Task object is null");
} else if (taskResult.isExtendLease()) {
extendLease(taskResult);
return;
}

String workflowId = taskResult.getWorkflowInstanceId();
Expand Down Expand Up @@ -1192,6 +1195,34 @@ public void updateTask(TaskResult taskResult) {
}
}

private void extendLease(TaskResult taskResult) {
TaskModel task =
Optional.ofNullable(executionDAOFacade.getTaskModel(taskResult.getTaskId()))
.orElseThrow(
() ->
new NotFoundException(
"No such task found by id: %s",
taskResult.getTaskId()));

LOGGER.debug(
"Extend lease for Task: {} belonging to Workflow: {}",
task,
task.getWorkflowInstanceId());
if (!task.getStatus().isTerminal()) {
try {
executionDAOFacade.extendLease(task);
} catch (Exception e) {
String errorMsg =
String.format(
"Error extend lease for Task: %s belonging to Workflow: %s",
task.getTaskId(), task.getWorkflowInstanceId());
LOGGER.error(errorMsg, e);
Monitors.recordTaskExtendLeaseError(task.getTaskType(), task.getWorkflowType());
throw new TransientException(errorMsg, e);
}
}
}

/**
* Determines if a workflow can be lazily evaluated, if it meets any of these criteria
*
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/com/netflix/conductor/metrics/Monitors.java
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,16 @@ public static void recordTaskUpdateError(String taskType, String workflowType) {
taskType);
}

public static void recordTaskExtendLeaseError(String taskType, String workflowType) {
counter(
classQualifier,
"task_extendLease_error",
"workflowName",
workflowType,
"taskType",
taskType);
}

public static void recordTaskQueueOpError(String taskType, String workflowType) {
counter(
classQualifier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2529,6 +2529,30 @@ public void testIsLazyEvaluateWorkflow() {
assertFalse(workflowExecutor.isLazyEvaluateWorkflow(workflowDef, task));
}

@Test
public void testTaskExtendLease() {
TaskModel simpleTask = new TaskModel();
simpleTask.setTaskType(TaskType.SIMPLE.name());
simpleTask.setReferenceTaskName("simpleTask");
simpleTask.setWorkflowInstanceId("test-workflow-id");
simpleTask.setScheduledTime(System.currentTimeMillis());
simpleTask.setCallbackAfterSeconds(0);
simpleTask.setTaskId("simple-task-id");
simpleTask.setStatus(TaskModel.Status.IN_PROGRESS);
when(executionDAOFacade.getTaskModel(simpleTask.getTaskId())).thenReturn(simpleTask);

TaskResult taskResult = new TaskResult();
taskResult.setWorkflowInstanceId(simpleTask.getWorkflowInstanceId());
taskResult.setTaskId(simpleTask.getTaskId());
taskResult.log("extend lease");
taskResult.setExtendLease(true);

workflowExecutor.updateTask(taskResult);
verify(executionDAOFacade, times(1)).extendLease(simpleTask);
verify(queueDAO, times(0)).postpone(anyString(), anyString(), anyInt(), anyLong());
verify(executionDAOFacade, times(0)).updateTask(any());
}

private WorkflowModel generateSampleWorkflow() {
// setup
WorkflowModel workflow = new WorkflowModel();
Expand Down

0 comments on commit dd7c338

Please sign in to comment.