Skip to content

Commit

Permalink
feat(core): allow to use a custom timezone on schedule (#746)
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Sep 22, 2022
1 parent 91c3a17 commit 8194a7b
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,12 @@ public static ExecutionTrigger of(AbstractTrigger abstractTrigger, Output output
.variables(output.toMap())
.build();
}

public static ExecutionTrigger of(AbstractTrigger abstractTrigger, Map<String, Object> variables) {
return ExecutionTrigger.builder()
.id(abstractTrigger.getId())
.type(abstractTrigger.getType())
.variables(variables)
.build();
}
}
5 changes: 5 additions & 0 deletions core/src/main/java/io/kestra/core/models/tasks/Output.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.flows.State;
import io.kestra.core.serializers.JacksonMapper;

import java.time.ZoneId;
import java.util.Map;
import java.util.Optional;

Expand All @@ -14,4 +15,8 @@ default Optional<State.Type> finalState() {
default Map<String, Object> toMap() {
return JacksonMapper.toMap(this);
}

default Map<String, Object> toMap(ZoneId zoneId) {
return JacksonMapper.toMap(this, zoneId);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.kestra.core.models.triggers.types;

import com.cronutils.model.Cron;
import com.cronutils.model.CronType;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser;
Expand All @@ -27,6 +26,7 @@
import lombok.experimental.SuperBuilder;

import java.time.Duration;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
Expand Down Expand Up @@ -126,6 +126,12 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
)
private String cron;

@Schema(
title = "The time zone id to use for evaluate cron. Default value is the server default zone id."
)
@PluginProperty(dynamic = true)
private String timezone = ZoneId.systemDefault().toString();

@Schema(
title = "Backfill options in order to fill missing previous past date",
description = "Kestra will handle optionally a backfill. The concept of backfill is the replay the missing schedule because we create the flow later.\n" +
Expand Down Expand Up @@ -162,25 +168,28 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
public ZonedDateTime nextEvaluationDate(ConditionContext conditionContext, Optional<? extends TriggerContext> last) {
ExecutionTime executionTime = this.executionTime();

// previous present & scheduleConditions
if (last.isPresent() && this.scheduleConditions != null) {
Optional<ZonedDateTime> next = this.truePreviousNextDateWithCondition(
executionTime,
conditionContext,
last.get().getDate(),
true
);

if (next.isPresent()) {
return next.get().truncatedTo(ChronoUnit.SECONDS);
if (last.isPresent()) {
ZonedDateTime lastDate = convertDateTime(last.get().getDate());

// previous present & scheduleConditions
if (this.scheduleConditions != null) {
Optional<ZonedDateTime> next = this.truePreviousNextDateWithCondition(
executionTime,
conditionContext,
lastDate,
true
);

if (next.isPresent()) {
return next.get().truncatedTo(ChronoUnit.SECONDS);
}
}
}

// previous present but no scheduleConditions
if (last.isPresent()) {
return computeNextEvaluationDate(executionTime, last.get().getDate()).orElse(null);
// previous present but no scheduleConditions
return computeNextEvaluationDate(executionTime, lastDate).orElse(null);
}


// no previous present but backfill
if (backfill != null && backfill.getStart() != null) {
return backfill.getStart();
Expand All @@ -193,7 +202,7 @@ public ZonedDateTime nextEvaluationDate(ConditionContext conditionContext, Optio
public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception {
RunContext runContext = conditionContext.getRunContext();
ExecutionTime executionTime = this.executionTime();
ZonedDateTime previousDate = context.getDate();
ZonedDateTime previousDate = convertDateTime(context.getDate());

Output output = this.output(executionTime, previousDate).orElse(null);

Expand Down Expand Up @@ -242,7 +251,14 @@ public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerCo
}
}

ExecutionTrigger executionTrigger = ExecutionTrigger.of(this, output);
Map<String, Object> variables;
if (this.timezone != null) {
variables = output.toMap(ZoneId.of(this.timezone));
} else {
variables = output.toMap();
}

ExecutionTrigger executionTrigger = ExecutionTrigger.of(this, variables);

Execution execution = Execution.builder()
.id(IdUtils.create())
Expand All @@ -269,12 +285,14 @@ private Optional<Output> output(ExecutionTime executionTime, ZonedDateTime date)
}

Output.OutputBuilder<?, ?> outputBuilder = Output.builder()
.date(next.get());
.date(convertDateTime(next.get()));

computeNextEvaluationDate(executionTime, next.get())
.map(this::convertDateTime)
.ifPresent(outputBuilder::next);

executionTime.lastExecution(date)
.map(this::convertDateTime)
.ifPresent(outputBuilder::previous);

Output output = outputBuilder.build();
Expand All @@ -299,6 +317,14 @@ private synchronized ExecutionTime executionTime() {
return this.executionTime;
}

private ZonedDateTime convertDateTime(ZonedDateTime date) {
if (this.timezone == null) {
return date;
}

return date.withZoneSameInstant(ZoneId.of(this.timezone));
}

private Optional<ZonedDateTime> computeNextEvaluationDate(ExecutionTime executionTime, ZonedDateTime date) {
return executionTime.nextExecution(date).map(zonedDateTime -> zonedDateTime.truncatedTo(ChronoUnit.SECONDS));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.kestra.core.serializers.ion.IonFactory;
import io.kestra.core.serializers.ion.IonModule;

import java.time.ZoneId;
import java.util.Map;
import java.util.TimeZone;

Expand Down Expand Up @@ -57,6 +58,13 @@ public static ObjectMapper ofYaml() {

private static final TypeReference<Map<String, Object>> TYPE_REFERENCE = new TypeReference<>() {};

public static Map<String, Object> toMap(Object object, ZoneId zoneId) {
return MAPPER
.copy()
.setTimeZone(TimeZone.getTimeZone(zoneId.getId()))
.convertValue(object, TYPE_REFERENCE);
}

public static Map<String, Object> toMap(Object object) {
return MAPPER.convertValue(object, TYPE_REFERENCE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.time.DayOfWeek;
import java.time.Duration;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
Expand Down Expand Up @@ -349,6 +350,35 @@ void hourly() throws Exception {
assertThat(dateFromVars(vars.get("date"), date), is(date));
}

@Test
void timezone() throws Exception {
Schedule trigger = Schedule.builder().cron("12 9 1 * *").timezone("America/New_York").build();

ZonedDateTime date = ZonedDateTime.now()
.withZoneSameLocal(ZoneId.of("America/New_York"))
.withMonth(ZonedDateTime.now().getMonthValue() + 1)
.withDayOfMonth(1)
.withHour(9)
.withMinute(12)
.withSecond(0)
.truncatedTo(ChronoUnit.SECONDS)
.minus(1, ChronoUnit.MONTHS);

Optional<Execution> evaluate = trigger.evaluate(
conditionContext(),
triggerContext(date, trigger)
);

assertThat(evaluate.isPresent(), is(true));

var vars = (Map<String, String>) evaluate.get().getVariables().get("schedule");

assertThat(dateFromVars(vars.get("date"), date), is(date));
assertThat(ZonedDateTime.parse(vars.get("date")).getZone().getId(), is("-04:00"));
assertThat(dateFromVars(vars.get("next"), date), is(date.plusMonths(1)));
assertThat(dateFromVars(vars.get("previous"), date), is(date.minusMonths(1)));
}

private ConditionContext conditionContext() {
return ConditionContext.builder()
.runContext(runContextFactory.of())
Expand Down

0 comments on commit 8194a7b

Please sign in to comment.