Skip to content

Commit

Permalink
fix(core): Schedule with scheduleCondition was never triggered after …
Browse files Browse the repository at this point in the history
…first trigger
  • Loading branch information
tchiotludo committed Feb 15, 2022
1 parent 604f98b commit f636301
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
code = {
"- conditions:",
" - type: io.kestra.core.models.conditions.types.DayWeekInMonthCondition",
" dayOfWeek: io.kestra.core.models.conditions.types.DayWeekInMonthCondition",
" dayOfWeek: MONDAY",
" dayInMonth: FIRST",
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
public interface PollingTriggerInterface {
Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception;

default ZonedDateTime nextEvaluationDate(Optional<? extends TriggerContext> last) {
default ZonedDateTime nextEvaluationDate(ConditionContext conditionContext, Optional<? extends TriggerContext> last) {
return ZonedDateTime.now();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,23 +117,41 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
private Map<String, String> inputs;

@Override
public ZonedDateTime nextEvaluationDate(Optional<? extends TriggerContext> last) {
if (last.isPresent()) {
return computeNextEvaluationDate(last.get().getDate()).orElse(null);
} else {
if (backfill != null && backfill.getStart() != null) {
return backfill.getStart();
public ZonedDateTime nextEvaluationDate(ConditionContext conditionContext, Optional<? extends TriggerContext> last) {
ExecutionTime executionTime = this.executionTime();

// previous present & scheduleConditions
if (last.isPresent() && this.scheduleConditions != null) {

Optional<ZonedDateTime> next = this.truePreviousNextDateWithCondition(
executionTime,
conditionContext,
last.get().getDate(),
true
);

if (next.isPresent()) {
return next.get().truncatedTo(ChronoUnit.SECONDS);
}
}

// previous present but no scheduleConditions
if (last.isPresent()) {
return computeNextEvaluationDate(executionTime, last.get().getDate()).orElse(null);
}

return computeNextEvaluationDate(ZonedDateTime.now()).orElse(null);
// no previous present but backfill
if (backfill != null && backfill.getStart() != null) {
return backfill.getStart();
}

// no previous present & no backfill, just provide now
return computeNextEvaluationDate(executionTime, ZonedDateTime.now()).orElse(null);
}

public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception {
Cron parse = CRON_PARSER.parse(this.cron);
RunContext runContext = conditionContext.getRunContext();

ExecutionTime executionTime = ExecutionTime.forCron(parse);
ExecutionTime executionTime = this.executionTime();
Output output = this.output(executionTime, context.getDate()).orElse(null);

if (output == null || output.getDate() == null) {
Expand Down Expand Up @@ -208,7 +226,7 @@ private Optional<Output> output(ExecutionTime executionTime, ZonedDateTime date)
Output.OutputBuilder<?, ?> outputBuilder = Output.builder()
.date(next.get());

computeNextEvaluationDate(next.get())
computeNextEvaluationDate(executionTime, next.get())
.ifPresent(outputBuilder::next);

executionTime.lastExecution(date)
Expand All @@ -226,29 +244,30 @@ private ConditionContext conditionContext(ConditionContext conditionContext, Out
));
}

private Optional<ZonedDateTime> computeNextEvaluationDate(ZonedDateTime date) {
private ExecutionTime executionTime() {
Cron parse = CRON_PARSER.parse(this.cron);
ExecutionTime executionTime = ExecutionTime.forCron(parse);

return ExecutionTime.forCron(parse);
}

private Optional<ZonedDateTime> computeNextEvaluationDate(ExecutionTime executionTime, ZonedDateTime date) {
return executionTime.nextExecution(date).map(zonedDateTime -> zonedDateTime.truncatedTo(ChronoUnit.SECONDS));
}

private Output trueOutputWithCondition(ExecutionTime executionTime, ConditionContext conditionContext, Output output) {
Output.OutputBuilder<?, ?> outputBuilder = Output.builder()
.date(output.getDate());

this.truePreviousNextDateWithCondition(executionTime, conditionContext, output, true)
this.truePreviousNextDateWithCondition(executionTime, conditionContext, output.getDate(), true)
.ifPresent(outputBuilder::next);

this.truePreviousNextDateWithCondition(executionTime, conditionContext, output, false)
this.truePreviousNextDateWithCondition(executionTime, conditionContext, output.getDate(), false)
.ifPresent(outputBuilder::previous);

return outputBuilder.build();
}

private Optional<ZonedDateTime> truePreviousNextDateWithCondition(ExecutionTime executionTime, ConditionContext conditionContext, Output output, boolean next) {
ZonedDateTime toTestDate = output.getDate();

private Optional<ZonedDateTime> truePreviousNextDateWithCondition(ExecutionTime executionTime, ConditionContext conditionContext, ZonedDateTime toTestDate, boolean next) {
while (
(next && toTestDate.getYear() < ZonedDateTime.now().getYear() + 10) ||
(!next && toTestDate.getYear() > ZonedDateTime.now().getYear() - 10)
Expand Down Expand Up @@ -293,7 +312,6 @@ private boolean validateScheduleCondition(ConditionContext conditionContext) {
return true;
}


@SuperBuilder
@ToString
@EqualsAndHashCode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ private void handle() {

return FlowWithPollingTriggerNextDate.of(
f,
f.getPollingTrigger().nextEvaluationDate(Optional.of(lastTrigger))
f.getPollingTrigger().nextEvaluationDate(f.getConditionContext(), Optional.of(lastTrigger))
);
}
})
Expand Down Expand Up @@ -395,7 +395,7 @@ private Trigger getLastTrigger(FlowWithPollingTrigger f, ZonedDateTime now) {
return triggerState
.findLast(f.getTriggerContext())
.orElseGet(() -> {
ZonedDateTime nextDate = f.getPollingTrigger().nextEvaluationDate(Optional.empty());
ZonedDateTime nextDate = f.getPollingTrigger().nextEvaluationDate(f.getConditionContext(), Optional.empty());

Trigger build = Trigger.builder()
.date(nextDate.compareTo(now) < 0 ? nextDate : now)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ void everyMinute() throws Exception {
@Test
void noBackfillNextDate() {
Schedule trigger = Schedule.builder().cron("0 0 * * *").build();
ZonedDateTime next = trigger.nextEvaluationDate(Optional.empty());
ZonedDateTime next = trigger.nextEvaluationDate(conditionContext(), Optional.empty());

assertThat(next.getDayOfMonth(), is(ZonedDateTime.now().plusDays(1).getDayOfMonth()));
}
Expand All @@ -132,7 +132,7 @@ void noBackfillNextDate() {
void noBackfillNextDateContext() {
Schedule trigger = Schedule.builder().cron("0 0 * * *").build();
ZonedDateTime date = ZonedDateTime.parse("2020-01-01T00:00:00+01:00[Europe/Paris]");
ZonedDateTime next = trigger.nextEvaluationDate(Optional.of(triggerContext(date, trigger)));
ZonedDateTime next = trigger.nextEvaluationDate(conditionContext(), Optional.of(triggerContext(date, trigger)));

assertThat(next.format(DateTimeFormatter.ISO_LOCAL_DATE), is(date.plusDays(1).format(DateTimeFormatter.ISO_LOCAL_DATE)));
}
Expand All @@ -145,7 +145,7 @@ void backfillNextDate() {
.cron("0 0 * * *")
.backfill(Schedule.ScheduleBackfill.builder().start(date).build())
.build();
ZonedDateTime next = trigger.nextEvaluationDate(Optional.empty());
ZonedDateTime next = trigger.nextEvaluationDate(conditionContext(), Optional.empty());

assertThat(next.format(DateTimeFormatter.ISO_LOCAL_DATE), is(date.format(DateTimeFormatter.ISO_LOCAL_DATE)));
}
Expand All @@ -157,15 +157,15 @@ void backfillNextDateContext() {
.backfill(Schedule.ScheduleBackfill.builder().start(ZonedDateTime.parse("2020-01-01T00:00:00+01:00[Europe/Paris]")).build())
.build();
ZonedDateTime date = ZonedDateTime.parse("2020-03-01T00:00:00+01:00[Europe/Paris]");
ZonedDateTime next = trigger.nextEvaluationDate(Optional.of(triggerContext(date, trigger)));
ZonedDateTime next = trigger.nextEvaluationDate(conditionContext(), Optional.of(triggerContext(date, trigger)));

assertThat(next.format(DateTimeFormatter.ISO_LOCAL_DATE), is(next.format(DateTimeFormatter.ISO_LOCAL_DATE)));
}

@Test
void emptyBackfillStartDate() {
Schedule trigger = Schedule.builder().cron("0 0 * * *").backfill(Schedule.ScheduleBackfill.builder().build()).build();
ZonedDateTime next = trigger.nextEvaluationDate(Optional.empty());
ZonedDateTime next = trigger.nextEvaluationDate(conditionContext(), Optional.empty());

assertThat(next.getDayOfMonth(), is(ZonedDateTime.now().plusDays(1).getDayOfMonth()));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package io.kestra.core.schedulers;

import io.kestra.core.models.conditions.types.DayWeekInMonthCondition;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.types.Schedule;
import io.kestra.runner.memory.MemoryFlowListeners;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.time.DayOfWeek;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

class SchedulerConditionTest extends AbstractSchedulerTest {
@Inject
protected MemoryFlowListeners flowListenersService;

@Inject
protected SchedulerTriggerStateInterface triggerState;

@Inject
protected SchedulerExecutionStateInterface executionState;

private static Flow createScheduleFlow() {
Schedule schedule = Schedule.builder()
.id("hourly")
.type(Schedule.class.getName())
.cron("0 0 * * *")
.inputs(Map.of(
"testInputs", "test-inputs"
))
.scheduleConditions(List.of(
DayWeekInMonthCondition.builder()
.type(DayWeekInMonthCondition.class.getName())
.date("{{ trigger.date }}")
.dayOfWeek(DayOfWeek.MONDAY)
.dayInMonth(DayWeekInMonthCondition.DayInMonth.FIRST)
.build()
))
.build();

return createFlow(Collections.singletonList(schedule));
}

@Test
void schedule() throws Exception {
// mock flow listeners
MemoryFlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
SchedulerExecutionStateInterface executionRepositorySpy = spy(this.executionState);
CountDownLatch queueCount = new CountDownLatch(4);

Flow flow = createScheduleFlow();

triggerState.save(Trigger.builder()
.namespace(flow.getNamespace())
.flowId(flow.getId())
.flowRevision(flow.getRevision())
.triggerId("hourly")
.date(ZonedDateTime.parse("2021-09-06T02:00:00+01:00[Europe/Paris]"))
.build()
);

doReturn(Collections.singletonList(flow))
.when(flowListenersServiceSpy)
.flows();

// mock the backfill execution is ended
doAnswer(invocation -> Optional.of(Execution.builder().state(new State().withState(State.Type.SUCCESS)).build()))
.when(executionRepositorySpy)
.findById(any());

// scheduler
try (AbstractScheduler scheduler = new DefaultScheduler(
applicationContext,
flowListenersServiceSpy,
executionRepositorySpy,
triggerState
)) {
// wait for execution
executionQueue.receive(SchedulerConditionTest.class, execution -> {
if (execution.getState().getCurrent() == State.Type.CREATED) {
executionQueue.emit(execution.withState(State.Type.SUCCESS));

queueCount.countDown();
if (queueCount.getCount() == 0) {
assertThat((ZonedDateTime) execution.getTrigger().getVariables().get("date"), is(ZonedDateTime.parse("2022-01-03T00:00:00+01:00")));
}
}
assertThat(execution.getFlowId(), is(flow.getId()));
});

scheduler.run();
queueCount.await(1, TimeUnit.MINUTES);

assertThat(queueCount.getCount(), is(0L));
}
}
}

0 comments on commit f636301

Please sign in to comment.