Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add timeout #89

Merged
merged 4 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 131 additions & 0 deletions core/src/main/java/io/github/zero88/schedulerx/DefaultOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package io.github.zero88.schedulerx;

import java.time.Duration;
import java.time.format.DateTimeParseException;

import io.github.zero88.schedulerx.impl.Utils;
import io.github.zero88.schedulerx.trigger.Trigger;
import io.vertx.core.VertxOptions;

/**
* Instances of this class are used to configure the default options for {@link Trigger}, and {@link Scheduler}
* instances.
*
* @since 2.0.0
*/
public final class DefaultOptions {

public static final String DEFAULT_MAX_EXECUTION_TIMEOUT = "schedulerx.default_max_execution_timeout";
public static final String DEFAULT_MAX_EVALUATION_TIMEOUT = "schedulerx.default_max_evaluation_timeout";
public static final String DEFAULT_MAX_TRIGGER_RULE_LEEWAY = "schedulerx.default_max_trigger_rule_leeway";
public static final String DEFAULT_MAX_TRIGGER_PREVIEW_COUNT = "schedulerx.default_max_trigger_preview_count";
public static final String DEFAULT_WORKER_THREAD_PREFIX = "schedulerx.default_worker_thread_prefix";
public static final String DEFAULT_WORKER_THREAD_POOL_SIZE = "schedulerx.default_worker_thread_pool_size";


private static class Holder {

private static final DefaultOptions INSTANCE = new DefaultOptions();

}

public static DefaultOptions getInstance() {
return DefaultOptions.Holder.INSTANCE;
}

/**
* Declares the default max execution timeout. Defaults is {@link VertxOptions#DEFAULT_MAX_WORKER_EXECUTE_TIME}
*
* @apiNote It can be overridden by system property with key {@link #DEFAULT_MAX_EXECUTION_TIMEOUT}
*/
public final Duration maxExecutionTimeout;
/**
* Declares the default max trigger evaluation timeout. Defaults is
* {@link VertxOptions#DEFAULT_MAX_EVENT_LOOP_EXECUTE_TIME}
*
* @apiNote It can be overridden by system property with key {@link #DEFAULT_MAX_EVALUATION_TIMEOUT}
*/
public final Duration maxEvaluationTimeout;
/**
* Declares the default max trigger rule leeway time. Defaults is {@code 10 seconds}.
*
* @apiNote It can be overridden by system property with key {@link #DEFAULT_MAX_TRIGGER_RULE_LEEWAY}
*/
public final Duration maxTriggerRuleLeeway;
/**
* Declares the default max number of the trigger preview items. Defaults is {@code 30}.
*
* @apiNote It can be overridden by system property with key {@link #DEFAULT_MAX_TRIGGER_PREVIEW_COUNT}
*/
public final int maxTriggerPreviewCount;

/**
* Declares the default worker thread name prefix. Defaults is {@code scheduler.x-worker-thread}.
*
* @apiNote It can be overridden by system property with key {@link #DEFAULT_WORKER_THREAD_PREFIX}
*/
public final String workerThreadPrefix;

/**
* Declares the default worker thread pool size. Defaults is {@code 3}.
*
* @apiNote It can be overridden by system property with key {@link #DEFAULT_WORKER_THREAD_POOL_SIZE}
*/
public final int workerThreadPoolSize;

DefaultOptions() {
this.maxExecutionTimeout = loadMaxExecutionTimeout();
this.maxEvaluationTimeout = loadMaxEvaluationTimeout();
this.maxTriggerRuleLeeway = loadTriggerRuleLeeway();
this.maxTriggerPreviewCount = loadTriggerPreviewCount();
this.workerThreadPrefix = loadWorkerThreadPrefix();
this.workerThreadPoolSize = loadWorkerThreadPoolSize();
}

private static Duration loadMaxExecutionTimeout() {
try {
return Duration.parse(System.getProperty(DEFAULT_MAX_EXECUTION_TIMEOUT));
} catch (DateTimeParseException | NullPointerException ex) {
return Duration.of(VertxOptions.DEFAULT_MAX_WORKER_EXECUTE_TIME,
Utils.toChronoUnit(VertxOptions.DEFAULT_MAX_WORKER_EXECUTE_TIME_UNIT));
}
}

private static Duration loadMaxEvaluationTimeout() {
try {
return Duration.parse(System.getProperty(DEFAULT_MAX_EVALUATION_TIMEOUT));
} catch (DateTimeParseException | NullPointerException ex) {
return Duration.of(VertxOptions.DEFAULT_MAX_EVENT_LOOP_EXECUTE_TIME,
Utils.toChronoUnit(VertxOptions.DEFAULT_MAX_EVENT_LOOP_EXECUTE_TIME_UNIT));
}
}

private static Duration loadTriggerRuleLeeway() {
try {
return Duration.parse(System.getProperty(DEFAULT_MAX_TRIGGER_RULE_LEEWAY));
} catch (DateTimeParseException | NullPointerException ex) {
return Duration.ofSeconds(10);
}
}

private static int loadTriggerPreviewCount() {
try {
return Integer.parseInt(System.getProperty(DEFAULT_MAX_TRIGGER_PREVIEW_COUNT));
} catch (NumberFormatException | NullPointerException ex) {
return 30;
}
}

private static String loadWorkerThreadPrefix() {
return System.getProperty(DEFAULT_WORKER_THREAD_PREFIX, "scheduler.x-worker-thread");
}

private static int loadWorkerThreadPoolSize() {
try {
return Integer.parseInt(System.getProperty(DEFAULT_WORKER_THREAD_POOL_SIZE));
} catch (NumberFormatException | NullPointerException ex) {
return 3;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.TimeoutException;

import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -140,6 +141,13 @@ public interface ExecutionResult<OUTPUT> {
*/
default boolean isError() { return Objects.nonNull(error()); }

/**
* Identify task execution is timed out
*
* @return {@code true} if timeout error
*/
default boolean isTimeout() { return error() instanceof TimeoutException; }

/**
* Check whether the trigger is re-registered in the system timer or not after the trigger is available, only in
* case of trigger type is {@code cron}.
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/io/github/zero88/schedulerx/JobData.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public interface JobData<T> {
* @since 2.0.0
*/
static <D> JobData<D> empty(@NotNull Object externalId) {
return new JobData<D>() {
return new JobData<>() {
public @Nullable D get() { return null; }

@Override
Expand All @@ -80,7 +80,7 @@ static <D> JobData<D> create(@NotNull D data) {
* @since 2.0.0
*/
static <D> JobData<D> create(@NotNull D data, @NotNull Object externalId) {
return new JobData<D>() {
return new JobData<>() {
public D get() { return data; }

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public interface SchedulerBuilder<IN, OUT, TRIGGER extends Trigger, SCHEDULER ex

@NotNull SELF setJobData(@NotNull JobData<IN> jobData);

@NotNull SELF setTimeoutPolicy(@NotNull TimeoutPolicy timeoutPolicy);

@NotNull SELF setMonitor(@NotNull SchedulingMonitor<OUT> monitor);

@NotNull SCHEDULER build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,21 @@ public interface TaskExecutorProperties<IN, OUT> {
@NotNull Task<IN, OUT> task();

/**
* Defines job data as input task data
* Declares the job input data
*
* @return job data
* @see JobData
*/
@GenIgnore(GenIgnore.PERMITTED_TYPE)
@NotNull JobData<IN> jobData();

/**
* Declares the timeout policy
*
* @return timeout policy
* @see TimeoutPolicy
*/
@GenIgnore(GenIgnore.PERMITTED_TYPE)
@NotNull TimeoutPolicy timeoutPolicy();

}
47 changes: 47 additions & 0 deletions core/src/main/java/io/github/zero88/schedulerx/TimeoutBlock.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.github.zero88.schedulerx;

import java.time.Duration;
import java.util.concurrent.TimeoutException;

import org.jetbrains.annotations.NotNull;

import io.github.zero88.schedulerx.impl.Utils.HumanReadableTimeFormat;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;

public final class TimeoutBlock {

private final Vertx vertx;
private final Duration timeout;

public TimeoutBlock(Vertx vertx, Duration timeout) {
this.vertx = vertx;
this.timeout = timeout;
}

public <T> Promise<T> wrap(@NotNull Promise<T> promise) {
if (timeout.isNegative() || timeout.isZero()) {
return promise;
}
Future<Void> controller = Future.future(p -> vertx.setTimer(timeout.toMillis(), ignore -> p.complete()));
Future.any(promise.future(), controller).onSuccess(event -> {
if (!event.isComplete(0)) {
promise.fail(new NoStackTraceTimeoutException(timeout));
}
});
return promise;
}

static class NoStackTraceTimeoutException extends TimeoutException {

NoStackTraceTimeoutException(@NotNull Duration timeout) {
super("Timeout after " + HumanReadableTimeFormat.format(timeout));
}

@Override
public synchronized Throwable fillInStackTrace() { return this; }

}

}
62 changes: 62 additions & 0 deletions core/src/main/java/io/github/zero88/schedulerx/TimeoutPolicy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package io.github.zero88.schedulerx;

import java.time.Duration;
import java.util.function.BinaryOperator;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonProperty;

public final class TimeoutPolicy {

private final Duration evaluationTimeout;
private final Duration executionTimeout;

private TimeoutPolicy(Duration evaluationTimeout, Duration executionTimeout) {
this.evaluationTimeout = evaluationTimeout;
this.executionTimeout = executionTimeout;
}

public static TimeoutPolicy byDefault() {
return create(null, null);
}

public static TimeoutPolicy create(@NotNull Duration executionTimeout) {
return create(null, executionTimeout);
}

@JsonCreator
public static TimeoutPolicy create(@JsonProperty("evaluationTimeout") @Nullable Duration evaluationTimeout,
@JsonProperty("executionTimeout") @Nullable Duration executionTimeout) {
final BinaryOperator<Duration> check = (duration, defaultMax) -> {
if (duration == null || duration.compareTo(Duration.ZERO) <= 0 || duration.compareTo(defaultMax) > 0) {
return defaultMax;
}
return duration;
};
return new TimeoutPolicy(check.apply(evaluationTimeout, DefaultOptions.getInstance().maxEvaluationTimeout),
check.apply(executionTimeout, DefaultOptions.getInstance().maxExecutionTimeout));
}

/**
* Declares the evaluation timeout. Default is {@link DefaultOptions#maxEvaluationTimeout}
*
* @return the evaluation timeout
* @since 2.0.0
*/
@JsonGetter
public @NotNull Duration evaluationTimeout() { return evaluationTimeout; }

/**
* Declares the execution timeout. Default is {@link DefaultOptions#maxExecutionTimeout}
*
* @return the execution timeout
* @since 2.0.0
*/
@JsonGetter
public @NotNull Duration executionTimeout() { return executionTimeout; }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.github.zero88.schedulerx;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import io.github.zero88.schedulerx.impl.Utils;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.WorkerExecutor;

/**
* A factory to create {@link WorkerExecutor} based on the execution timeout policy.
*
* @since 2.0.0
*/
public interface WorkerExecutorFactory {

static @Nullable WorkerExecutor create(@NotNull Vertx vertx, @NotNull TimeoutPolicy timeoutPolicy) {
return WorkerExecutorFactory.create(vertx, timeoutPolicy,
DefaultOptions.getInstance().workerThreadPrefix + "-" +
Utils.randomPositiveInt());
}

static @Nullable WorkerExecutor create(@NotNull Vertx vertx, @NotNull TimeoutPolicy timeoutPolicy,
@Nullable String workerThreadName) {
final long executionTimeout = timeoutPolicy.executionTimeout().toMillis();
if (executionTimeout != VertxOptions.DEFAULT_MAX_WORKER_EXECUTE_TIME) {
return vertx.createSharedWorkerExecutor(workerThreadName, DefaultOptions.getInstance().workerThreadPoolSize,
executionTimeout);
}
return null;
}

}
Loading
Loading