diff --git a/core/src/main/java/io/github/zero88/schedulerx/impl/AbstractScheduler.java b/core/src/main/java/io/github/zero88/schedulerx/impl/AbstractScheduler.java index 2737da7..6258dd8 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/impl/AbstractScheduler.java +++ b/core/src/main/java/io/github/zero88/schedulerx/impl/AbstractScheduler.java @@ -160,8 +160,10 @@ protected final void doStart(WorkerExecutor workerExecutor) { } protected final void doStop(long timerId, TriggerContext context) { - unregisterTimer(timerId); - onComplete(context); + if (context.isStopped()) { + unregisterTimer(timerId); + onComplete(context); + } } /** @@ -174,18 +176,6 @@ protected final void doStop(long timerId, TriggerContext context) { */ protected abstract void unregisterTimer(long timerId); - /** - * Check a trigger context whether to be able to stop by configuration or force stop - */ - protected final TriggerContext shouldStop(@NotNull TriggerContext triggerContext, boolean isForceStop, long round) { - if (isForceStop) { - return TriggerContextFactory.stop(triggerContext, ReasonCode.STOP_BY_JOB); - } - return trigger().shouldStop(round) - ? TriggerContextFactory.stop(triggerContext, ReasonCode.STOP_BY_CONFIG) - : triggerContext; - } - /** * Register a timer id in internal state and increase tick time when the system timer fires * @@ -199,16 +189,13 @@ protected final long onFire(long timerId) { /** * Processing the trigger right away after the system timer fires */ - protected final void onProcess(WorkerExecutor workerExecutor, TriggerContext ctx) { - log(Objects.requireNonNull(ctx.firedAt()), "On fire"); - final Duration timeout = timeoutPolicy().evaluationTimeout(); - this.executeBlocking(workerExecutor, p -> this.wrapTimeout(timeout, p) - .handle(evaluator.beforeRun(trigger, ctx, - jobData.externalId()))) - .onSuccess(context -> onTrigger(workerExecutor, context)) - .onFailure(t -> onMisfire(TriggerContextFactory.skip(ctx, t instanceof TimeoutException - ? ReasonCode.EVALUATION_TIMEOUT - : ReasonCode.UNEXPECTED_ERROR, t))); + protected final void onProcess(WorkerExecutor workerExecutor, TriggerContext triggerContext) { + log(Objects.requireNonNull(triggerContext.firedAt()), "On fire"); + this.onEvaluationBeforeTrigger(workerExecutor, triggerContext) + .onSuccess(ctx -> onTrigger(workerExecutor, ctx)) + .onFailure(t -> onMisfire(TriggerContextFactory.skip(triggerContext, t instanceof TimeoutException + ? ReasonCode.EVALUATION_TIMEOUT + : ReasonCode.UNEXPECTED_ERROR, t))); } protected final void onTrigger(WorkerExecutor workerExecutor, TriggerContext triggerContext) { @@ -216,12 +203,13 @@ protected final void onTrigger(WorkerExecutor workerExecutor, TriggerContext tri onMisfire(triggerContext); return; } - final ExecutionContextInternal executionContext = new ExecutionContextImpl<>(vertx, triggerContext, - state.increaseRound()); + final long round = state.increaseRound(); + final ExecutionContextInternal executionContext = new ExecutionContextImpl<>(vertx, triggerContext, round); final Duration timeout = timeoutPolicy().executionTimeout(); - log(executionContext.triggeredAt(), "On trigger", triggerContext.tick(), executionContext.round()); - this.executeBlocking(workerExecutor, p -> executeJob(executionContext.setup(wrapTimeout(timeout, p)))) - .onComplete(ar -> onResult(executionContext, ar.cause())); + log(executionContext.triggeredAt(), "On trigger", triggerContext.tick(), round); + Future.join(onEvaluationAfterTrigger(workerExecutor, triggerContext, round), + executeBlocking(workerExecutor, p -> executeJob(executionContext.setup(wrapTimeout(timeout, p))))) + .onComplete(ar -> onResult(executionContext, ar.result().cause(1))); } protected final void onSchedule(long timerId) { @@ -260,6 +248,25 @@ protected final void onUnableSchedule(Throwable cause) { .build()); } + protected final Future onEvaluationBeforeTrigger(WorkerExecutor worker, TriggerContext ctx) { + return executeBlocking(worker, p -> { + log(Instant.now(), "On before trigger"); + this.wrapTimeout(timeoutPolicy().evaluationTimeout(), p) + .handle(evaluator.beforeTrigger(trigger, ctx, jobData.externalId())); + }); + } + + protected final Future onEvaluationAfterTrigger(WorkerExecutor worker, TriggerContext ctx, + long round) { + return executeBlocking(worker, p -> { + log(Instant.now(), "On after trigger"); + wrapTimeout(timeoutPolicy().evaluationTimeout(), p).handle( + evaluator.afterTrigger(trigger(), ctx, jobData.externalId(), round) + .onSuccess(c -> doStop(state.timerId(), c)) + .onFailure(t -> LOGGER.error(genMsg(ctx.tick(), round, Instant.now(), "After evaluate"), t))); + }); + } + protected final void onMisfire(@NotNull TriggerContext triggerCtx) { final Instant finishedAt = state.markFinished(triggerCtx.tick()); final String reasonCode = triggerCtx.condition().reasonCode(); @@ -288,7 +295,7 @@ protected final void onResult(@NotNull ExecutionContext executionContext, @ if (asyncCause instanceof TimeoutException) { LOGGER.warn(genMsg(triggerContext.tick(), ctx.round(), finishedAt, asyncCause.getMessage())); } else if (asyncCause != null) { - LOGGER.error(genMsg(triggerContext.tick(), ctx.round(), finishedAt, "System error"), asyncCause); + LOGGER.error(genMsg(triggerContext.tick(), ctx.round(), finishedAt, "On result::System error"), asyncCause); } monitor.onEach(ExecutionResultImpl.builder() .setExternalId(jobData.externalId()) @@ -304,9 +311,8 @@ protected final void onResult(@NotNull ExecutionContext executionContext, @ .setError(state.addError(ctx.round(), Optional.ofNullable(ctx.error()).orElse(asyncCause))) .build()); - final TriggerContext transitionCtx = shouldStop(triggerContext, ctx.isForceStop(), ctx.round()); - if (transitionCtx.isStopped()) { - doStop(state.timerId(), transitionCtx); + if (ctx.isForceStop()) { + doStop(state.timerId(), TriggerContextFactory.stop(triggerContext, ReasonCode.STOP_BY_JOB)); } } @@ -358,15 +364,15 @@ private Promise wrapTimeout(Duration timeout, Promise promise) { } @SuppressWarnings("rawtypes") - private static class InternalTriggerEvaluator extends AbstractTriggerEvaluator { + private static class InternalTriggerEvaluator extends DefaultTriggerEvaluator { private final AbstractScheduler scheduler; private InternalTriggerEvaluator(AbstractScheduler scheduler) { this.scheduler = scheduler; } @Override - protected Future internalCheck(@NotNull Trigger trigger, @NotNull TriggerContext ctx, - @Nullable Object externalId) { + protected Future internalBeforeTrigger(@NotNull Trigger trigger, @NotNull TriggerContext ctx, + @Nullable Object externalId) { if (!ctx.isKickoff()) { throw new IllegalStateException("Trigger condition status must be " + TriggerStatus.KICKOFF); } @@ -375,7 +381,6 @@ protected Future internalCheck(@NotNull Trigger trigger, @NotNul @NotNull private TriggerContext doCheck(TriggerContext ctx) { - scheduler.log(Instant.now(), "On evaluate"); if (scheduler.state.pending()) { return TriggerContextFactory.skip(ctx, ReasonCode.NOT_YET_SCHEDULED); } diff --git a/core/src/main/java/io/github/zero88/schedulerx/impl/AbstractSchedulerBuilder.java b/core/src/main/java/io/github/zero88/schedulerx/impl/AbstractSchedulerBuilder.java index fae95d3..c2bd768 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/impl/AbstractSchedulerBuilder.java +++ b/core/src/main/java/io/github/zero88/schedulerx/impl/AbstractSchedulerBuilder.java @@ -49,7 +49,7 @@ public abstract class AbstractSchedulerBuilder internalCheck(@NotNull Trigger trigger, - @NotNull TriggerContext triggerContext, - @Nullable Object externalId) { - return Future.succeededFuture(triggerContext); - } - }; - } - - private TriggerEvaluator next; - - @Override - public @NotNull Future beforeRun(@NotNull Trigger trigger, @NotNull TriggerContext triggerContext, - @Nullable Object externalId) { - return this.internalCheck(trigger, triggerContext, externalId) - .flatMap(ctx -> next == null ? Future.succeededFuture(ctx) : next.beforeRun(trigger, ctx, externalId)); - } - - @Override - public @NotNull TriggerEvaluator andThen(@Nullable TriggerEvaluator another) { - this.next = another; - return this; - } - - protected abstract Future internalCheck(@NotNull Trigger trigger, - @NotNull TriggerContext triggerContext, - @Nullable Object externalId); - -} diff --git a/core/src/main/java/io/github/zero88/schedulerx/impl/DefaultTriggerEvaluator.java b/core/src/main/java/io/github/zero88/schedulerx/impl/DefaultTriggerEvaluator.java new file mode 100644 index 0000000..c892512 --- /dev/null +++ b/core/src/main/java/io/github/zero88/schedulerx/impl/DefaultTriggerEvaluator.java @@ -0,0 +1,57 @@ +package io.github.zero88.schedulerx.impl; + +import org.jetbrains.annotations.ApiStatus.Internal; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import io.github.zero88.schedulerx.trigger.Trigger; +import io.github.zero88.schedulerx.trigger.TriggerContext; +import io.github.zero88.schedulerx.trigger.TriggerEvaluator; +import io.vertx.core.Future; + +@Internal +public class DefaultTriggerEvaluator implements TriggerEvaluator { + + static TriggerEvaluator noop() { + return new DefaultTriggerEvaluator(); + } + + private TriggerEvaluator next; + + @Override + public final @NotNull Future beforeTrigger(@NotNull Trigger trigger, + @NotNull TriggerContext triggerContext, + @Nullable Object externalId) { + return this.internalBeforeTrigger(trigger, triggerContext, externalId) + .flatMap(c -> next == null ? Future.succeededFuture(c) : next.beforeTrigger(trigger, c, externalId)); + } + + @Override + public final @NotNull Future afterTrigger(@NotNull Trigger trigger, + @NotNull TriggerContext triggerContext, + @Nullable Object externalId, long round) { + // @formatter:off + return this.internalAfterTrigger(trigger, triggerContext, externalId, round ) + .flatMap(c -> next == null ? Future.succeededFuture(c) : next.afterTrigger(trigger, c, externalId, round)); + // @formatter:on + } + + @Override + public final @NotNull TriggerEvaluator andThen(@Nullable TriggerEvaluator another) { + this.next = another; + return this; + } + + protected Future internalBeforeTrigger(@NotNull Trigger trigger, + @NotNull TriggerContext triggerContext, + @Nullable Object externalId) { + return Future.succeededFuture(triggerContext); + } + + protected Future internalAfterTrigger(@NotNull Trigger trigger, + @NotNull TriggerContext triggerContext, + @Nullable Object externalId, long round) { + return Future.succeededFuture(triggerContext); + } + +} diff --git a/core/src/main/java/io/github/zero88/schedulerx/trigger/EventSchedulerImpl.java b/core/src/main/java/io/github/zero88/schedulerx/trigger/EventSchedulerImpl.java index 27195cb..39fefb7 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/trigger/EventSchedulerImpl.java +++ b/core/src/main/java/io/github/zero88/schedulerx/trigger/EventSchedulerImpl.java @@ -14,7 +14,7 @@ import io.github.zero88.schedulerx.TimeoutPolicy; import io.github.zero88.schedulerx.impl.AbstractScheduler; import io.github.zero88.schedulerx.impl.AbstractSchedulerBuilder; -import io.github.zero88.schedulerx.impl.AbstractTriggerEvaluator; +import io.github.zero88.schedulerx.impl.DefaultTriggerEvaluator; import io.github.zero88.schedulerx.impl.TriggerContextFactory; import io.github.zero88.schedulerx.trigger.TriggerCondition.ReasonCode; import io.github.zero88.schedulerx.trigger.predicate.EventTriggerPredicate.EventTriggerPredicateException; @@ -97,12 +97,12 @@ static final class EventSchedulerBuilderImpl } - static final class EventTriggerEvaluator extends AbstractTriggerEvaluator { + static final class EventTriggerEvaluator extends DefaultTriggerEvaluator { @Override @SuppressWarnings("unchecked") - protected Future internalCheck(@NotNull Trigger trigger, @NotNull TriggerContext ctx, - @Nullable Object externalId) { + protected Future internalBeforeTrigger(@NotNull Trigger trigger, @NotNull TriggerContext ctx, + @Nullable Object externalId) { try { if (ctx.condition().status() == TriggerCondition.TriggerStatus.READY && !((EventTrigger) trigger).getPredicate().test((T) ctx.info())) { diff --git a/core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerImpl.java b/core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerImpl.java index af05adb..7d64990 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerImpl.java +++ b/core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerImpl.java @@ -5,6 +5,7 @@ import java.time.Instant; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import io.github.zero88.schedulerx.Job; import io.github.zero88.schedulerx.JobData; @@ -12,7 +13,9 @@ import io.github.zero88.schedulerx.TimeoutPolicy; import io.github.zero88.schedulerx.impl.AbstractScheduler; import io.github.zero88.schedulerx.impl.AbstractSchedulerBuilder; +import io.github.zero88.schedulerx.impl.DefaultTriggerEvaluator; import io.github.zero88.schedulerx.impl.TriggerContextFactory; +import io.github.zero88.schedulerx.trigger.TriggerCondition.ReasonCode; import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.Vertx; @@ -24,7 +27,7 @@ final class IntervalSchedulerImpl extends AbstractScheduler job, @NotNull JobData jobData, @NotNull TimeoutPolicy timeoutPolicy, @NotNull SchedulingMonitor monitor, @NotNull IntervalTrigger trigger, @NotNull TriggerEvaluator evaluator, @NotNull Vertx vertx) { - super(job, jobData, timeoutPolicy, monitor, trigger, evaluator, vertx); + super(job, jobData, timeoutPolicy, monitor, trigger, new IntervalTriggerEvaluator().andThen(evaluator), vertx); } protected @NotNull Future registerTimer(WorkerExecutor workerExecutor) { @@ -67,4 +70,20 @@ static final class IntervalSchedulerBuilderImpl } + + static final class IntervalTriggerEvaluator extends DefaultTriggerEvaluator { + + @Override + protected Future internalAfterTrigger(@NotNull Trigger trigger, + @NotNull TriggerContext triggerContext, + @Nullable Object externalId, long round) { + IntervalTrigger interval = (IntervalTrigger) trigger; + if (interval.noRepeatIndefinitely() && round >= interval.getRepeat()) { + return Future.succeededFuture(TriggerContextFactory.stop(triggerContext, ReasonCode.STOP_BY_CONFIG)); + } + return Future.succeededFuture(triggerContext); + } + + } + } diff --git a/core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalTrigger.java b/core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalTrigger.java index 12537db..0913f9c 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalTrigger.java +++ b/core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalTrigger.java @@ -80,11 +80,6 @@ default long delayInMilliseconds() { @Override @NotNull IntervalTrigger validate(); - @Override - default boolean shouldStop(long round) { - return noRepeatIndefinitely() && round >= getRepeat(); - } - @Override default JsonObject toJson() { JsonObject self = JsonObject.of("repeat", getRepeat(), "initialDelay", getInitialDelay(), diff --git a/core/src/main/java/io/github/zero88/schedulerx/trigger/Trigger.java b/core/src/main/java/io/github/zero88/schedulerx/trigger/Trigger.java index 9c6ed9d..946001c 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/trigger/Trigger.java +++ b/core/src/main/java/io/github/zero88/schedulerx/trigger/Trigger.java @@ -44,17 +44,6 @@ public interface Trigger extends HasTriggerType, TriggerRepresentation { */ @NotNull Trigger validate(); - /** - * Verify the execution should be stopped after the current execution round is out of the trigger rule. - *

- * This method will be invoked right away after each execution round is finished regardless of the execution result - * is success or error. - * - * @param round the current execution round - * @since 2.0.0 - */ - default boolean shouldStop(long round) { return false; } - /** * Simulate the next trigger times based on default preview parameter({@link PreviewParameter#byDefault()}) * diff --git a/core/src/main/java/io/github/zero88/schedulerx/trigger/TriggerEvaluator.java b/core/src/main/java/io/github/zero88/schedulerx/trigger/TriggerEvaluator.java index d6ad028..18df862 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/trigger/TriggerEvaluator.java +++ b/core/src/main/java/io/github/zero88/schedulerx/trigger/TriggerEvaluator.java @@ -13,15 +13,24 @@ public interface TriggerEvaluator { /** - * Check whether the trigger is able to run + * Verify if the trigger can run before each execution round is started. * - * @param trigger the trigger - * @param context the trigger context - * @param externalId the job external id + * @param trigger the trigger + * @param triggerContext the trigger context + * @param externalId the job external id * @return a future of the trigger context that is evaluated */ - @NotNull Future beforeRun(@NotNull Trigger trigger, @NotNull TriggerContext context, - @Nullable Object externalId); + @NotNull Future beforeTrigger(@NotNull Trigger trigger, @NotNull TriggerContext triggerContext, + @Nullable Object externalId); + + /** + * Verify if the trigger should stop executing immediately after one round of execution begins. + * + * @param round the current execution round + * @since 2.0.0 + */ + @NotNull Future afterTrigger(@NotNull Trigger trigger, @NotNull TriggerContext triggerContext, + @Nullable Object externalId, long round); /** * Chain another evaluator diff --git a/core/src/test/java/io/github/zero88/schedulerx/SchedulerTest.java b/core/src/test/java/io/github/zero88/schedulerx/SchedulerTest.java index 59f6496..5c6a216 100644 --- a/core/src/test/java/io/github/zero88/schedulerx/SchedulerTest.java +++ b/core/src/test/java/io/github/zero88/schedulerx/SchedulerTest.java @@ -17,7 +17,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; -import io.github.zero88.schedulerx.impl.AbstractTriggerEvaluator; +import io.github.zero88.schedulerx.impl.DefaultTriggerEvaluator; import io.github.zero88.schedulerx.trigger.CronScheduler; import io.github.zero88.schedulerx.trigger.CronTrigger; import io.github.zero88.schedulerx.trigger.EventScheduler; @@ -184,11 +184,11 @@ void test_scheduler_should_timeout_in_evaluation(Vertx vertx, VertxTestContext t .setTestContext(testContext) .setMisfire(timeoutAsserter) .build(); - final TriggerEvaluator evaluator = new AbstractTriggerEvaluator() { + final TriggerEvaluator evaluator = new DefaultTriggerEvaluator() { @Override - protected Future internalCheck(@NotNull Trigger trigger, - @NotNull TriggerContext triggerContext, - @Nullable Object externalId) { + protected Future internalBeforeTrigger(@NotNull Trigger trigger, + @NotNull TriggerContext triggerContext, + @Nullable Object externalId) { TestUtils.block(runningTime, testContext); return Future.succeededFuture(triggerContext); } diff --git a/core/src/test/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerTest.java b/core/src/test/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerTest.java index 4de098b..f0ca509 100644 --- a/core/src/test/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerTest.java +++ b/core/src/test/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerTest.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.params.provider.Arguments.arguments; import java.time.Duration; +import java.time.Instant; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.stream.Stream; @@ -22,6 +23,7 @@ import io.github.zero88.schedulerx.TestUtils; import io.vertx.core.Vertx; import io.vertx.core.WorkerExecutor; +import io.vertx.junit5.Checkpoint; import io.vertx.junit5.RunTestOnContext; import io.vertx.junit5.VertxExtension; import io.vertx.junit5.VertxTestContext; @@ -55,21 +57,68 @@ void test_unable_schedule_job_due_to_invalid_config(IntervalTrigger trigger, Str } @Test - void test_run_job_after_delay(Vertx vertx, VertxTestContext ctx) { + void test_job_should_be_executed_in_interval_trigger(Vertx vertx, VertxTestContext context) { + final Consumer> onEach = result -> { + if (result.round() < 3) { + Assertions.assertTrue(result.isError()); + Assertions.assertNotNull(result.error()); + Assertions.assertInstanceOf(RuntimeException.class, result.error()); + Assertions.assertNull(result.data()); + } + if (result.round() == 3) { + Assertions.assertFalse(result.isError()); + Assertions.assertNull(result.error()); + Assertions.assertEquals("OK", result.data()); + } + Assertions.assertNull(result.triggerContext().info()); + }; + final SchedulingAsserter asserter = SchedulingAsserter.builder() + .setTestContext(context) + .setEach(onEach) + .build(); + final Job job = (jobData, ctx) -> { + final long round = ctx.round(); + if (round == 1) { + throw new IllegalArgumentException("throw in execution"); + } + if (round == 2) { + ctx.fail(new IllegalArgumentException("explicit set failed")); + } + if (round == 3) { + ctx.complete("OK"); + } + }; + final IntervalTrigger trigger = IntervalTrigger.builder().interval(2).repeat(3).build(); + IntervalScheduler.builder() + .setVertx(vertx) + .setMonitor(asserter) + .setTrigger(trigger) + .setJob(job) + .build() + .start(); + } + + @Test + void test_job_should_be_executed_in_interval_trigger_after_delay(Vertx vertx, VertxTestContext ctx) { + final Duration initialDelay = Duration.ofSeconds(5); + final Instant startedTime = Instant.now(); final Consumer> onSchedule = result -> { Assertions.assertEquals(0, result.tick()); Assertions.assertEquals(0, result.round()); + final Duration timeLapsed = Duration.between(startedTime, result.availableAt()); + Assertions.assertTrue(timeLapsed.compareTo(initialDelay) > 0); }; - final Consumer> onComplete = result -> { - Assertions.assertEquals(2, result.round()); - Assertions.assertFalse(result.isError()); - }; + final Consumer> onComplete = result -> Assertions.assertEquals(1, result.round()); final SchedulingAsserter asserter = SchedulingAsserter.builder() .setTestContext(ctx) .setSchedule(onSchedule) .setCompleted(onComplete) .build(); - final IntervalTrigger trigger = IntervalTrigger.builder().initialDelay(5).interval(2).repeat(2).build(); + final IntervalTrigger trigger = IntervalTrigger.builder() + .initialDelay(initialDelay.toSeconds()) + .interval(2) + .repeat(1) + .build(); IntervalScheduler.builder() .setVertx(vertx) .setMonitor(asserter) @@ -81,6 +130,7 @@ void test_run_job_after_delay(Vertx vertx, VertxTestContext ctx) { @Test void test_run_blocking_job_till_the_end(Vertx vertx, VertxTestContext testContext) { + final Checkpoint flag = testContext.checkpoint(4); final AtomicLong lastTickOnEach = new AtomicLong(); final Consumer> onEach = result -> { lastTickOnEach.set(result.tick()); @@ -89,6 +139,7 @@ void test_run_blocking_job_till_the_end(Vertx vertx, VertxTestContext testContex } else { Assertions.assertTrue(result.tick() > result.round()); } + flag.flag(); }; final Consumer> onMisfire = result -> { Assertions.assertTrue(result.tick() > result.round()); @@ -96,18 +147,20 @@ void test_run_blocking_job_till_the_end(Vertx vertx, VertxTestContext testContex Assertions.assertEquals("JobIsRunning", result.triggerContext().condition().reasonCode()); }; final Consumer> onComplete = result -> { + final long tickAtClosedTime = result.tick() - 1; + Assertions.assertEquals(lastTickOnEach.get() + result.round(), tickAtClosedTime); Assertions.assertEquals(3, result.round()); - Assertions.assertEquals(lastTickOnEach.get() + result.round(), result.tick()); - Assertions.assertFalse(result.isError()); + flag.flag(); }; final SchedulingAsserter asserter = SchedulingAsserter.builder() .setTestContext(testContext) .setEach(onEach) .setMisfire(onMisfire) .setCompleted(onComplete) + .disableAutoCompleteTest() .build(); final IntervalTrigger trigger = IntervalTrigger.builder().interval(1).repeat(3).build(); - final WorkerExecutor worker = vertx.createSharedWorkerExecutor("hello", 3, 500); + final WorkerExecutor worker = vertx.createSharedWorkerExecutor("hello", 3, 1000); IntervalScheduler.builder() .setVertx(vertx) .setMonitor(asserter) @@ -117,51 +170,10 @@ void test_run_blocking_job_till_the_end(Vertx vertx, VertxTestContext testContex .start(worker); } - @Test - void test_job_should_be_executed_in_interval_trigger(Vertx vertx, VertxTestContext context) { - final Consumer> onEach = result -> { - if (result.round() < 3) { - Assertions.assertTrue(result.isError()); - Assertions.assertNotNull(result.error()); - Assertions.assertInstanceOf(RuntimeException.class, result.error()); - Assertions.assertNull(result.data()); - } - if (result.round() == 3) { - Assertions.assertFalse(result.isError()); - Assertions.assertNull(result.error()); - Assertions.assertEquals("OK", result.data()); - } - Assertions.assertNull(result.triggerContext().info()); - }; - final SchedulingAsserter asserter = SchedulingAsserter.builder() - .setTestContext(context) - .setEach(onEach) - .build(); - final Job job = (jobData, ctx) -> { - final long round = ctx.round(); - if (round == 1) { - throw new IllegalArgumentException("throw in execution"); - } - if (round == 2) { - ctx.fail(new IllegalArgumentException("explicit set failed")); - } - if (round == 3) { - ctx.complete("OK"); - } - }; - final IntervalTrigger trigger = IntervalTrigger.builder().interval(2).repeat(3).build(); - IntervalScheduler.builder() - .setVertx(vertx) - .setMonitor(asserter) - .setTrigger(trigger) - .setJob(job) - .build() - .start(); - } - @Test void test_scheduler_should_be_stopped_when_reach_to_target_round(Vertx vertx, VertxTestContext context) { final Consumer> onCompleted = result -> { + Assertions.assertEquals(3, result.round()); Assertions.assertTrue(result.triggerContext().isStopped()); Assertions.assertEquals("StopByTriggerConfig", result.triggerContext().condition().reasonCode()); }; diff --git a/core/src/testFixtures/java/io/github/zero88/schedulerx/SchedulingAsserter.java b/core/src/testFixtures/java/io/github/zero88/schedulerx/SchedulingAsserter.java index c33d32e..1472182 100644 --- a/core/src/testFixtures/java/io/github/zero88/schedulerx/SchedulingAsserter.java +++ b/core/src/testFixtures/java/io/github/zero88/schedulerx/SchedulingAsserter.java @@ -30,18 +30,20 @@ public final class SchedulingAsserter implements SchedulingMonitor { private final Consumer> misfire; private final Consumer> each; private final Consumer> completed; + private final boolean autoCompleteTest; SchedulingAsserter(@NotNull VertxTestContext testContext, @Nullable SchedulingMonitor logMonitor, Consumer> unableSchedule, Consumer> schedule, Consumer> misfire, Consumer> each, - Consumer> completed) { - this.testContext = Objects.requireNonNull(testContext, "Vertx Test context is required"); - this.logMonitor = Optional.ofNullable(logMonitor).orElse(SchedulingLogMonitor.create()); - this.unableSchedule = unableSchedule; - this.schedule = schedule; - this.misfire = misfire; - this.each = each; - this.completed = completed; + Consumer> completed, boolean autoCompleteTest) { + this.testContext = Objects.requireNonNull(testContext, "Vertx Test context is required"); + this.logMonitor = Optional.ofNullable(logMonitor).orElse(SchedulingLogMonitor.create()); + this.unableSchedule = unableSchedule; + this.schedule = schedule; + this.misfire = misfire; + this.each = each; + this.completed = completed; + this.autoCompleteTest = autoCompleteTest; } @Override @@ -132,7 +134,9 @@ public void onCompleted(@NotNull ExecutionResult result) { Assertions.assertNull(result.finishedAt()); Assertions.assertNull(result.rescheduledAt()); verify(result, completed); - testContext.completeNow(); + if (autoCompleteTest) { + testContext.completeNow(); + } }); } diff --git a/core/src/testFixtures/java/io/github/zero88/schedulerx/SchedulingAsserterBuilder.java b/core/src/testFixtures/java/io/github/zero88/schedulerx/SchedulingAsserterBuilder.java index afb0f31..a6ae202 100644 --- a/core/src/testFixtures/java/io/github/zero88/schedulerx/SchedulingAsserterBuilder.java +++ b/core/src/testFixtures/java/io/github/zero88/schedulerx/SchedulingAsserterBuilder.java @@ -15,6 +15,7 @@ public final class SchedulingAsserterBuilder { private Consumer> misfire; private Consumer> each; private Consumer> completed; + private boolean autoCompleteTest = true; /** * Set Vertx test context @@ -106,13 +107,25 @@ public SchedulingAsserterBuilder setCompleted(Consumer disableAutoCompleteTest() { + this.autoCompleteTest = false; + return this; + } + /** * Build an asserter * * @return SchedulingAsserter */ public SchedulingAsserter build() { - return new SchedulingAsserter<>(testContext, logMonitor, unableSchedule, schedule, misfire, each, completed); + return new SchedulingAsserter<>(testContext, logMonitor, unableSchedule, schedule, misfire, each, completed, + autoCompleteTest); } }