Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(#100): TimeClock interface #101

Merged
merged 1 commit into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions core/src/main/java/io/github/zero88/schedulerx/TimeClock.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.github.zero88.schedulerx;

import java.time.Instant;

import org.jetbrains.annotations.ApiStatus.Internal;
import org.jetbrains.annotations.NotNull;

/**
* Represents for time clock
*
* @since 2.0.0
*/
@Internal
public interface TimeClock {

/**
* Obtains the current instant from the system clock.
*
* @return the current instant using the system clock, not null
*/
@NotNull Instant now();

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.github.zero88.schedulerx.JobExecutor;
import io.github.zero88.schedulerx.Scheduler;
import io.github.zero88.schedulerx.SchedulingMonitor;
import io.github.zero88.schedulerx.TimeClock;
import io.github.zero88.schedulerx.TimeoutBlock;
import io.github.zero88.schedulerx.TimeoutPolicy;
import io.github.zero88.schedulerx.WorkerExecutorFactory;
Expand Down Expand Up @@ -59,6 +60,7 @@ public abstract class AbstractScheduler<IN, OUT, T extends Trigger> implements S
private final @NotNull T trigger;
private final @NotNull TriggerEvaluator evaluator;
private final @NotNull TimeoutPolicy timeoutPolicy;
private final @NotNull TimeClock clock;
private final Lock lock = new ReentrantLock();
private boolean didStart = false;
private boolean didTriggerValidation = false;
Expand All @@ -74,12 +76,15 @@ protected AbstractScheduler(@NotNull Job<IN, OUT> job, @NotNull JobData<IN> jobD
this.trigger = trigger;
this.monitor = monitor;
this.evaluator = new InternalTriggerEvaluator(this).andThen(evaluator);
this.state = new SchedulerStateImpl<>();
this.clock = new TimeClockImpl();
this.state = new SchedulerStateImpl<>(clock);
}

@Override
public final @NotNull Vertx vertx() { return this.vertx; }

public final @NotNull TimeClock clock() { return this.clock; }

@Override
public final @NotNull SchedulingMonitor<OUT> monitor() { return this.monitor; }

Expand Down Expand Up @@ -149,7 +154,7 @@ public final void executeJob(@NotNull ExecutionContext<OUT> executionContext) {
@Override
public final void cancel() {
if (!state.completed()) {
log(Instant.now(), "On cancel");
log(clock.now(), "On cancel");
doStop(state.timerId(), TriggerContextFactory.cancel(trigger().type(), state.tick()));
}
}
Expand Down Expand Up @@ -203,8 +208,9 @@ protected final void onTrigger(WorkerExecutor workerExecutor, TriggerContext tri
return;
}
final long round = state.increaseRound();
final ExecutionContextInternal<OUT> executionContext = new ExecutionContextImpl<>(vertx, triggerContext, round);
final Duration timeout = timeoutPolicy().executionTimeout();
final ExecutionContextInternal<OUT> executionContext = new ExecutionContextImpl<>(vertx, clock, triggerContext,
round);
log(executionContext.triggeredAt(), "On trigger", triggerContext.tick(), round);
Future.join(onEvaluationAfterTrigger(workerExecutor, triggerContext, round),
executeBlocking(workerExecutor, p -> executeJob(executionContext.setup(wrapTimeout(timeout, p)))))
Expand All @@ -228,7 +234,7 @@ protected final void onSchedule(long timerId) {
.setExternalId(jobData.externalId())
.setAvailableAt(state.availableAt())
.setTriggerContext(context)
.setRescheduledAt(Instant.now())
.setRescheduledAt(clock.now())
.setTick(context.tick())
.setRound(state.round())
.build();
Expand All @@ -241,15 +247,15 @@ protected final void onUnableSchedule(Throwable cause) {
monitor.onUnableSchedule(ExecutionResultImpl.<OUT>builder()
.setExternalId(jobData.externalId())
.setTriggerContext(ctx)
.setUnscheduledAt(Instant.now())
.setUnscheduledAt(clock.now())
.setTick(ctx.tick())
.setRound(ctx.tick())
.build());
}

protected final Future<TriggerContext> onEvaluationBeforeTrigger(WorkerExecutor worker, TriggerContext ctx) {
return executeBlocking(worker, p -> {
log(Instant.now(), "On before trigger");
log(clock.now(), "On before trigger");
wrapTimeout(timeoutPolicy().evaluationTimeout(), p).handle(
evaluator.beforeTrigger(trigger, ctx, jobData.externalId()));
});
Expand All @@ -258,11 +264,11 @@ protected final Future<TriggerContext> onEvaluationBeforeTrigger(WorkerExecutor
protected final Future<TriggerContext> onEvaluationAfterTrigger(WorkerExecutor worker, TriggerContext ctx,
long round) {
return executeBlocking(worker, p -> {
log(Instant.now(), "On after trigger");
log(clock.now(), "On after trigger");
wrapTimeout(timeoutPolicy().evaluationTimeout(), p).handle(
evaluator.afterTrigger(trigger(), ctx, jobData.externalId(), round)
.onSuccess(c -> doStop(state.timerId(), c))
.onFailure(t -> LOGGER.error(genMsg(ctx.tick(), round, Instant.now(), "After evaluate"), t)));
.onFailure(t -> LOGGER.error(genMsg(ctx.tick(), round, clock.now(), "After evaluate"), t)));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import org.jetbrains.annotations.NotNull;

import io.github.zero88.schedulerx.TimeClock;
import io.github.zero88.schedulerx.trigger.TriggerContext;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
Expand All @@ -14,18 +15,20 @@ final class ExecutionContextImpl<OUTPUT> implements ExecutionContextInternal<OUT
private final Vertx vertx;
private final long round;
private final TriggerContext triggerContext;
private final TimeClock clock;
private final Instant triggeredAt;
private Instant executedAt;
private Promise<Object> promise;
private OUTPUT data;
private Throwable error;
private boolean forceStop = false;

ExecutionContextImpl(Vertx vertx, TriggerContext triggerContext, long round) {
ExecutionContextImpl(Vertx vertx, TimeClock clock, TriggerContext triggerContext, long round) {
this.vertx = vertx;
this.round = round;
this.triggerContext = triggerContext;
this.triggeredAt = Instant.now();
this.clock = clock;
this.triggeredAt = this.clock.now();
}

@Override
Expand All @@ -34,7 +37,7 @@ final class ExecutionContextImpl<OUTPUT> implements ExecutionContextInternal<OUT
throw new IllegalStateException("ExecutionContext is already setup");
}
this.promise = promise;
this.executedAt = Instant.now();
this.executedAt = this.clock.now();
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import org.jetbrains.annotations.NotNull;

import io.github.zero88.schedulerx.TimeClock;

final class SchedulerStateImpl<OUTPUT> implements SchedulerStateInternal<OUTPUT> {

private final AtomicReference<Instant> availableAt = new AtomicReference<>();
Expand All @@ -24,8 +26,11 @@ final class SchedulerStateImpl<OUTPUT> implements SchedulerStateInternal<OUTPUT>
private final AtomicBoolean pending = new AtomicBoolean(true);
private final AtomicReference<Entry<Long, OUTPUT>> data = new AtomicReference<>(new SimpleEntry<>(0L, null));
private final AtomicReference<Entry<Long, Throwable>> error = new AtomicReference<>(new SimpleEntry<>(0L, null));
private final TimeClock clock;
private long timerId;

SchedulerStateImpl(TimeClock clock) { this.clock = clock; }

@Override
public Instant availableAt() { return availableAt.get(); }

Expand Down Expand Up @@ -73,20 +78,20 @@ public long increaseTick() {
@Override
public @NotNull Instant markAvailable() {
pending.set(false);
availableAt.set(Instant.now());
availableAt.set(clock.now());
return availableAt();
}

@Override
public @NotNull Instant markFinished(long tick) {
inProgress.remove(tick);
return Instant.now();
return clock.now();
}

@Override
public @NotNull Instant markCompleted() {
completed.set(true);
return Instant.now();
return clock.now();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.github.zero88.schedulerx.impl;

import java.time.Instant;

import org.jetbrains.annotations.ApiStatus.Internal;
import org.jetbrains.annotations.NotNull;

import io.github.zero88.schedulerx.TimeClock;

@Internal
class TimeClockImpl implements TimeClock {

@Override
public @NotNull Instant now() {
return Instant.now();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,23 @@ public static TriggerContext error(String triggerType, String reason, @Nullable
* Create trigger context in {@link TriggerStatus#KICKOFF} state
*
* @param triggerType the trigger type
* @param firedAt the fired at
* @param tick the tick
*/
public static @NotNull TriggerContext kickoff(@NotNull String triggerType, long tick) {
return kickoff(triggerType, tick, null);
public static @NotNull TriggerContext kickoff(@NotNull String triggerType, @NotNull Instant firedAt, long tick) {
return kickoff(triggerType, firedAt, tick, null);
}

/**
* Create trigger context in {@link TriggerStatus#KICKOFF} state
*
* @param triggerType the trigger type
* @param firedAt the fired at
* @param tick the tick
* @param info the trigger context info
*/
public static @NotNull <T> TriggerContext kickoff(@NotNull String triggerType, long tick, @Nullable T info) {
final Instant firedAt = Instant.now();
public static @NotNull <T> TriggerContext kickoff(@NotNull String triggerType, @NotNull Instant firedAt, long tick,
@Nullable T info) {
final TriggerCondition condition = createCondition(TriggerStatus.KICKOFF, null, null);
return new TriggerContext() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ final class CronSchedulerImpl<IN, OUT> extends AbstractScheduler<IN, OUT, CronTr
@Override
protected @NotNull Future<Long> registerTimer(WorkerExecutor workerExecutor) {
try {
final Instant now = Instant.now();
final Instant now = clock().now();
final long nextTriggerAfter = trigger().nextTriggerAfter(now);
final Instant nextTriggerTime = now.plus(nextTriggerAfter, ChronoUnit.MILLIS);
nextTimerId = vertx().setTimer(nextTriggerAfter, tId -> {
onProcess(workerExecutor, TriggerContextFactory.kickoff(trigger().type(), onFire(tId)));
onProcess(workerExecutor, TriggerContextFactory.kickoff(trigger().type(), clock().now(), onFire(tId)));
doStart(workerExecutor);
});
log(now, "Next schedule at" + brackets(nextTriggerTime) + " by timerId" + brackets(nextTimerId));
Expand All @@ -49,7 +49,7 @@ final class CronSchedulerImpl<IN, OUT> extends AbstractScheduler<IN, OUT, CronTr
@Override
protected void unregisterTimer(long timerId) {
boolean result = vertx().cancelTimer(nextTimerId);
log(Instant.now(), "Unregistered timerId" + brackets(nextTimerId) + brackets(result));
log(clock().now(), "Unregistered timerId" + brackets(nextTimerId) + brackets(result));
}

static final class CronSchedulerBuilderImpl<IN, OUT>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static io.github.zero88.schedulerx.impl.Utils.brackets;

import java.time.Instant;
import java.util.Objects;

import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -61,7 +60,7 @@ final class EventSchedulerImpl<IN, OUT, T> extends AbstractScheduler<IN, OUT, Ev
protected void unregisterTimer(long timerId) {
if (Objects.nonNull(consumer)) {
consumer.unregister()
.onComplete(r -> log(Instant.now(),
.onComplete(r -> log(clock().now(),
"Unregistered EventBus subscriber on address" + brackets(consumer.address()) +
brackets(r.succeeded()) + brackets(r.cause())));
}
Expand All @@ -70,9 +69,9 @@ protected void unregisterTimer(long timerId) {
private TriggerContext createKickoffContext(Message<Object> msg, long tick) {
try {
T eventMsg = trigger().getPredicate().convert(msg.headers(), msg.body());
return TriggerContextFactory.kickoff(trigger().type(), tick, eventMsg);
return TriggerContextFactory.kickoff(trigger().type(), clock().now(), tick, eventMsg);
} catch (Exception ex) {
return handleException(TriggerContextFactory.kickoff(trigger().type(), tick, msg), ex);
return handleException(TriggerContextFactory.kickoff(trigger().type(), clock().now(), tick, msg), ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static io.github.zero88.schedulerx.trigger.IntervalTrigger.REPEAT_INDEFINITELY;

import java.time.Duration;
import java.time.Instant;

import org.jetbrains.annotations.NotNull;

Expand Down Expand Up @@ -37,7 +36,7 @@ final class IntervalSchedulerImpl<IN, OUT> extends AbstractScheduler<IN, OUT, In
}
final Promise<Long> promise = Promise.promise();
final long delay = trigger().initialDelay().toMillis();
log(Instant.now(), "Delay " + brackets(delay + "ms") + " then register the trigger in the scheduler");
log(clock().now(), "Delay " + brackets(delay + "ms") + " then register the trigger in the scheduler");
vertx().setTimer(delay, ignore -> promise.complete(createPeriodicTimer(workerExecutor)));
return promise.future();
} catch (Exception e) {
Expand All @@ -48,13 +47,15 @@ final class IntervalSchedulerImpl<IN, OUT> extends AbstractScheduler<IN, OUT, In
@Override
protected void unregisterTimer(long timerId) {
boolean result = vertx().cancelTimer(timerId);
log(Instant.now(), "Unregistered timerId" + brackets(timerId) + brackets(result));
log(clock().now(), "Unregistered timerId" + brackets(timerId) + brackets(result));
}

private long createPeriodicTimer(WorkerExecutor executor) {
final long millis = trigger().interval().toMillis();
return this.vertx()
.setPeriodic(trigger().interval().toMillis(),
id -> onProcess(executor, TriggerContextFactory.kickoff(trigger().type(), onFire(id))));
.setPeriodic(millis, id -> onProcess(executor,
TriggerContextFactory.kickoff(trigger().type(), clock().now(),
onFire(id))));
}

// @formatter:off
Expand Down
Loading