Skip to content

Commit

Permalink
feat(*): schedule an execution on a fixed date
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu authored and MilosPaunovic committed Sep 19, 2024
1 parent 48b3f0b commit 6166b47
Show file tree
Hide file tree
Showing 30 changed files with 905 additions and 140 deletions.
18 changes: 14 additions & 4 deletions core/src/main/java/io/kestra/core/models/executions/Execution.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import lombok.extern.slf4j.Slf4j;

import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -96,6 +97,10 @@ public class Execution implements DeletedInterface, TenantInterface {
@With
ExecutionMetadata metadata;

@With
@Nullable
Instant scheduleDate;

/**
* Factory method for constructing a new {@link Execution} object for the given {@link Flow} and inputs.
*
Expand All @@ -106,14 +111,16 @@ public class Execution implements DeletedInterface, TenantInterface {
*/
public static Execution newExecution(final Flow flow,
final BiFunction<Flow, Execution, Map<String, Object>> inputs,
final List<Label> labels) {
final List<Label> labels,
final Optional<ZonedDateTime> scheduleDate) {
Execution execution = builder()
.id(IdUtils.create())
.tenantId(flow.getTenantId())
.namespace(flow.getNamespace())
.flowId(flow.getId())
.flowRevision(flow.getRevision())
.state(new State())
.scheduleDate(scheduleDate.map(date -> date.toInstant()).orElse(null))
.build();

if (inputs != null) {
Expand Down Expand Up @@ -172,7 +179,8 @@ public Execution withState(State.Type state) {
this.originalId,
this.trigger,
this.deleted,
this.metadata
this.metadata,
this.scheduleDate
);
}

Expand Down Expand Up @@ -205,7 +213,8 @@ public Execution withTaskRun(TaskRun taskRun) throws InternalException {
this.originalId,
this.trigger,
this.deleted,
this.metadata
this.metadata,
this.scheduleDate
);
}

Expand All @@ -226,7 +235,8 @@ public Execution childExecution(String childExecutionId, List<TaskRun> taskRunLi
this.originalId,
this.trigger,
this.deleted,
this.metadata
this.metadata,
this.scheduleDate
);
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/io/kestra/core/models/flows/State.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public boolean isRestartable() {

@JsonIgnore
public boolean isResumable() {
return this.current.isPaused() || this.current.isRetrying();
return this.current.isPaused() || this.current.isRetrying() || this.current.isCreated();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,23 @@ public interface PollingTriggerInterface extends WorkerTriggerInterface {
@PluginProperty
Duration getInterval();

/**
* Evaluate the trigger and create an execution if needed.
*/
Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception;

/**
* Compute the next evaluation date of the trigger based on the existing trigger context: by default, it uses the current date and the interval.
* Schedulable triggers must override this method.
*/
default ZonedDateTime nextEvaluationDate(ConditionContext conditionContext, Optional<? extends TriggerContext> last) throws Exception {
return ZonedDateTime.now().plus(this.getInterval());
}

/**
* Compute the next evaluation date of the trigger: by default, it uses the current date and the interval.
* Schedulable triggers must override this method as it's used to init them when there is no evaluation date.
*/
default ZonedDateTime nextEvaluationDate() {
return ZonedDateTime.now().plus(this.getInterval());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.kestra.core.models.triggers;

public enum RecoverMissedSchedules {
LAST,
NONE,
ALL
}
29 changes: 29 additions & 0 deletions core/src/main/java/io/kestra/core/models/triggers/Schedulable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.kestra.core.models.triggers;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.runners.RunContext;

import java.time.ZonedDateTime;

public interface Schedulable extends PollingTriggerInterface{
String PLUGIN_PROPERTY_RECOVER_MISSED_SCHEDULES = "recoverMissedSchedules";

/**
* Compute the previous evaluation of a trigger.
* This is used when a trigger misses some schedule to compute the next date to evaluate in the past.
*/
ZonedDateTime previousEvaluationDate(ConditionContext conditionContext) throws IllegalVariableEvaluationException;

RecoverMissedSchedules getRecoverMissedSchedules();

/**
* Load the default RecoverMissedSchedules from plugin property, or else ALL.
*/
default RecoverMissedSchedules defaultRecoverMissedSchedules(RunContext runContext) {
return runContext
.<String>pluginConfiguration(PLUGIN_PROPERTY_RECOVER_MISSED_SCHEDULES)
.map(conf -> RecoverMissedSchedules.valueOf(conf))
.orElse(RecoverMissedSchedules.ALL);
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package io.kestra.core.models.triggers;

import io.kestra.core.models.Label;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionTrigger;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.flows.State;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.IdUtils;

import java.util.Map;
import java.time.ZonedDateTime;
import java.util.*;

public abstract class TriggerService {
public static Execution generateExecution(
Expand Down Expand Up @@ -47,6 +51,52 @@ public static Execution generateRealtimeExecution(
return generateExecution(IdUtils.create(), trigger, context, executionTrigger, conditionContext.getFlow().getRevision());
}

public static Execution generateScheduledExecution(
AbstractTrigger trigger,
ConditionContext conditionContext,
TriggerContext context,
List<Label> labels,
Map<String, Object> inputs,
Map<String, Object> variables,
Optional<ZonedDateTime> scheduleDate
) {
RunContext runContext = conditionContext.getRunContext();
ExecutionTrigger executionTrigger = ExecutionTrigger.of(trigger, variables);

Execution execution = Execution.builder()
.id(runContext.getTriggerExecutionId())
.tenantId(context.getTenantId())
.namespace(context.getNamespace())
.flowId(context.getFlowId())
.flowRevision(conditionContext.getFlow().getRevision())
.labels(labels)
.state(new State())
.trigger(executionTrigger)
.scheduleDate(scheduleDate.map(date -> date.toInstant()).orElse(null))
.build();

Map<String, Object> allInputs = new HashMap<>();
// add flow inputs with default value
var flow = conditionContext.getFlow();
if (flow.getInputs() != null) {
flow.getInputs().stream()
.filter(input -> input.getDefaults() != null)
.forEach(input -> allInputs.put(input.getId(), input.getDefaults()));
}

if (inputs != null) {
allInputs.putAll(inputs);
}

// add inputs and inject defaults
if (!allInputs.isEmpty()) {
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
execution = execution.withInputs(flowInputOutput.typedInputs(conditionContext.getFlow(), execution, allInputs));
}

return execution;
}

private static Execution generateExecution(
String id,
AbstractTrigger trigger,
Expand Down
19 changes: 11 additions & 8 deletions core/src/main/java/io/kestra/core/runners/ExecutableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionTrigger;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.storages.Storage;
import lombok.extern.slf4j.Slf4j;

import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.*;

@Slf4j
Expand Down Expand Up @@ -57,7 +57,8 @@ public static <T extends Task & ExecutableTask<?>> SubflowExecution<?> subflowEx
T currentTask,
TaskRun currentTaskRun,
Map<String, Object> inputs,
List<Label> labels
List<Label> labels,
Property<ZonedDateTime> scheduleDate
) throws IllegalVariableEvaluationException {
String subflowNamespace = runContext.render(currentTask.subflowId().namespace());
String subflowId = runContext.render(currentTask.subflowId().flowId());
Expand Down Expand Up @@ -90,18 +91,20 @@ public static <T extends Task & ExecutableTask<?>> SubflowExecution<?> subflowEx
);

FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
Instant scheduleOnDate = scheduleDate != null ? scheduleDate.as(runContext, ZonedDateTime.class).toInstant() : null;
Execution execution = Execution
.newExecution(
flow,
(f, e) -> flowInputOutput.typedInputs(f, e, inputs),
labels)
labels,
Optional.empty())
.withTrigger(ExecutionTrigger.builder()
.id(currentTask.getId())
.type(currentTask.getType())
.variables(variables)
.build()
);

)
.withScheduleDate(scheduleOnDate);
return SubflowExecution.builder()
.parentTask(currentTask)
.parentTaskRun(currentTaskRun.withState(State.Type.RUNNING))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,9 +490,7 @@ private Executor handleChildWorkerTaskResult(Executor executor) throws Exception
Instant nextRetryDate;
AbstractRetry retry;
AbstractRetry.Behavior behavior;
ExecutionDelay.ExecutionDelayBuilder executionDelayBuilder = ExecutionDelay.builder()
.taskRunId(taskRun.getId())
.executionId(executor.getExecution().getId());

// Case task has a retry
if (task.getRetry() != null) {
retry = task.getRetry();
Expand All @@ -518,7 +516,9 @@ else if (parentTask != null && parentTask.getRetry() != null) {
taskRun.nextRetryDate(retry);
}
if (nextRetryDate != null) {
executionDelayBuilder
ExecutionDelay.ExecutionDelayBuilder executionDelayBuilder = ExecutionDelay.builder()
.taskRunId(taskRun.getId())
.executionId(executor.getExecution().getId())
.date(nextRetryDate)
.state(State.Type.RUNNING)
.delayType(behavior.equals(AbstractRetry.Behavior.CREATE_NEW_EXECUTION) ?
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/io/kestra/core/runners/RunnerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public Execution runOne(Flow flow, BiFunction<Flow, Execution, Map<String, Objec
duration = Duration.ofSeconds(15);
}

Execution execution = Execution.newExecution(flow, inputs, labels);
Execution execution = Execution.newExecution(flow, inputs, labels, Optional.empty());

return this.awaitExecution(isTerminatedExecution(execution, flow), throwRunnable(() -> {
this.executionQueue.emit(execution);
Expand All @@ -109,7 +109,7 @@ public Execution runOneUntilPaused(Flow flow, BiFunction<Flow, Execution, Map<St
duration = DEFAULT_MAX_WAIT_DURATION;
}

Execution execution = Execution.newExecution(flow, inputs, null);
Execution execution = Execution.newExecution(flow, inputs, null, Optional.empty());

return this.awaitExecution(isPausedExecution(execution), throwRunnable(() -> {
this.executionQueue.emit(execution);
Expand All @@ -135,7 +135,7 @@ public Execution runOneUntilRunning(Flow flow, BiFunction<Flow, Execution, Map<S
duration = DEFAULT_MAX_WAIT_DURATION;
}

Execution execution = Execution.newExecution(flow, inputs, null);
Execution execution = Execution.newExecution(flow, inputs, null, Optional.empty());

return this.awaitExecution(isRunningExecution(execution), throwRunnable(() -> {
this.executionQueue.emit(execution);
Expand Down
Loading

0 comments on commit 6166b47

Please sign in to comment.