diff --git a/core/src/main/java/io/github/zero88/schedulerx/DefaultOptions.java b/core/src/main/java/io/github/zero88/schedulerx/DefaultOptions.java new file mode 100644 index 0000000..b473ded --- /dev/null +++ b/core/src/main/java/io/github/zero88/schedulerx/DefaultOptions.java @@ -0,0 +1,131 @@ +package io.github.zero88.schedulerx; + +import java.time.Duration; +import java.time.format.DateTimeParseException; + +import io.github.zero88.schedulerx.impl.Utils; +import io.github.zero88.schedulerx.trigger.Trigger; +import io.vertx.core.VertxOptions; + +/** + * Instances of this class are used to configure the default options for {@link Trigger}, and {@link Scheduler} + * instances. + * + * @since 2.0.0 + */ +public final class DefaultOptions { + + public static final String DEFAULT_MAX_EXECUTION_TIMEOUT = "schedulerx.default_max_execution_timeout"; + public static final String DEFAULT_MAX_EVALUATION_TIMEOUT = "schedulerx.default_max_evaluation_timeout"; + public static final String DEFAULT_MAX_TRIGGER_RULE_LEEWAY = "schedulerx.default_max_trigger_rule_leeway"; + public static final String DEFAULT_MAX_TRIGGER_PREVIEW_COUNT = "schedulerx.default_max_trigger_preview_count"; + public static final String DEFAULT_WORKER_THREAD_PREFIX = "schedulerx.default_worker_thread_prefix"; + public static final String DEFAULT_WORKER_THREAD_POOL_SIZE = "schedulerx.default_worker_thread_pool_size"; + + + private static class Holder { + + private static final DefaultOptions INSTANCE = new DefaultOptions(); + + } + + public static DefaultOptions getInstance() { + return DefaultOptions.Holder.INSTANCE; + } + + /** + * Declares the default max execution timeout. Defaults is {@link VertxOptions#DEFAULT_MAX_WORKER_EXECUTE_TIME} + * + * @apiNote It can be overridden by system property with key {@link #DEFAULT_MAX_EXECUTION_TIMEOUT} + */ + public final Duration maxExecutionTimeout; + /** + * Declares the default max trigger evaluation timeout. Defaults is + * {@link VertxOptions#DEFAULT_MAX_EVENT_LOOP_EXECUTE_TIME} + * + * @apiNote It can be overridden by system property with key {@link #DEFAULT_MAX_EVALUATION_TIMEOUT} + */ + public final Duration maxEvaluationTimeout; + /** + * Declares the default max trigger rule leeway time. Defaults is {@code 10 seconds}. + * + * @apiNote It can be overridden by system property with key {@link #DEFAULT_MAX_TRIGGER_RULE_LEEWAY} + */ + public final Duration maxTriggerRuleLeeway; + /** + * Declares the default max number of the trigger preview items. Defaults is {@code 30}. + * + * @apiNote It can be overridden by system property with key {@link #DEFAULT_MAX_TRIGGER_PREVIEW_COUNT} + */ + public final int maxTriggerPreviewCount; + + /** + * Declares the default worker thread name prefix. Defaults is {@code scheduler.x-worker-thread}. + * + * @apiNote It can be overridden by system property with key {@link #DEFAULT_WORKER_THREAD_PREFIX} + */ + public final String workerThreadPrefix; + + /** + * Declares the default worker thread pool size. Defaults is {@code 3}. + * + * @apiNote It can be overridden by system property with key {@link #DEFAULT_WORKER_THREAD_POOL_SIZE} + */ + public final int workerThreadPoolSize; + + DefaultOptions() { + this.maxExecutionTimeout = loadMaxExecutionTimeout(); + this.maxEvaluationTimeout = loadMaxEvaluationTimeout(); + this.maxTriggerRuleLeeway = loadTriggerRuleLeeway(); + this.maxTriggerPreviewCount = loadTriggerPreviewCount(); + this.workerThreadPrefix = loadWorkerThreadPrefix(); + this.workerThreadPoolSize = loadWorkerThreadPoolSize(); + } + + private static Duration loadMaxExecutionTimeout() { + try { + return Duration.parse(System.getProperty(DEFAULT_MAX_EXECUTION_TIMEOUT)); + } catch (DateTimeParseException | NullPointerException ex) { + return Duration.of(VertxOptions.DEFAULT_MAX_WORKER_EXECUTE_TIME, + Utils.toChronoUnit(VertxOptions.DEFAULT_MAX_WORKER_EXECUTE_TIME_UNIT)); + } + } + + private static Duration loadMaxEvaluationTimeout() { + try { + return Duration.parse(System.getProperty(DEFAULT_MAX_EVALUATION_TIMEOUT)); + } catch (DateTimeParseException | NullPointerException ex) { + return Duration.of(VertxOptions.DEFAULT_MAX_EVENT_LOOP_EXECUTE_TIME, + Utils.toChronoUnit(VertxOptions.DEFAULT_MAX_EVENT_LOOP_EXECUTE_TIME_UNIT)); + } + } + + private static Duration loadTriggerRuleLeeway() { + try { + return Duration.parse(System.getProperty(DEFAULT_MAX_TRIGGER_RULE_LEEWAY)); + } catch (DateTimeParseException | NullPointerException ex) { + return Duration.ofSeconds(10); + } + } + + private static int loadTriggerPreviewCount() { + try { + return Integer.parseInt(System.getProperty(DEFAULT_MAX_TRIGGER_PREVIEW_COUNT)); + } catch (NumberFormatException | NullPointerException ex) { + return 30; + } + } + + private static String loadWorkerThreadPrefix() { + return System.getProperty(DEFAULT_WORKER_THREAD_PREFIX, "scheduler.x-worker-thread"); + } + + private static int loadWorkerThreadPoolSize() { + try { + return Integer.parseInt(System.getProperty(DEFAULT_WORKER_THREAD_POOL_SIZE)); + } catch (NumberFormatException | NullPointerException ex) { + return 3; + } + } + +} diff --git a/core/src/main/java/io/github/zero88/schedulerx/ExecutionResult.java b/core/src/main/java/io/github/zero88/schedulerx/ExecutionResult.java index 30f9e8f..e1c8d0c 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/ExecutionResult.java +++ b/core/src/main/java/io/github/zero88/schedulerx/ExecutionResult.java @@ -2,6 +2,7 @@ import java.time.Instant; import java.util.Objects; +import java.util.concurrent.TimeoutException; import org.jetbrains.annotations.Nullable; @@ -140,6 +141,13 @@ public interface ExecutionResult { */ default boolean isError() { return Objects.nonNull(error()); } + /** + * Identify task execution is timed out + * + * @return {@code true} if timeout error + */ + default boolean isTimeout() { return error() instanceof TimeoutException; } + /** * Check whether the trigger is re-registered in the system timer or not after the trigger is available, only in * case of trigger type is {@code cron}. diff --git a/core/src/main/java/io/github/zero88/schedulerx/JobData.java b/core/src/main/java/io/github/zero88/schedulerx/JobData.java index 5c76628..10e7137 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/JobData.java +++ b/core/src/main/java/io/github/zero88/schedulerx/JobData.java @@ -53,7 +53,7 @@ public interface JobData { * @since 2.0.0 */ static JobData empty(@NotNull Object externalId) { - return new JobData() { + return new JobData<>() { public @Nullable D get() { return null; } @Override @@ -80,7 +80,7 @@ static JobData create(@NotNull D data) { * @since 2.0.0 */ static JobData create(@NotNull D data, @NotNull Object externalId) { - return new JobData() { + return new JobData<>() { public D get() { return data; } @Override diff --git a/core/src/main/java/io/github/zero88/schedulerx/SchedulerBuilder.java b/core/src/main/java/io/github/zero88/schedulerx/SchedulerBuilder.java index 0072a88..dd8e25e 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/SchedulerBuilder.java +++ b/core/src/main/java/io/github/zero88/schedulerx/SchedulerBuilder.java @@ -31,6 +31,8 @@ public interface SchedulerBuilder jobData); + @NotNull SELF setTimeoutPolicy(@NotNull TimeoutPolicy timeoutPolicy); + @NotNull SELF setMonitor(@NotNull SchedulingMonitor monitor); @NotNull SCHEDULER build(); diff --git a/core/src/main/java/io/github/zero88/schedulerx/TaskExecutorProperties.java b/core/src/main/java/io/github/zero88/schedulerx/TaskExecutorProperties.java index b816d3b..2c0e284 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/TaskExecutorProperties.java +++ b/core/src/main/java/io/github/zero88/schedulerx/TaskExecutorProperties.java @@ -46,7 +46,7 @@ public interface TaskExecutorProperties { @NotNull Task task(); /** - * Defines job data as input task data + * Declares the job input data * * @return job data * @see JobData @@ -54,4 +54,13 @@ public interface TaskExecutorProperties { @GenIgnore(GenIgnore.PERMITTED_TYPE) @NotNull JobData jobData(); + /** + * Declares the timeout policy + * + * @return timeout policy + * @see TimeoutPolicy + */ + @GenIgnore(GenIgnore.PERMITTED_TYPE) + @NotNull TimeoutPolicy timeoutPolicy(); + } diff --git a/core/src/main/java/io/github/zero88/schedulerx/TimeoutBlock.java b/core/src/main/java/io/github/zero88/schedulerx/TimeoutBlock.java new file mode 100644 index 0000000..c2493ce --- /dev/null +++ b/core/src/main/java/io/github/zero88/schedulerx/TimeoutBlock.java @@ -0,0 +1,47 @@ +package io.github.zero88.schedulerx; + +import java.time.Duration; +import java.util.concurrent.TimeoutException; + +import org.jetbrains.annotations.NotNull; + +import io.github.zero88.schedulerx.impl.Utils.HumanReadableTimeFormat; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; + +public final class TimeoutBlock { + + private final Vertx vertx; + private final Duration timeout; + + public TimeoutBlock(Vertx vertx, Duration timeout) { + this.vertx = vertx; + this.timeout = timeout; + } + + public Promise wrap(@NotNull Promise promise) { + if (timeout.isNegative() || timeout.isZero()) { + return promise; + } + Future controller = Future.future(p -> vertx.setTimer(timeout.toMillis(), ignore -> p.complete())); + Future.any(promise.future(), controller).onSuccess(event -> { + if (!event.isComplete(0)) { + promise.fail(new NoStackTraceTimeoutException(timeout)); + } + }); + return promise; + } + + static class NoStackTraceTimeoutException extends TimeoutException { + + NoStackTraceTimeoutException(@NotNull Duration timeout) { + super("Timeout after " + HumanReadableTimeFormat.format(timeout)); + } + + @Override + public synchronized Throwable fillInStackTrace() { return this; } + + } + +} diff --git a/core/src/main/java/io/github/zero88/schedulerx/TimeoutPolicy.java b/core/src/main/java/io/github/zero88/schedulerx/TimeoutPolicy.java new file mode 100644 index 0000000..4df9db2 --- /dev/null +++ b/core/src/main/java/io/github/zero88/schedulerx/TimeoutPolicy.java @@ -0,0 +1,62 @@ +package io.github.zero88.schedulerx; + +import java.time.Duration; +import java.util.function.BinaryOperator; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonGetter; +import com.fasterxml.jackson.annotation.JsonProperty; + +public final class TimeoutPolicy { + + private final Duration evaluationTimeout; + private final Duration executionTimeout; + + private TimeoutPolicy(Duration evaluationTimeout, Duration executionTimeout) { + this.evaluationTimeout = evaluationTimeout; + this.executionTimeout = executionTimeout; + } + + public static TimeoutPolicy byDefault() { + return create(null, null); + } + + public static TimeoutPolicy create(@NotNull Duration executionTimeout) { + return create(null, executionTimeout); + } + + @JsonCreator + public static TimeoutPolicy create(@JsonProperty("evaluationTimeout") @Nullable Duration evaluationTimeout, + @JsonProperty("executionTimeout") @Nullable Duration executionTimeout) { + final BinaryOperator check = (duration, defaultMax) -> { + if (duration == null || duration.compareTo(Duration.ZERO) <= 0 || duration.compareTo(defaultMax) > 0) { + return defaultMax; + } + return duration; + }; + return new TimeoutPolicy(check.apply(evaluationTimeout, DefaultOptions.getInstance().maxEvaluationTimeout), + check.apply(executionTimeout, DefaultOptions.getInstance().maxExecutionTimeout)); + } + + /** + * Declares the evaluation timeout. Default is {@link DefaultOptions#maxEvaluationTimeout} + * + * @return the evaluation timeout + * @since 2.0.0 + */ + @JsonGetter + public @NotNull Duration evaluationTimeout() { return evaluationTimeout; } + + /** + * Declares the execution timeout. Default is {@link DefaultOptions#maxExecutionTimeout} + * + * @return the execution timeout + * @since 2.0.0 + */ + @JsonGetter + public @NotNull Duration executionTimeout() { return executionTimeout; } + +} diff --git a/core/src/main/java/io/github/zero88/schedulerx/WorkerExecutorFactory.java b/core/src/main/java/io/github/zero88/schedulerx/WorkerExecutorFactory.java new file mode 100644 index 0000000..7384d6e --- /dev/null +++ b/core/src/main/java/io/github/zero88/schedulerx/WorkerExecutorFactory.java @@ -0,0 +1,34 @@ +package io.github.zero88.schedulerx; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import io.github.zero88.schedulerx.impl.Utils; +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import io.vertx.core.WorkerExecutor; + +/** + * A factory to create {@link WorkerExecutor} based on the execution timeout policy. + * + * @since 2.0.0 + */ +public interface WorkerExecutorFactory { + + static @Nullable WorkerExecutor create(@NotNull Vertx vertx, @NotNull TimeoutPolicy timeoutPolicy) { + return WorkerExecutorFactory.create(vertx, timeoutPolicy, + DefaultOptions.getInstance().workerThreadPrefix + "-" + + Utils.randomPositiveInt()); + } + + static @Nullable WorkerExecutor create(@NotNull Vertx vertx, @NotNull TimeoutPolicy timeoutPolicy, + @Nullable String workerThreadName) { + final long executionTimeout = timeoutPolicy.executionTimeout().toMillis(); + if (executionTimeout != VertxOptions.DEFAULT_MAX_WORKER_EXECUTE_TIME) { + return vertx.createSharedWorkerExecutor(workerThreadName, DefaultOptions.getInstance().workerThreadPoolSize, + executionTimeout); + } + return null; + } + +} diff --git a/core/src/main/java/io/github/zero88/schedulerx/impl/AbstractScheduler.java b/core/src/main/java/io/github/zero88/schedulerx/impl/AbstractScheduler.java index 649e940..eae64b6 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/impl/AbstractScheduler.java +++ b/core/src/main/java/io/github/zero88/schedulerx/impl/AbstractScheduler.java @@ -3,8 +3,11 @@ import static io.github.zero88.schedulerx.impl.Utils.brackets; import java.text.MessageFormat; +import java.time.Duration; import java.time.Instant; import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; @@ -19,11 +22,13 @@ import io.github.zero88.schedulerx.Scheduler; import io.github.zero88.schedulerx.SchedulingMonitor; import io.github.zero88.schedulerx.Task; +import io.github.zero88.schedulerx.TimeoutBlock; +import io.github.zero88.schedulerx.TimeoutPolicy; +import io.github.zero88.schedulerx.WorkerExecutorFactory; import io.github.zero88.schedulerx.trigger.Trigger; import io.github.zero88.schedulerx.trigger.TriggerCondition.ReasonCode; import io.github.zero88.schedulerx.trigger.TriggerCondition.TriggerStatus; import io.github.zero88.schedulerx.trigger.TriggerContext; -import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.Vertx; @@ -50,19 +55,22 @@ public abstract class AbstractScheduler implements S private final @NotNull JobData jobData; private final @NotNull Task task; private final @NotNull T trigger; + private final @NotNull TimeoutPolicy timeoutPolicy; private final Lock lock = new ReentrantLock(); private boolean didStart = false; private boolean didTriggerValidation = false; private IllegalArgumentException invalidTrigger; protected AbstractScheduler(@NotNull Vertx vertx, @NotNull SchedulingMonitor monitor, - @NotNull JobData jobData, @NotNull Task task, @NotNull T trigger) { - this.vertx = vertx; - this.monitor = monitor; - this.jobData = jobData; - this.task = task; - this.trigger = trigger; - this.state = new SchedulerStateImpl<>(); + @NotNull JobData jobData, @NotNull Task task, @NotNull T trigger, + @NotNull TimeoutPolicy timeoutPolicy) { + this.vertx = vertx; + this.monitor = monitor; + this.jobData = jobData; + this.task = task; + this.trigger = trigger; + this.timeoutPolicy = timeoutPolicy; + this.state = new SchedulerStateImpl<>(); } @Override @@ -77,6 +85,9 @@ protected AbstractScheduler(@NotNull Vertx vertx, @NotNull SchedulingMonitor task() { return this.task; } + @Override + public @NotNull TimeoutPolicy timeoutPolicy() { return this.timeoutPolicy; } + @Override @SuppressWarnings({ "java:S1193", "unchecked" }) public final @NotNull T trigger() { @@ -111,7 +122,7 @@ public final void start(WorkerExecutor workerExecutor) { if (didStart) { throw new IllegalStateException("The executor is already started!"); } - doStart(workerExecutor); + doStart(workerExecutor == null ? WorkerExecutorFactory.create(vertx, timeoutPolicy) : workerExecutor); didStart = true; } finally { lock.unlock(); @@ -220,9 +231,13 @@ protected final long onFire(long timerId) { */ protected final void onProcess(WorkerExecutor workerExecutor, TriggerTransitionContext kickoffContext) { log(Objects.requireNonNull(kickoffContext.firedAt()), "On fire"); - this.executeBlocking(workerExecutor, p -> p.complete(shouldRun(kickoffContext))) + this.executeBlocking(workerExecutor, + p -> wrapTimeout(timeoutPolicy().evaluationTimeout(), + p).complete(shouldRun(kickoffContext))) .onSuccess(context -> onTrigger(workerExecutor, context)) - .onFailure(t -> onMisfire(TriggerContextFactory.skip(kickoffContext, ReasonCode.UNEXPECTED_ERROR, t))); + .onFailure(t -> onMisfire(TriggerContextFactory.skip(kickoffContext, t instanceof TimeoutException + ? ReasonCode.EVALUATION_TIMEOUT + : ReasonCode.UNEXPECTED_ERROR, t))); } protected final void onTrigger(WorkerExecutor workerExecutor, TriggerTransitionContext triggerContext) { @@ -233,8 +248,9 @@ protected final void onTrigger(WorkerExecutor workerExecutor, TriggerTransitionC final ExecutionContextInternal executionContext = new ExecutionContextImpl<>(vertx, triggerContext, state.increaseRound()); log(executionContext.triggeredAt(), "On trigger", triggerContext.tick(), executionContext.round()); - this.executeBlocking(workerExecutor, p -> executeTask(executionContext.setup(p))) - .onComplete(ar -> onResult(triggerContext, ar)); + this.executeBlocking(workerExecutor, p -> executeTask( + executionContext.setup(wrapTimeout(timeoutPolicy().executionTimeout(), p)))) + .onComplete(ar -> onResult(executionContext, ar.cause())); } protected final void onSchedule(long timerId) { @@ -284,32 +300,31 @@ protected final void onMisfire(@NotNull TriggerTransitionContext triggerContext) .build()); } - @SuppressWarnings("unchecked") - protected final void onResult(@NotNull TriggerTransitionContext triggerContext, - @NotNull AsyncResult asyncResult) { + protected final void onResult(@NotNull ExecutionContext executionContext, @Nullable Throwable asyncCause) { + final ExecutionContextInternal ctx = (ExecutionContextInternal) executionContext; + final TriggerTransitionContext triggerContext = ctx.triggerContext(); final Instant finishedAt = state.markFinished(triggerContext.tick()); - TriggerTransitionContext transitionCtx; - if (asyncResult.succeeded()) { - final ExecutionContextInternal executionCtx = (ExecutionContextInternal) asyncResult.result(); - log(finishedAt, "On result", triggerContext.tick(), executionCtx.round()); - monitor.onEach(ExecutionResultImpl.builder() - .setExternalId(jobData.externalId()) - .setAvailableAt(state.availableAt()) - .setTriggerContext(triggerContext) - .setTick(triggerContext.tick()) - .setFiredAt(triggerContext.firedAt()) - .setRound(executionCtx.round()) - .setTriggeredAt(executionCtx.triggeredAt()) - .setExecutedAt(executionCtx.executedAt()) - .setFinishedAt(finishedAt) - .setData(state.addData(executionCtx.round(), executionCtx.data())) - .setError(state.addError(executionCtx.round(), executionCtx.error())) - .build()); - transitionCtx = shouldStop(triggerContext, executionCtx.isForceStop(), executionCtx.round()); - } else { - LOGGER.warn(genMsg(state.tick(), state.round(), finishedAt, "Programming error"), asyncResult.cause()); - transitionCtx = shouldStop(triggerContext, false, state.round()); + log(finishedAt, "On result", triggerContext.tick(), ctx.round()); + if (asyncCause instanceof TimeoutException) { + LOGGER.warn(genMsg(state.tick(), state.round(), finishedAt, asyncCause.getMessage())); + } else if (asyncCause != null) { + LOGGER.error(genMsg(triggerContext.tick(), ctx.round(), finishedAt, "System error"), asyncCause); } + monitor.onEach(ExecutionResultImpl.builder() + .setExternalId(jobData.externalId()) + .setAvailableAt(state.availableAt()) + .setTriggerContext(triggerContext) + .setTick(triggerContext.tick()) + .setFiredAt(triggerContext.firedAt()) + .setRound(ctx.round()) + .setTriggeredAt(ctx.triggeredAt()) + .setExecutedAt(ctx.executedAt()) + .setFinishedAt(finishedAt) + .setData(state.addData(ctx.round(), ctx.data())) + .setError(state.addError(ctx.round(), + Optional.ofNullable(ctx.error()).orElse(asyncCause))) + .build()); + final TriggerTransitionContext transitionCtx = shouldStop(triggerContext, ctx.isForceStop(), ctx.round()); if (transitionCtx.isStopped()) { doStop(state.timerId(), transitionCtx); } @@ -358,4 +373,8 @@ private Future executeBlocking(WorkerExecutor workerExecutor, Consumer Promise wrapTimeout(Duration timeout, Promise promise) { + return new TimeoutBlock(vertx, timeout).wrap(promise); + } + } diff --git a/core/src/main/java/io/github/zero88/schedulerx/impl/AbstractSchedulerBuilder.java b/core/src/main/java/io/github/zero88/schedulerx/impl/AbstractSchedulerBuilder.java index 9a4d544..7b18f0a 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/impl/AbstractSchedulerBuilder.java +++ b/core/src/main/java/io/github/zero88/schedulerx/impl/AbstractSchedulerBuilder.java @@ -12,6 +12,7 @@ import io.github.zero88.schedulerx.SchedulingLogMonitor; import io.github.zero88.schedulerx.SchedulingMonitor; import io.github.zero88.schedulerx.Task; +import io.github.zero88.schedulerx.TimeoutPolicy; import io.github.zero88.schedulerx.trigger.Trigger; import io.vertx.core.Vertx; @@ -29,6 +30,7 @@ public abstract class AbstractSchedulerBuilder jobData; private Task task; private T trigger; + private TimeoutPolicy timeoutPolicy; @Override public @NotNull Vertx vertx() { @@ -49,6 +51,11 @@ public abstract class AbstractSchedulerBuilder jobData() { return Optional.ofNullable(jobData).orElseGet(JobData::empty); } + @Override + public @NotNull TimeoutPolicy timeoutPolicy() { + return Optional.ofNullable(timeoutPolicy).orElseGet(TimeoutPolicy::byDefault); + } + public @NotNull B setVertx(@NotNull Vertx vertx) { this.vertx = vertx; return (B) this; @@ -74,4 +81,10 @@ public abstract class AbstractSchedulerBuilder implements ExecutionContextInternal promise; @@ -21,7 +20,7 @@ final class ExecutionContextImpl implements ExecutionContextInternal implements ExecutionContextInternal extends ExecutionContext { + @NotNull TriggerTransitionContext triggerContext(); + /** * Prepare to execute task * diff --git a/core/src/main/java/io/github/zero88/schedulerx/impl/Utils.java b/core/src/main/java/io/github/zero88/schedulerx/impl/Utils.java index ef84f25..98fc314 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/impl/Utils.java +++ b/core/src/main/java/io/github/zero88/schedulerx/impl/Utils.java @@ -1,6 +1,7 @@ package io.github.zero88.schedulerx.impl; import java.security.SecureRandom; +import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.concurrent.TimeUnit; @@ -61,4 +62,14 @@ public static T castOrNull(Object data, boolean nullOrThrow) { } } + public static class HumanReadableTimeFormat { + + private HumanReadableTimeFormat() { } + + public static String format(Duration duration) { + return duration.toString().substring(2).replaceAll("(\\d[HMS])(?!$)", "$1 ").toLowerCase(); + } + + } + } diff --git a/core/src/main/java/io/github/zero88/schedulerx/trigger/CronSchedulerBuilder.java b/core/src/main/java/io/github/zero88/schedulerx/trigger/CronSchedulerBuilder.java index c2dfd9c..bb71532 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/trigger/CronSchedulerBuilder.java +++ b/core/src/main/java/io/github/zero88/schedulerx/trigger/CronSchedulerBuilder.java @@ -6,6 +6,7 @@ import io.github.zero88.schedulerx.SchedulerBuilder; import io.github.zero88.schedulerx.SchedulingMonitor; import io.github.zero88.schedulerx.Task; +import io.github.zero88.schedulerx.TimeoutPolicy; import io.vertx.codegen.annotations.Fluent; import io.vertx.codegen.annotations.GenIgnore; import io.vertx.codegen.annotations.VertxGen; @@ -40,6 +41,10 @@ public interface CronSchedulerBuilder @GenIgnore(GenIgnore.PERMITTED_TYPE) @NotNull CronSchedulerBuilder setJobData(@NotNull JobData jobData); + @Fluent + @GenIgnore(GenIgnore.PERMITTED_TYPE) + @NotNull CronSchedulerBuilder setTimeoutPolicy(@NotNull TimeoutPolicy timeoutPolicy); + @Fluent @GenIgnore(GenIgnore.PERMITTED_TYPE) @NotNull CronSchedulerBuilder setMonitor(@NotNull SchedulingMonitor monitor); diff --git a/core/src/main/java/io/github/zero88/schedulerx/trigger/CronSchedulerImpl.java b/core/src/main/java/io/github/zero88/schedulerx/trigger/CronSchedulerImpl.java index 8b77600..8ec7668 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/trigger/CronSchedulerImpl.java +++ b/core/src/main/java/io/github/zero88/schedulerx/trigger/CronSchedulerImpl.java @@ -10,6 +10,7 @@ import io.github.zero88.schedulerx.JobData; import io.github.zero88.schedulerx.SchedulingMonitor; import io.github.zero88.schedulerx.Task; +import io.github.zero88.schedulerx.TimeoutPolicy; import io.github.zero88.schedulerx.impl.AbstractScheduler; import io.github.zero88.schedulerx.impl.AbstractSchedulerBuilder; import io.github.zero88.schedulerx.impl.TriggerContextFactory; @@ -23,8 +24,8 @@ final class CronSchedulerImpl extends AbstractScheduler monitor, @NotNull JobData jobData, - @NotNull Task task, @NotNull CronTrigger trigger) { - super(vertx, monitor, jobData, task, trigger); + @NotNull Task task, @NotNull CronTrigger trigger, @NotNull TimeoutPolicy timeoutPolicy) { + super(vertx, monitor, jobData, task, trigger, timeoutPolicy); } @Override @@ -55,7 +56,7 @@ static final class CronSchedulerBuilderImpl implements CronSchedulerBuilder { public @NotNull CronScheduler build() { - return new CronSchedulerImpl<>(vertx(), monitor(), jobData(), task(), trigger()); + return new CronSchedulerImpl<>(vertx(), monitor(), jobData(), task(), trigger(), timeoutPolicy()); } } diff --git a/core/src/main/java/io/github/zero88/schedulerx/trigger/EventSchedulerBuilder.java b/core/src/main/java/io/github/zero88/schedulerx/trigger/EventSchedulerBuilder.java index 6dfe03f..f9e01aa 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/trigger/EventSchedulerBuilder.java +++ b/core/src/main/java/io/github/zero88/schedulerx/trigger/EventSchedulerBuilder.java @@ -6,6 +6,7 @@ import io.github.zero88.schedulerx.SchedulerBuilder; import io.github.zero88.schedulerx.SchedulingMonitor; import io.github.zero88.schedulerx.Task; +import io.github.zero88.schedulerx.TimeoutPolicy; import io.vertx.codegen.annotations.Fluent; import io.vertx.codegen.annotations.GenIgnore; import io.vertx.codegen.annotations.VertxGen; @@ -41,6 +42,10 @@ public interface EventSchedulerBuilder @GenIgnore(GenIgnore.PERMITTED_TYPE) @NotNull EventSchedulerBuilder setJobData(@NotNull JobData jobData); + @Fluent + @GenIgnore(GenIgnore.PERMITTED_TYPE) + @NotNull EventSchedulerBuilder setTimeoutPolicy(@NotNull TimeoutPolicy timeoutPolicy); + @Fluent @GenIgnore(GenIgnore.PERMITTED_TYPE) @NotNull EventSchedulerBuilder setMonitor(@NotNull SchedulingMonitor monitor); diff --git a/core/src/main/java/io/github/zero88/schedulerx/trigger/EventSchedulerImpl.java b/core/src/main/java/io/github/zero88/schedulerx/trigger/EventSchedulerImpl.java index 17bb502..b8a1ff3 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/trigger/EventSchedulerImpl.java +++ b/core/src/main/java/io/github/zero88/schedulerx/trigger/EventSchedulerImpl.java @@ -10,6 +10,7 @@ import io.github.zero88.schedulerx.JobData; import io.github.zero88.schedulerx.SchedulingMonitor; import io.github.zero88.schedulerx.Task; +import io.github.zero88.schedulerx.TimeoutPolicy; import io.github.zero88.schedulerx.impl.AbstractScheduler; import io.github.zero88.schedulerx.impl.AbstractSchedulerBuilder; import io.github.zero88.schedulerx.impl.TriggerContextFactory; @@ -29,8 +30,9 @@ final class EventSchedulerImpl extends AbstractScheduler consumer; EventSchedulerImpl(@NotNull Vertx vertx, @NotNull SchedulingMonitor monitor, @NotNull JobData jobData, - @NotNull Task task, @NotNull EventTrigger trigger) { - super(vertx, monitor, jobData, task, trigger); + @NotNull Task task, @NotNull EventTrigger trigger, + @NotNull TimeoutPolicy timeoutPolicy) { + super(vertx, monitor, jobData, task, trigger, timeoutPolicy); } @Override @@ -102,7 +104,7 @@ static final class EventSchedulerBuilderImpl // @formatter:on public @NotNull EventScheduler build() { - return new EventSchedulerImpl<>(vertx(), monitor(), jobData(), task(), trigger()); + return new EventSchedulerImpl<>(vertx(), monitor(), jobData(), task(), trigger(), timeoutPolicy()); } } diff --git a/core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerBuilder.java b/core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerBuilder.java index 15d996b..fa70f7d 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerBuilder.java +++ b/core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerBuilder.java @@ -6,6 +6,7 @@ import io.github.zero88.schedulerx.SchedulerBuilder; import io.github.zero88.schedulerx.SchedulingMonitor; import io.github.zero88.schedulerx.Task; +import io.github.zero88.schedulerx.TimeoutPolicy; import io.vertx.codegen.annotations.Fluent; import io.vertx.codegen.annotations.GenIgnore; import io.vertx.codegen.annotations.VertxGen; @@ -39,6 +40,10 @@ public interface IntervalSchedulerBuilder @GenIgnore(GenIgnore.PERMITTED_TYPE) @NotNull IntervalSchedulerBuilder setJobData(@NotNull JobData jobData); + @Fluent + @GenIgnore(GenIgnore.PERMITTED_TYPE) + @NotNull IntervalSchedulerBuilder setTimeoutPolicy(@NotNull TimeoutPolicy timeoutPolicy); + @Fluent @GenIgnore(GenIgnore.PERMITTED_TYPE) @NotNull IntervalSchedulerBuilder setMonitor(@NotNull SchedulingMonitor monitor); diff --git a/core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerImpl.java b/core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerImpl.java index 162e9e3..9aa3370 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerImpl.java +++ b/core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerImpl.java @@ -9,6 +9,7 @@ import io.github.zero88.schedulerx.JobData; import io.github.zero88.schedulerx.SchedulingMonitor; import io.github.zero88.schedulerx.Task; +import io.github.zero88.schedulerx.TimeoutPolicy; import io.github.zero88.schedulerx.impl.AbstractScheduler; import io.github.zero88.schedulerx.impl.AbstractSchedulerBuilder; import io.github.zero88.schedulerx.impl.TriggerContextFactory; @@ -21,8 +22,9 @@ final class IntervalSchedulerImpl extends AbstractScheduler { IntervalSchedulerImpl(@NotNull Vertx vertx, @NotNull SchedulingMonitor monitor, @NotNull JobData jobData, - @NotNull Task task, @NotNull IntervalTrigger trigger) { - super(vertx, monitor, jobData, task, trigger); + @NotNull Task task, @NotNull IntervalTrigger trigger, + @NotNull TimeoutPolicy timeoutPolicy) { + super(vertx, monitor, jobData, task, trigger, timeoutPolicy); } protected @NotNull Future registerTimer(WorkerExecutor workerExecutor) { @@ -47,8 +49,10 @@ protected void unregisterTimer(long timerId) { } private long createPeriodicTimer(WorkerExecutor executor) { - return vertx().setPeriodic(trigger().intervalInMilliseconds(), - id -> onProcess(executor, TriggerContextFactory.kickoff(trigger().type(), onFire(id)))); + return vertx().setPeriodic(trigger().intervalInMilliseconds(), id -> onProcess(executor, + TriggerContextFactory.kickoff( + trigger().type(), + onFire(id)))); } // @formatter:off @@ -58,7 +62,7 @@ static final class IntervalSchedulerBuilderImpl // @formatter:on public @NotNull IntervalScheduler build() { - return new IntervalSchedulerImpl<>(vertx(), monitor(), jobData(), task(), trigger()); + return new IntervalSchedulerImpl<>(vertx(), monitor(), jobData(), task(), trigger(), timeoutPolicy()); } } diff --git a/core/src/main/java/io/github/zero88/schedulerx/trigger/PreviewParameter.java b/core/src/main/java/io/github/zero88/schedulerx/trigger/PreviewParameter.java index 64a1d9d..8c88958 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/trigger/PreviewParameter.java +++ b/core/src/main/java/io/github/zero88/schedulerx/trigger/PreviewParameter.java @@ -9,6 +9,7 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import io.github.zero88.schedulerx.DefaultOptions; import io.github.zero88.schedulerx.trigger.rule.TriggerRule; /** @@ -18,8 +19,6 @@ */ public final class PreviewParameter { - public static final int MAX_TIMES = 30; - private int times; private Instant startedAt = Instant.now(); private ZoneId timeZone; @@ -55,10 +54,10 @@ public static PreviewParameter byDefault() { } /** - * @return the number of a preview item, maximum is {@link #MAX_TIMES} + * @return the number of a preview item, maximum is {@link DefaultOptions#maxTriggerPreviewCount} */ public int getTimes() { - return Math.max(1, Math.min(times, MAX_TIMES)); + return Math.max(1, Math.min(times, DefaultOptions.getInstance().maxTriggerPreviewCount)); } /** diff --git a/core/src/main/java/io/github/zero88/schedulerx/trigger/TriggerCondition.java b/core/src/main/java/io/github/zero88/schedulerx/trigger/TriggerCondition.java index b729389..8600ac3 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/trigger/TriggerCondition.java +++ b/core/src/main/java/io/github/zero88/schedulerx/trigger/TriggerCondition.java @@ -63,9 +63,10 @@ class ReasonCode { public static final String NOT_YET_SCHEDULED = "TriggerIsNotYetScheduled"; public static final String ALREADY_STOPPED = "TriggerIsAlreadyStopped"; public static final String CONDITION_IS_NOT_MATCHED = "ConditionIsNotMatched"; - public static final String STOP_BY_TASK = "ForceStopByTask"; + public static final String EVALUATION_TIMEOUT = "TriggerEvaluationTimeout"; + public static final String STOP_BY_TASK = "ForceStop"; public static final String STOP_BY_CONFIG = "StopByTriggerConfig"; - public static final String STOP_BY_MANUAL = "StopManually"; + public static final String STOP_BY_MANUAL = "ManualStop"; public static final String TASK_IS_RUNNING = "TaskIsRunning"; public static final String UNEXPECTED_ERROR = "UnexpectedError"; diff --git a/core/src/main/java/io/github/zero88/schedulerx/trigger/rule/TriggerRule.java b/core/src/main/java/io/github/zero88/schedulerx/trigger/rule/TriggerRule.java index 6f579a3..1b9e3cb 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/trigger/rule/TriggerRule.java +++ b/core/src/main/java/io/github/zero88/schedulerx/trigger/rule/TriggerRule.java @@ -7,6 +7,8 @@ import org.jetbrains.annotations.NotNull; +import io.github.zero88.schedulerx.DefaultOptions; + import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonGetter; import com.fasterxml.jackson.annotation.JsonProperty; @@ -19,11 +21,6 @@ @SuppressWarnings("rawtypes") public interface TriggerRule { - /** - * A maximum of leeway time - */ - Duration MAX_LEEWAY = Duration.ofSeconds(30); - /** * A no-op trigger rule. */ @@ -47,7 +44,14 @@ public interface TriggerRule { Instant until(); /** - * Declares the allowable margin of time in the time validation of {@link #satisfy(Instant)} and {@link #until()} + * Declares the allowable margin of time in the time validation of {@link #satisfy(Instant)} and {@link #until()}. + *

+ * The leeway time has constraints: + *

    + *
  • when given argument is negative, the leeway time fallback to {@link Duration#ZERO}
  • + *
  • when given argument is greater than {@link DefaultOptions#maxTriggerRuleLeeway}, the leeway time fallback + * to max default value
  • + *
* * @return the leeway time */ @@ -86,12 +90,11 @@ default boolean isExceeded(@NotNull Instant firedAt) { return create(timeframes, null, null); } - /** * Create a new trigger rule * * @param timeframes the given timeframes - * @param leeway the given leeway + * @param leeway the given leeway * @return a new Trigger rule */ static @NotNull TriggerRule create(List timeframes, Duration leeway) { @@ -111,7 +114,7 @@ default boolean isExceeded(@NotNull Instant firedAt) { /** * Create a new trigger rule * - * @param until the given until + * @param until the given until * @param leeway the given leeway * @return a new Trigger rule */ diff --git a/core/src/main/java/io/github/zero88/schedulerx/trigger/rule/TriggerRuleImpl.java b/core/src/main/java/io/github/zero88/schedulerx/trigger/rule/TriggerRuleImpl.java index e018571..fdf8b8d 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/trigger/rule/TriggerRuleImpl.java +++ b/core/src/main/java/io/github/zero88/schedulerx/trigger/rule/TriggerRuleImpl.java @@ -9,6 +9,8 @@ import org.jetbrains.annotations.NotNull; +import io.github.zero88.schedulerx.DefaultOptions; + @SuppressWarnings("rawtypes") final class TriggerRuleImpl implements TriggerRule { @@ -65,13 +67,11 @@ private int computeHashCode() { @NotNull private static Duration validateLeewayTime(Duration leeway) { final Duration given = Optional.ofNullable(leeway).orElse(Duration.ZERO); - if (given.compareTo(MAX_LEEWAY) > 0) { - return MAX_LEEWAY; - } if (given.isNegative()) { return Duration.ZERO; } - return given; + final Duration maxLeeway = DefaultOptions.getInstance().maxTriggerRuleLeeway; + return given.compareTo(maxLeeway) > 0 ? maxLeeway : given; } } diff --git a/core/src/test/java/io/github/zero88/schedulerx/DefaultOptionsTest.java b/core/src/test/java/io/github/zero88/schedulerx/DefaultOptionsTest.java new file mode 100644 index 0000000..f80bbc5 --- /dev/null +++ b/core/src/test/java/io/github/zero88/schedulerx/DefaultOptionsTest.java @@ -0,0 +1,60 @@ +package io.github.zero88.schedulerx; + +import java.time.Duration; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.junitpioneer.jupiter.SetSystemProperty; + +@Execution(ExecutionMode.SAME_THREAD) +class DefaultOptionsTest { + + @Test + void test_default_options() { + Assertions.assertEquals(Duration.ofSeconds(2), DefaultOptions.getInstance().maxEvaluationTimeout); + Assertions.assertEquals(Duration.ofSeconds(60), DefaultOptions.getInstance().maxExecutionTimeout); + Assertions.assertEquals(30, DefaultOptions.getInstance().maxTriggerPreviewCount); + Assertions.assertEquals(Duration.ofSeconds(10), DefaultOptions.getInstance().maxTriggerRuleLeeway); + Assertions.assertEquals("scheduler.x-worker-thread", DefaultOptions.getInstance().workerThreadPrefix); + Assertions.assertEquals(3, DefaultOptions.getInstance().workerThreadPoolSize); + } + + @Test + @SetSystemProperty(key = "schedulerx.default_max_evaluation_timeout", value = "PT1S") + void test_override_max_evaluation_timeout() { + Assertions.assertEquals(Duration.ofSeconds(1), new DefaultOptions().maxEvaluationTimeout); + } + + @Test + @SetSystemProperty(key = "schedulerx.default_max_execution_timeout", value = "PT10M") + void test_override_max_execution_timeout() { + Assertions.assertEquals(Duration.ofMinutes(10), new DefaultOptions().maxExecutionTimeout); + } + + @Test + @SetSystemProperty(key = "schedulerx.default_max_trigger_preview_count", value = "12") + void test_override_max_preview_count() { + Assertions.assertEquals(12, new DefaultOptions().maxTriggerPreviewCount); + } + + @Test + @SetSystemProperty(key = "schedulerx.default_max_trigger_rule_leeway", value = "PT3S") + void test_override_max_trigger_rule_leeway() { + Assertions.assertEquals(Duration.ofSeconds(3), new DefaultOptions().maxTriggerRuleLeeway); + } + + @Test + @SetSystemProperty(key = "schedulerx.default_worker_thread_prefix", value = "hello-there") + void test_override_worker_thread_prefix() { + Assertions.assertEquals("hello-there", new DefaultOptions().workerThreadPrefix); + } + + @Test + @SetSystemProperty(key = "schedulerx.default_worker_thread_pool_size", value = "10") + void test_override_worker_thread_pool_size() { + Assertions.assertEquals(10, new DefaultOptions().workerThreadPoolSize); + } + +} diff --git a/core/src/test/java/io/github/zero88/schedulerx/SchedulerTest.java b/core/src/test/java/io/github/zero88/schedulerx/SchedulerTest.java index 80b64cc..58f91bc 100644 --- a/core/src/test/java/io/github/zero88/schedulerx/SchedulerTest.java +++ b/core/src/test/java/io/github/zero88/schedulerx/SchedulerTest.java @@ -138,6 +138,33 @@ void test_scheduler_should_maintain_external_id_from_jobData_to_task_result(Obje .start(); } + @Test + void test_scheduler_should_timeout_in_execution(Vertx vertx, VertxTestContext testContext) { + final Duration timeout = Duration.ofSeconds(2); + final Duration runningTime = Duration.ofSeconds(3); + final Consumer> timeoutAsserter = result -> { + Assertions.assertTrue(result.isError()); + Assertions.assertTrue(result.isTimeout()); + Assertions.assertEquals("Timeout after 2s", result.error().getMessage()); + }; + final SchedulingAsserter asserter = SchedulingAsserter.builder() + .setTestContext(testContext) + .setEach(timeoutAsserter) + .build(); + final Task task = (jobData, executionContext) -> { + TestUtils.block(runningTime, testContext); + Assertions.assertTrue(Thread.currentThread().getName().startsWith("scheduler.x-worker-thread")); + }; + IntervalScheduler.builder() + .setVertx(vertx) + .setMonitor(asserter) + .setTrigger(IntervalTrigger.builder().interval(5).repeat(1).build()) + .setTask(task) + .setTimeoutPolicy(TimeoutPolicy.create(timeout)) + .build() + .start(); + } + @Test void test_scheduler_should_able_to_force_stop(Vertx vertx, VertxTestContext testContext) { final Consumer> completed = result -> { diff --git a/core/src/test/java/io/github/zero88/schedulerx/TestUtils.java b/core/src/test/java/io/github/zero88/schedulerx/TestUtils.java index e54f250..eac0c64 100644 --- a/core/src/test/java/io/github/zero88/schedulerx/TestUtils.java +++ b/core/src/test/java/io/github/zero88/schedulerx/TestUtils.java @@ -11,8 +11,13 @@ import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; +import io.vertx.core.json.jackson.DatabindCodec; import io.vertx.junit5.VertxTestContext; +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; + public final class TestUtils { private static final Logger LOGGER = LoggerFactory.getLogger(TestUtils.class); @@ -42,6 +47,15 @@ public static List simulateRunActionInParallel(VertxTestContext testC return store; } + public static ObjectMapper defaultMapper() { + return DatabindCodec.mapper() + .copy() + .findAndRegisterModules() + .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS) + .disable(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS) + .setSerializationInclusion(Include.NON_NULL); + } + public static class TestWorker extends Thread { protected static final Logger LOGGER = LoggerFactory.getLogger(TestWorker.class); diff --git a/core/src/test/java/io/github/zero88/schedulerx/TimeoutPolicyTest.java b/core/src/test/java/io/github/zero88/schedulerx/TimeoutPolicyTest.java new file mode 100644 index 0000000..db20d27 --- /dev/null +++ b/core/src/test/java/io/github/zero88/schedulerx/TimeoutPolicyTest.java @@ -0,0 +1,37 @@ +package io.github.zero88.schedulerx; + +import java.time.Duration; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +class TimeoutPolicyTest { + + @Test + void test_default() { + final TimeoutPolicy timeoutPolicy = TimeoutPolicy.byDefault(); + Assertions.assertEquals(Duration.ofSeconds(2), timeoutPolicy.evaluationTimeout()); + Assertions.assertEquals(Duration.ofSeconds(60), timeoutPolicy.executionTimeout()); + } + + @Test + void test_serialize_deserialize() throws JsonProcessingException { + ObjectMapper mapper = TestUtils.defaultMapper(); + final String expected = "{\"evaluationTimeout\":\"PT1S\",\"executionTimeout\":\"PT30S\"}"; + final TimeoutPolicy timeoutPolicy = TimeoutPolicy.create(Duration.ofSeconds(1), Duration.ofSeconds(30)); + Assertions.assertEquals(expected, mapper.writeValueAsString(timeoutPolicy)); + final TimeoutPolicy deserialized = mapper.readerFor(TimeoutPolicy.class).readValue(expected); + Assertions.assertEquals(timeoutPolicy.evaluationTimeout(), deserialized.evaluationTimeout()); + Assertions.assertEquals(timeoutPolicy.executionTimeout(), deserialized.executionTimeout()); + } + + @Test + void test_abnormal_value() { + final TimeoutPolicy t1 = TimeoutPolicy.create(Duration.ZERO, Duration.ofSeconds(100)); + Assertions.assertEquals(Duration.ofSeconds(2), t1.evaluationTimeout()); + Assertions.assertEquals(Duration.ofSeconds(60), t1.executionTimeout()); + } +} diff --git a/core/src/test/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerTest.java b/core/src/test/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerTest.java index 470fddc..aa787b8 100644 --- a/core/src/test/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerTest.java +++ b/core/src/test/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerTest.java @@ -22,7 +22,7 @@ import io.github.zero88.schedulerx.TestUtils; import io.github.zero88.schedulerx.trigger.TriggerCondition.ReasonCode; import io.vertx.core.Vertx; -import io.vertx.junit5.Checkpoint; +import io.vertx.core.WorkerExecutor; import io.vertx.junit5.RunTestOnContext; import io.vertx.junit5.VertxExtension; import io.vertx.junit5.VertxTestContext; @@ -70,7 +70,7 @@ void test_run_task_after_delay(Vertx vertx, VertxTestContext ctx) { .setSchedule(onSchedule) .setCompleted(onComplete) .build(); - final IntervalTrigger trigger = IntervalTrigger.builder().initialDelay(2).interval(2).repeat(2).build(); + final IntervalTrigger trigger = IntervalTrigger.builder().initialDelay(5).interval(2).repeat(2).build(); IntervalScheduler.builder() .setVertx(vertx) .setMonitor(asserter) @@ -82,7 +82,6 @@ void test_run_task_after_delay(Vertx vertx, VertxTestContext ctx) { @Test void test_run_blocking_task_in_the_end(Vertx vertx, VertxTestContext testContext) { - final Checkpoint checkpoint = testContext.checkpoint(3); final AtomicLong lastTickOnEach = new AtomicLong(); final Consumer> onEach = result -> { lastTickOnEach.set(result.tick()); @@ -109,16 +108,14 @@ void test_run_blocking_task_in_the_end(Vertx vertx, VertxTestContext testContext .setCompleted(onComplete) .build(); final IntervalTrigger trigger = IntervalTrigger.builder().interval(1).repeat(3).build(); + final WorkerExecutor worker = vertx.createSharedWorkerExecutor("hello", 3, 500); IntervalScheduler.builder() .setVertx(vertx) .setMonitor(asserter) .setTrigger(trigger) - .setTask((jobData, ctx) -> { - TestUtils.block(Duration.ofSeconds(3), testContext); - checkpoint.flag(); - }) + .setTask((jobData, ctx) -> TestUtils.block(Duration.ofSeconds(3), testContext)) .build() - .start(); + .start(worker); } @Test @@ -127,7 +124,7 @@ void test_task_should_be_executed_in_interval_trigger(Vertx vertx, VertxTestCont if (result.round() < 3) { Assertions.assertTrue(result.isError()); Assertions.assertNotNull(result.error()); - Assertions.assertTrue(result.error() instanceof RuntimeException); + Assertions.assertInstanceOf(RuntimeException.class, result.error()); Assertions.assertNull(result.data()); } if (result.round() == 3) { diff --git a/core/src/test/java/io/github/zero88/schedulerx/trigger/predicate/EventTriggerPredicateTest.java b/core/src/test/java/io/github/zero88/schedulerx/trigger/predicate/EventTriggerPredicateTest.java index 481882c..b0e88c7 100644 --- a/core/src/test/java/io/github/zero88/schedulerx/trigger/predicate/EventTriggerPredicateTest.java +++ b/core/src/test/java/io/github/zero88/schedulerx/trigger/predicate/EventTriggerPredicateTest.java @@ -32,9 +32,7 @@ class EventTriggerPredicateTest { static ObjectMapper mapper; @BeforeAll - static void setup() { - mapper = DatabindCodec.mapper(); - } + static void setup() { mapper = DatabindCodec.mapper(); } static Stream validData() { // @formatter:off diff --git a/core/src/test/java/io/github/zero88/schedulerx/trigger/rule/TimeframeTest.java b/core/src/test/java/io/github/zero88/schedulerx/trigger/rule/TimeframeTest.java index 0888c4b..87f7eb8 100644 --- a/core/src/test/java/io/github/zero88/schedulerx/trigger/rule/TimeframeTest.java +++ b/core/src/test/java/io/github/zero88/schedulerx/trigger/rule/TimeframeTest.java @@ -18,23 +18,18 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import io.github.zero88.schedulerx.TestUtils; import io.github.zero88.schedulerx.trigger.rule.custom.SimpleDateTimeTimeframe; -import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; class TimeframeTest { static ObjectMapper mapper; @BeforeAll - static void setup() { - mapper = new ObjectMapper().findAndRegisterModules() - .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false) - .setSerializationInclusion(Include.NON_NULL); - } + static void setup() { mapper = TestUtils.defaultMapper(); } private static Stream validValues() { // @formatter:off diff --git a/core/src/test/java/io/github/zero88/schedulerx/trigger/rule/TriggerRuleTest.java b/core/src/test/java/io/github/zero88/schedulerx/trigger/rule/TriggerRuleTest.java index 2808f1b..cdd4aab 100644 --- a/core/src/test/java/io/github/zero88/schedulerx/trigger/rule/TriggerRuleTest.java +++ b/core/src/test/java/io/github/zero88/schedulerx/trigger/rule/TriggerRuleTest.java @@ -21,6 +21,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.junitpioneer.jupiter.SetSystemProperty; import org.junitpioneer.jupiter.cartesian.ArgumentSets; import org.junitpioneer.jupiter.cartesian.CartesianTest; @@ -80,8 +81,8 @@ void test_until(Instant until, FiredAtArgument arg) { private static Stream leewayTestData() { return Stream.of(arguments(null, Duration.ZERO), arguments(0.0, Duration.ZERO), arguments(0, Duration.ZERO), - arguments("-PT1H", Duration.ZERO), arguments("PT10S", Duration.ofSeconds(10)), - arguments("PT1M", Duration.ofSeconds(30))); + arguments("-PT1H", Duration.ZERO), arguments("PT5S", Duration.ofSeconds(5)), + arguments("PT1M", Duration.ofSeconds(10))); } @ParameterizedTest @@ -118,14 +119,14 @@ private static ArgumentSets timeframesTestData() { FiredAtArgument.of(Instant.parse("2023-09-22T09:00:00Z"), true), FiredAtArgument.of(Instant.parse("2023-09-22T11:29:59Z"), true), FiredAtArgument.of(Instant.parse("2023-09-23T00:00:01Z"), true), - FiredAtArgument.of(Duration.ofSeconds(15), - Instant.parse("2023-09-22T03:30:14Z"), true), + FiredAtArgument.of(Duration.ofSeconds(7), + Instant.parse("2023-09-22T03:30:06Z"), true), FiredAtArgument.of(Instant.parse("2023-09-20T04:00:00Z"), false), FiredAtArgument.of(Instant.parse("2023-09-21T08:59:59Z"), false), FiredAtArgument.of(Instant.parse("2023-09-23T11:30:00Z"), false), FiredAtArgument.of(Instant.parse("2023-09-24T22:00:00Z"), false), - FiredAtArgument.of(Duration.ofSeconds(15), - Instant.parse("2023-09-25T03:30:16Z"), false)); + FiredAtArgument.of(Duration.ofSeconds(7), + Instant.parse("2023-09-25T03:30:08Z"), false)); } @CartesianTest