Skip to content

Commit

Permalink
feat(core): introduce DynamicTask for dynamic task generation for a w…
Browse files Browse the repository at this point in the history
…orker task
  • Loading branch information
tchiotludo committed Apr 12, 2022
1 parent 487f2a4 commit 95c863f
Show file tree
Hide file tree
Showing 14 changed files with 164 additions and 66 deletions.
9 changes: 9 additions & 0 deletions core/src/main/java/io/kestra/core/models/flows/State.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ public State(Type state, State actual) {
this.histories.add(new History(this.current, Instant.now()));
}

public static State of(Type state, List<History> histories) {
State result = new State(state);

result.histories.removeIf(history -> true);
result.histories.addAll(histories);

return result;
}

public State withState(Type state) {
if (this.current == state) {
log.warn("Can't change state, already " + current);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
@Getter
public class GraphCluster extends AbstractGraphTask {
@JsonIgnore
private Graph<AbstractGraphTask, Relation> graph = new Graph<>();
private final Graph<AbstractGraphTask, Relation> graph = new Graph<>();

@JsonIgnore
private GraphClusterRoot root;
private final GraphClusterRoot root;

@JsonIgnore
private GraphClusterEnd end;
private final GraphClusterEnd end;

public GraphCluster() {
super();
Expand All @@ -36,10 +36,4 @@ public GraphCluster(Task task, TaskRun taskRun, List<String> values, RelationTyp
graph.addNode(this.root);
graph.addNode(this.end);
}

public GraphCluster(GraphCluster graphTask, TaskRun taskRun, List<String> values) {
super(graphTask.getTask(), taskRun, values, graphTask.getRelationType());

this.graph = graphTask.graph;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.kestra.core.models.tasks;

public interface DynamicTask {

}
13 changes: 9 additions & 4 deletions core/src/main/java/io/kestra/core/runners/ExecutorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.DynamicTask;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
Expand Down Expand Up @@ -603,6 +604,13 @@ private Executor handleFlowTask(final Executor executor) {
}

public Execution addDynamicTaskRun(Execution execution, Flow flow, WorkerTaskResult workerTaskResult) throws InternalException {
ArrayList<TaskRun> taskRuns = new ArrayList<>(execution.getTaskRunList());

// declared dynamic tasks
if (workerTaskResult.getDynamicTaskRuns() != null) {
taskRuns.addAll(workerTaskResult.getDynamicTaskRuns());
}

// if parent, can be a Worker task that generate dynamic tasks
if (workerTaskResult.getTaskRun().getParentTaskRunId() != null) {
try {
Expand All @@ -612,15 +620,12 @@ public Execution addDynamicTaskRun(Execution execution, Flow flow, WorkerTaskRes
Task parentTask = flow.findTaskByTaskId(parentTaskRun.getTaskId());

if (parentTask instanceof Worker) {
ArrayList<TaskRun> taskRuns = new ArrayList<>(execution.getTaskRunList());
taskRuns.add(workerTaskResult.getTaskRun());

return execution.withTaskRunList(taskRuns);
}
}
}

return null;
return taskRuns.size() > execution.getTaskRunList().size() ? execution.withTaskRunList(taskRuns) : null;
}

public boolean canBePurged(final Executor executor) {
Expand Down
20 changes: 16 additions & 4 deletions core/src/main/java/io/kestra/core/runners/RunContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,21 @@
public class RunContext {
private final static ObjectMapper MAPPER = JacksonMapper.ofJson();

private VariableRenderer variableRenderer;
// Injected
private ApplicationContext applicationContext;
private VariableRenderer variableRenderer;
private StorageInterface storageInterface;
private String envPrefix;
private MetricRegistry meterRegistry;
private Path tempBasedPath;

private URI storageOutputPrefix;
private URI storageExecutionPrefix;
private String envPrefix;
private Map<String, Object> variables;
private List<AbstractMetricEntry<?>> metrics = new ArrayList<>();
private MetricRegistry meterRegistry;
private RunContextLogger runContextLogger;
private Path tempBasedPath;
private final List<WorkerTaskResult> dynamicWorkerTaskResult = new ArrayList<>();

protected transient Path temporaryDirectory;

/**
Expand Down Expand Up @@ -572,6 +576,14 @@ private String metricPrefix() {
return String.join(".", values);
}

public void dynamicWorkerResult(List<WorkerTaskResult> workerTaskResults) {
dynamicWorkerTaskResult.addAll(workerTaskResults);
}

public List<WorkerTaskResult> dynamicWorkerResults() {
return dynamicWorkerTaskResult;
}

public synchronized Path tempDir() {
return this.tempDir(true);
}
Expand Down
9 changes: 6 additions & 3 deletions core/src/main/java/io/kestra/core/runners/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) throws Queu
)
.get(() -> this.runAttempt(current.get()));

// save dynamic WorkerResults since cleanUpTransient will remove them
List<WorkerTaskResult> dynamicWorkerResults = finalWorkerTask.getRunContext().dynamicWorkerResults();

// remove tmp directory
if (cleanUp) {
finalWorkerTask.getRunContext().cleanup();
}
Expand Down Expand Up @@ -259,15 +263,15 @@ private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) throws Queu
// So we just tryed to failed the status of the worker task, in this case, no log can't be happend, just
// changing status must work in order to finish current task (except if we are near the upper bound size).
try {
WorkerTaskResult workerTaskResult = new WorkerTaskResult(finalWorkerTask);
WorkerTaskResult workerTaskResult = new WorkerTaskResult(finalWorkerTask, dynamicWorkerResults);
this.workerTaskResultQueue.emit(workerTaskResult);
return workerTaskResult;
} catch (QueueException e) {
finalWorkerTask = workerTask
.withTaskRun(workerTask.getTaskRun()
.withState(State.Type.FAILED)
);
WorkerTaskResult workerTaskResult = new WorkerTaskResult(finalWorkerTask);
WorkerTaskResult workerTaskResult = new WorkerTaskResult(finalWorkerTask, dynamicWorkerResults);
this.workerTaskResultQueue.emit(workerTaskResult);
return workerTaskResult;
} finally {
Expand Down Expand Up @@ -382,7 +386,6 @@ private List<TaskRunAttempt> addAttempt(WorkerTask workerTask, TaskRunAttempt ta
.build();
}

@SuppressWarnings("UnstableApiUsage")
public AtomicInteger getMetricRunningCount(WorkerTask workerTask) {
String[] tags = this.metricRegistry.tags(workerTask);
Arrays.sort(tags);
Expand Down
19 changes: 19 additions & 0 deletions core/src/main/java/io/kestra/core/runners/WorkerTaskResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import lombok.Builder;
import lombok.Value;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;

@Value
Expand All @@ -14,7 +17,23 @@ public class WorkerTaskResult {
@NotNull
TaskRun taskRun;

List<TaskRun> dynamicTaskRuns;

public WorkerTaskResult(TaskRun taskRun) {
this.taskRun = taskRun;
this.dynamicTaskRuns = new ArrayList<>();
}

public WorkerTaskResult(WorkerTask workerTask) {
this.taskRun = workerTask.getTaskRun();
this.dynamicTaskRuns = new ArrayList<>();
}

public WorkerTaskResult(WorkerTask workerTask, List<WorkerTaskResult> dynamicWorkerResults) {
this.taskRun = workerTask.getTaskRun();
this.dynamicTaskRuns = dynamicWorkerResults
.stream()
.map(WorkerTaskResult::getTaskRun)
.collect(Collectors.toList());
}
}
95 changes: 63 additions & 32 deletions core/src/main/java/io/kestra/core/services/ExecutionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,22 @@
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.tasks.flows.Worker;
import io.kestra.core.utils.IdUtils;
import io.micronaut.context.ApplicationContext;
import io.micronaut.core.annotation.Nullable;

import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import static io.kestra.core.utils.Rethrow.throwFunction;
import static io.kestra.core.utils.Rethrow.throwPredicate;

@Singleton
public class ExecutionService {
Expand All @@ -37,19 +41,12 @@ public Execution restart(final Execution execution, @Nullable Integer revision)

final Flow flow = flowRepositoryInterface.findByExecution(execution);

Set<String> taskRunToRestart = this.taskRunWithAncestors(
Set<String> taskRunToRestart = this.taskRunToRestart(
execution,
execution
.getTaskRunList()
.stream()
.filter(taskRun -> taskRun.getState().getCurrent().isFailed())
.collect(Collectors.toList())
flow,
taskRun -> taskRun.getState().getCurrent().isFailed()
);

if (taskRunToRestart.size() == 0) {
throw new IllegalArgumentException("No failed task found to restart execution from !");
}

Map<String, String> mappingTaskRunId = this.mapTaskRunId(execution, revision == null);
final String newExecutionId = revision != null ? IdUtils.create() : null;

Expand All @@ -66,6 +63,10 @@ public Execution restart(final Execution execution, @Nullable Integer revision)
))
.collect(Collectors.toList());

// Worker task, we need to remove all child in order to be restarted
this.removeWorkerTask(flow, execution, taskRunToRestart, mappingTaskRunId)
.forEach(r -> newTaskRuns.removeIf(taskRun -> taskRun.getId().equals(r)));

// Build and launch new execution
Execution newExecution = execution
.childExecution(
Expand All @@ -77,6 +78,25 @@ public Execution restart(final Execution execution, @Nullable Integer revision)
return revision != null ? newExecution.withFlowRevision(revision) : newExecution;
}

private Set<String> taskRunToRestart(Execution execution, Flow flow, Predicate<TaskRun> predicate) throws InternalException {
// Original tasks to be restarted
Set<String> finalTaskRunToRestart = this
.taskRunWithAncestors(
execution,
execution
.getTaskRunList()
.stream()
.filter(predicate)
.collect(Collectors.toList())
);

if (finalTaskRunToRestart.size() == 0) {
throw new IllegalArgumentException("No task found to restart execution from!");
}

return finalTaskRunToRestart;
}

public Execution replay(final Execution execution, String taskRunId, @Nullable Integer revision) throws Exception {
if (!execution.getState().isTerninated()) {
throw new IllegalStateException("Execution must be terminated to be restarted, " +
Expand All @@ -87,19 +107,12 @@ public Execution replay(final Execution execution, String taskRunId, @Nullable I
final Flow flow = flowRepositoryInterface.findByExecution(execution);
GraphCluster graphCluster = GraphService.of(flow, execution);

Set<String> taskRunToRestart = this.taskRunWithAncestors(
Set<String> taskRunToRestart = this.taskRunToRestart(
execution,
execution
.getTaskRunList()
.stream()
.filter(taskRun -> taskRun.getId().equals(taskRunId))
.collect(Collectors.toList())
flow,
taskRun -> taskRun.getId().equals(taskRunId)
);

if (taskRunToRestart.size() == 0) {
throw new IllegalArgumentException("No task found to restart execution from !");
}

Map<String, String> mappingTaskRunId = this.mapTaskRunId(execution, false);
final String newExecutionId = IdUtils.create();

Expand All @@ -116,6 +129,7 @@ public Execution replay(final Execution execution, String taskRunId, @Nullable I
))
.collect(Collectors.toList());

// remove all child for replay task id
Set<String> taskRunToRemove = GraphService.successors(graphCluster, List.of(taskRunId))
.stream()
.filter(task -> task.getTaskRun() != null)
Expand All @@ -127,6 +141,10 @@ public Execution replay(final Execution execution, String taskRunId, @Nullable I
taskRunToRemove
.forEach(r -> newTaskRuns.removeIf(taskRun -> taskRun.getId().equals(r)));

// Worker task, we need to remove all child in order to be restarted
this.removeWorkerTask(flow, execution, taskRunToRestart, mappingTaskRunId)
.forEach(r -> newTaskRuns.removeIf(taskRun -> taskRun.getId().equals(r)));

// Build and launch new execution
Execution newExecution = execution.childExecution(
newExecutionId,
Expand All @@ -146,19 +164,12 @@ public Execution markAs(final Execution execution, String taskRunId, State.Type

final Flow flow = flowRepositoryInterface.findByExecution(execution);

Set<String> taskRunToRestart = this.taskRunWithAncestors(
Set<String> taskRunToRestart = this.taskRunToRestart(
execution,
execution
.getTaskRunList()
.stream()
.filter(taskRun -> taskRun.getId().equals(taskRunId))
.collect(Collectors.toList())
flow,
taskRun -> taskRun.getId().equals(taskRunId)
);

if (taskRunToRestart.size() == 0) {
throw new IllegalArgumentException("No task found to restart execution from !");
}

Execution newExecution = execution;

for (String s : taskRunToRestart) {
Expand All @@ -184,6 +195,26 @@ public Execution markAs(final Execution execution, String taskRunId, State.Type
.withState(State.Type.RESTARTED);
}

private Set<String> removeWorkerTask(Flow flow, Execution execution, Set<String> taskRunToRestart, Map<String, String> mappingTaskRunId) throws InternalException {
Set<String> workerTaskRunId = taskRunToRestart
.stream()
.filter(throwPredicate(s -> {
TaskRun taskRun = execution.findTaskRunByTaskRunId(s);
Task task = flow.findTaskByTaskId(taskRun.getTaskId());
return (task instanceof Worker);
}))
.collect(Collectors.toSet());

GraphCluster graphCluster = GraphService.of(flow, execution);

return GraphService.successors(graphCluster, new ArrayList<>(workerTaskRunId))
.stream()
.filter(task -> task.getTaskRun() != null)
.filter(s -> !workerTaskRunId.contains(s.getTaskRun().getId()))
.map(s -> mappingTaskRunId.get(s.getTaskRun().getId()))
.collect(Collectors.toSet());
}

private Set<String> getAncestors(Execution execution, TaskRun taskRun) {
return Stream
.concat(
Expand Down Expand Up @@ -215,10 +246,10 @@ private TaskRun mapTaskRun(
State.Type newStateType,
Boolean toRestart
) throws InternalException {
boolean isFlowable = flow.findTaskByTaskId(originalTaskRun.getTaskId()).isFlowable();
Task task = flow.findTaskByTaskId(originalTaskRun.getTaskId());

State alterState;
if (!isFlowable) {
if (!task.isFlowable() || task instanceof Worker) {
// The current task run is the reference task run, its default state will be newState
alterState = originalTaskRun.withState(newStateType).getState();
}
Expand Down
Loading

0 comments on commit 95c863f

Please sign in to comment.