Skip to content

Commit

Permalink
feat(#93): First step for TriggerEvaluator
Browse files Browse the repository at this point in the history
  • Loading branch information
zero88 committed Dec 18, 2023
1 parent 8c0e5b1 commit 84c936c
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.jetbrains.annotations.NotNull;

import io.github.zero88.schedulerx.trigger.Trigger;
import io.github.zero88.schedulerx.trigger.TriggerEvaluator;
import io.vertx.core.Vertx;

/**
Expand All @@ -21,10 +22,14 @@ public interface SchedulerBuilder<IN, OUT, TRIGGER extends Trigger, SCHEDULER ex
SELF extends SchedulerBuilder<IN, OUT, TRIGGER, SCHEDULER, SELF>>
extends JobExecutorContext<IN, OUT>, SchedulerContext<TRIGGER, OUT> {

@NotNull TriggerEvaluator triggerEvaluator();

@NotNull SELF setVertx(@NotNull Vertx vertx);

@NotNull SELF setTrigger(@NotNull TRIGGER trigger);

@NotNull SELF setTriggerEvaluator(@NotNull TriggerEvaluator evaluator);

@NotNull SELF setMonitor(@NotNull SchedulingMonitor<OUT> monitor);

@NotNull SELF setJob(@NotNull Job<IN, OUT> job);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.github.zero88.schedulerx.trigger.TriggerCondition.ReasonCode;
import io.github.zero88.schedulerx.trigger.TriggerCondition.TriggerStatus;
import io.github.zero88.schedulerx.trigger.TriggerContext;
import io.github.zero88.schedulerx.trigger.TriggerEvaluator;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
Expand All @@ -56,6 +57,7 @@ public abstract class AbstractScheduler<IN, OUT, T extends Trigger> implements S
private final @NotNull JobData<IN> jobData;
private final @NotNull Job<IN, OUT> job;
private final @NotNull T trigger;
private final @NotNull TriggerEvaluator evaluator;
private final @NotNull TimeoutPolicy timeoutPolicy;
private final Lock lock = new ReentrantLock();
private boolean didStart = false;
Expand All @@ -64,9 +66,10 @@ public abstract class AbstractScheduler<IN, OUT, T extends Trigger> implements S

protected AbstractScheduler(@NotNull Vertx vertx, @NotNull SchedulingMonitor<OUT> monitor,
@NotNull JobData<IN> jobData, @NotNull Job<IN, OUT> job, @NotNull T trigger,
@NotNull TimeoutPolicy timeoutPolicy) {
@NotNull TriggerEvaluator evaluator, @NotNull TimeoutPolicy timeoutPolicy) {
this.vertx = vertx;
this.trigger = trigger;
this.evaluator = new InternalTriggerEvaluator(this).andThen(evaluator);
this.monitor = monitor;
this.jobData = jobData;
this.job = job;
Expand Down Expand Up @@ -170,23 +173,6 @@ protected final void doStop(long timerId, TriggerContext context) {
*/
protected abstract void unregisterTimer(long timerId);

/**
* Check a trigger kickoff context whether to be able to run new execution or not
*/
protected final TriggerTransitionContext shouldRun(@NotNull TriggerTransitionContext kickOffContext) {
log(Instant.now(), "On evaluate");
if (state.pending()) {
return TriggerContextFactory.skip(kickOffContext, ReasonCode.NOT_YET_SCHEDULED);
}
if (state.completed()) {
return TriggerContextFactory.skip(kickOffContext, ReasonCode.ALREADY_STOPPED);
}
if (state.executing()) {
return TriggerContextFactory.skip(kickOffContext, ReasonCode.JOB_IS_RUNNING);
}
return evaluateTriggerRule(kickOffContext);
}

/**
* Check a trigger context whether to be able to stop by configuration or force stop
*/
Expand All @@ -200,23 +186,6 @@ protected final TriggerTransitionContext shouldStop(@NotNull TriggerTransitionCo
: triggerContext;
}

/**
* Evaluate a trigger kickoff context on trigger rule
*/
protected TriggerTransitionContext evaluateTriggerRule(@NotNull TriggerTransitionContext triggerContext) {
if (!triggerContext.isKickoff()) {
throw new IllegalStateException("Trigger condition status must be " + TriggerStatus.KICKOFF);
}
final Instant firedAt = Objects.requireNonNull(triggerContext.firedAt(),
"Kickoff context is missing a fired at time");
if (trigger().rule().isExceeded(firedAt)) {
return TriggerContextFactory.stop(triggerContext, ReasonCode.STOP_BY_CONFIG);
}
return trigger().shouldExecute(firedAt)
? TriggerContextFactory.ready(triggerContext)
: TriggerContextFactory.skip(triggerContext, ReasonCode.CONDITION_IS_NOT_MATCHED);
}

/**
* Register a timer id in internal state and increase tick time when the system timer fires
*
Expand All @@ -230,15 +199,15 @@ protected final long onFire(long timerId) {
/**
* Processing the trigger when the system timer fires
*/
protected final void onProcess(WorkerExecutor workerExecutor, TriggerTransitionContext kickoffContext) {
log(Objects.requireNonNull(kickoffContext.firedAt()), "On fire");
protected final void onProcess(WorkerExecutor workerExecutor, TriggerTransitionContext ctx) {
log(Objects.requireNonNull(ctx.firedAt()), "On fire");
final Duration timeout = timeoutPolicy().evaluationTimeout();
this.<TriggerTransitionContext>executeBlocking(workerExecutor,
p -> wrapTimeout(timeoutPolicy().evaluationTimeout(),
p).complete(shouldRun(kickoffContext)))
p -> this.wrapTimeout(timeout, p).handle(evaluator.check(ctx)))
.onSuccess(context -> onTrigger(workerExecutor, context))
.onFailure(t -> onMisfire(TriggerContextFactory.skip(kickoffContext, t instanceof TimeoutException
? ReasonCode.EVALUATION_TIMEOUT
: ReasonCode.UNEXPECTED_ERROR, t)));
.onFailure(t -> onMisfire(TriggerContextFactory.skip(ctx, t instanceof TimeoutException
? ReasonCode.EVALUATION_TIMEOUT
: ReasonCode.UNEXPECTED_ERROR, t)));
}

protected final void onTrigger(WorkerExecutor workerExecutor, TriggerTransitionContext triggerContext) {
Expand All @@ -248,9 +217,9 @@ protected final void onTrigger(WorkerExecutor workerExecutor, TriggerTransitionC
}
final ExecutionContextInternal<OUT> executionContext = new ExecutionContextImpl<>(vertx, triggerContext,
state.increaseRound());
final Duration timeout = timeoutPolicy().executionTimeout();
log(executionContext.triggeredAt(), "On trigger", triggerContext.tick(), executionContext.round());
this.executeBlocking(workerExecutor, p -> executeJob(
executionContext.setup(wrapTimeout(timeoutPolicy().executionTimeout(), p))))
this.executeBlocking(workerExecutor, p -> executeJob(executionContext.setup(wrapTimeout(timeout, p))))
.onComplete(ar -> onResult(executionContext, ar.cause()));
}

Expand Down Expand Up @@ -378,4 +347,38 @@ private <R> Promise<R> wrapTimeout(Duration timeout, Promise<R> promise) {
return new TimeoutBlock(vertx, timeout).wrap(promise);
}

@SuppressWarnings("rawtypes")
private static class InternalTriggerEvaluator extends AbstractTriggerEvaluator {

private final AbstractScheduler scheduler;

private InternalTriggerEvaluator(AbstractScheduler scheduler) { this.scheduler = scheduler; }

@Override
protected Future<TriggerTransitionContext> internalCheck(TriggerTransitionContext context) {
if (!context.isKickoff()) {
return Future.failedFuture(
new IllegalStateException("Trigger condition status must be " + TriggerStatus.KICKOFF));
}
if (scheduler.state.pending()) {
return Future.succeededFuture(TriggerContextFactory.skip(context, ReasonCode.NOT_YET_SCHEDULED));
}
if (scheduler.state.completed()) {
return Future.succeededFuture(TriggerContextFactory.skip(context, ReasonCode.ALREADY_STOPPED));
}
if (scheduler.state.executing()) {
return Future.succeededFuture(TriggerContextFactory.skip(context, ReasonCode.JOB_IS_RUNNING));
}
final Instant firedAt = Objects.requireNonNull(context.firedAt(),
"Kickoff context is missing a fired at time");
if (scheduler.trigger().rule().isExceeded(firedAt)) {
return Future.succeededFuture(TriggerContextFactory.stop(context, ReasonCode.STOP_BY_CONFIG));
}
return Future.succeededFuture(scheduler.trigger().shouldExecute(firedAt)
? TriggerContextFactory.ready(context)
: TriggerContextFactory.skip(context, ReasonCode.CONDITION_IS_NOT_MATCHED));
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.github.zero88.schedulerx.SchedulingMonitor;
import io.github.zero88.schedulerx.TimeoutPolicy;
import io.github.zero88.schedulerx.trigger.Trigger;
import io.github.zero88.schedulerx.trigger.TriggerEvaluator;
import io.vertx.core.Vertx;

/**
Expand All @@ -30,6 +31,7 @@ public abstract class AbstractSchedulerBuilder<IN, OUT, T extends Trigger, S ext
private JobData<IN> jobData;
private Job<IN, OUT> job;
private T trigger;
private TriggerEvaluator evaluator;
private TimeoutPolicy timeoutPolicy;

@Override
Expand All @@ -45,6 +47,11 @@ public abstract class AbstractSchedulerBuilder<IN, OUT, T extends Trigger, S ext
@Override
public @NotNull T trigger() { return Objects.requireNonNull(trigger, "Trigger is required"); }

@Override
public @NotNull TriggerEvaluator triggerEvaluator() {
return Optional.ofNullable(evaluator).orElseGet(TriggerEvaluator::create);
}

@Override
public @NotNull Job<IN, OUT> job() { return Objects.requireNonNull(job, "Job is required"); }

Expand All @@ -71,6 +78,12 @@ public abstract class AbstractSchedulerBuilder<IN, OUT, T extends Trigger, S ext
return (B) this;
}

@Override
public @NotNull B setTriggerEvaluator(@NotNull TriggerEvaluator evaluator) {
this.evaluator = evaluator;
return (B) this;
}

public @NotNull B setMonitor(@NotNull SchedulingMonitor<OUT> monitor) {
this.monitor = monitor;
return (B) this;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.github.zero88.schedulerx.impl;

import io.github.zero88.schedulerx.trigger.TriggerEvaluator;
import io.vertx.core.Future;

public abstract class AbstractTriggerEvaluator implements TriggerEvaluator {

public static TriggerEvaluator empty() {
return new AbstractTriggerEvaluator() {
@Override
protected Future<TriggerTransitionContext> internalCheck(TriggerTransitionContext context) {
return Future.succeededFuture(context);
}
};
}

private TriggerEvaluator next;

@Override
public Future<TriggerTransitionContext> check(TriggerTransitionContext context) {
return internalCheck(context).flatMap(ctx -> next == null ? Future.succeededFuture(ctx) : next.check(ctx));
}

@Override
public TriggerEvaluator andThen(TriggerEvaluator other) {
this.next = other;
return this;
}

protected abstract Future<TriggerTransitionContext> internalCheck(TriggerTransitionContext context);

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ final class CronSchedulerImpl<IN, OUT> extends AbstractScheduler<IN, OUT, CronTr
private long nextTimerId;

CronSchedulerImpl(@NotNull Vertx vertx, @NotNull SchedulingMonitor<OUT> monitor, @NotNull JobData<IN> jobData,
@NotNull Job<IN, OUT> job, @NotNull CronTrigger trigger, @NotNull TimeoutPolicy timeoutPolicy) {
super(vertx, monitor, jobData, job, trigger, timeoutPolicy);
@NotNull Job<IN, OUT> job, @NotNull CronTrigger trigger, @NotNull TriggerEvaluator evaluator,
@NotNull TimeoutPolicy timeoutPolicy) {
super(vertx, monitor, jobData, job, trigger, evaluator, timeoutPolicy);
}

@Override
Expand Down Expand Up @@ -56,7 +57,8 @@ static final class CronSchedulerBuilderImpl<IN, OUT>
implements CronSchedulerBuilder<IN, OUT> {

public @NotNull CronScheduler<IN, OUT> build() {
return new CronSchedulerImpl<>(vertx(), monitor(), jobData(), job(), trigger(), timeoutPolicy());
return new CronSchedulerImpl<>(vertx(), monitor(), jobData(), job(), trigger(), triggerEvaluator(),
timeoutPolicy());
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ final class EventSchedulerImpl<IN, OUT, T> extends AbstractScheduler<IN, OUT, Ev
private MessageConsumer<Object> consumer;

EventSchedulerImpl(@NotNull Vertx vertx, @NotNull SchedulingMonitor<OUT> monitor, @NotNull JobData<IN> jobData,
@NotNull Job<IN, OUT> job, @NotNull EventTrigger<T> trigger,
@NotNull Job<IN, OUT> job, @NotNull EventTrigger<T> trigger, @NotNull TriggerEvaluator evaluator,
@NotNull TimeoutPolicy timeoutPolicy) {
super(vertx, monitor, jobData, job, trigger, timeoutPolicy);
super(vertx, monitor, jobData, job, trigger, new EventTriggerEvaluator<>(trigger).andThen(evaluator),
timeoutPolicy);
}

@Override
Expand Down Expand Up @@ -66,21 +67,6 @@ protected void unregisterTimer(long timerId) {
}
}

@Override
@SuppressWarnings("unchecked")
protected TriggerTransitionContext evaluateTriggerRule(@NotNull TriggerTransitionContext triggerContext) {
final TriggerTransitionContext ctx = super.evaluateTriggerRule(triggerContext);
try {
if (ctx.condition().status() == TriggerCondition.TriggerStatus.READY &&
!trigger().getPredicate().test((T) triggerContext.info())) {
return TriggerContextFactory.skip(ctx, ReasonCode.CONDITION_IS_NOT_MATCHED);
}
} catch (Exception ex) {
return handleException(ctx, ex);
}
return ctx;
}

private TriggerTransitionContext createKickoffContext(Message<Object> msg, long tick) {
try {
T eventMsg = trigger().getPredicate().convert(msg.headers(), msg.body());
Expand All @@ -90,7 +76,7 @@ private TriggerTransitionContext createKickoffContext(Message<Object> msg, long
}
}

private TriggerTransitionContext handleException(TriggerTransitionContext context, Exception cause) {
static TriggerTransitionContext handleException(TriggerTransitionContext context, Exception cause) {
String reason = cause instanceof ClassCastException || cause instanceof EventTriggerPredicateException
? ReasonCode.CONDITION_IS_NOT_MATCHED
: ReasonCode.UNEXPECTED_ERROR;
Expand All @@ -104,7 +90,8 @@ static final class EventSchedulerBuilderImpl<IN, OUT, T>
// @formatter:on

public @NotNull EventScheduler<IN, OUT, T> build() {
return new EventSchedulerImpl<>(vertx(), monitor(), jobData(), job(), trigger(), timeoutPolicy());
return new EventSchedulerImpl<>(vertx(), monitor(), jobData(), job(), trigger(), triggerEvaluator(),
timeoutPolicy());
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.github.zero88.schedulerx.trigger;

import io.github.zero88.schedulerx.impl.AbstractTriggerEvaluator;
import io.github.zero88.schedulerx.impl.TriggerContextFactory;
import io.github.zero88.schedulerx.impl.TriggerTransitionContext;
import io.github.zero88.schedulerx.trigger.TriggerCondition.ReasonCode;
import io.vertx.core.Future;

final class EventTriggerEvaluator<T> extends AbstractTriggerEvaluator {

private final EventTrigger<T> trigger;

EventTriggerEvaluator(EventTrigger<T> trigger) { this.trigger = trigger; }

@Override
@SuppressWarnings("unchecked")
protected Future<TriggerTransitionContext> internalCheck(TriggerTransitionContext ctx) {
try {
if (ctx.condition().status() == TriggerCondition.TriggerStatus.READY &&
!trigger.getPredicate().test((T) ctx.info())) {
return Future.succeededFuture(TriggerContextFactory.skip(ctx, ReasonCode.CONDITION_IS_NOT_MATCHED));
}
} catch (Exception ex) {
return Future.succeededFuture(EventSchedulerImpl.handleException(ctx, ex));
}
return Future.succeededFuture(ctx);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ final class IntervalSchedulerImpl<IN, OUT> extends AbstractScheduler<IN, OUT, In

IntervalSchedulerImpl(@NotNull Vertx vertx, @NotNull SchedulingMonitor<OUT> monitor, @NotNull JobData<IN> jobData,
@NotNull Job<IN, OUT> job, @NotNull IntervalTrigger trigger,
@NotNull TimeoutPolicy timeoutPolicy) {
super(vertx, monitor, jobData, job, trigger, timeoutPolicy);
@NotNull TriggerEvaluator evaluator, @NotNull TimeoutPolicy timeoutPolicy) {
super(vertx, monitor, jobData, job, trigger, evaluator, timeoutPolicy);
}

protected @NotNull Future<Long> registerTimer(WorkerExecutor workerExecutor) {
Expand All @@ -49,10 +49,9 @@ protected void unregisterTimer(long timerId) {
}

private long createPeriodicTimer(WorkerExecutor executor) {
return vertx().setPeriodic(trigger().intervalInMilliseconds(), id -> onProcess(executor,
TriggerContextFactory.kickoff(
trigger().type(),
onFire(id))));
return this.vertx()
.setPeriodic(trigger().intervalInMilliseconds(),
id -> onProcess(executor, TriggerContextFactory.kickoff(trigger().type(), onFire(id))));
}

// @formatter:off
Expand All @@ -62,7 +61,8 @@ static final class IntervalSchedulerBuilderImpl<IN, OUT>
// @formatter:on

public @NotNull IntervalScheduler<IN, OUT> build() {
return new IntervalSchedulerImpl<>(vertx(), monitor(), jobData(), job(), trigger(), timeoutPolicy());
return new IntervalSchedulerImpl<>(vertx(), monitor(), jobData(), job(), trigger(), triggerEvaluator(),
timeoutPolicy());
}

}
Expand Down
Loading

0 comments on commit 84c936c

Please sign in to comment.