Skip to content

Commit

Permalink
Finishing off, tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pkriens committed Apr 30, 2021
1 parent 4a44f65 commit ed0e1bb
Show file tree
Hide file tree
Showing 11 changed files with 307 additions and 154 deletions.
3 changes: 2 additions & 1 deletion biz.aQute.api/bnd.bnd
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
org.osgi.namespace.implementation,\
org.osgi.annotation.bundle,\
org.osgi.dto,\
org.apache.felix.http.servlet-api
org.apache.felix.http.servlet-api,\
org.osgi.service.component.annotations

-sub: *.bnd
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package biz.aQute.scheduler.api;

import org.osgi.service.component.annotations.ComponentPropertyType;

/**
* An annotation to simplify using a CronJob
*/
@ComponentPropertyType
public @interface CronExpression {

/**
* The 'cron.expression' service property
* @return
*/
String cron();

}

18 changes: 9 additions & 9 deletions biz.aQute.api/src/main/java/biz/aQute/scheduler/api/CronJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* chronos.
* <p>
* The Unix Cron defines a syntax that is used by the Cron service. A user
* should register a Cron service with the {@link CronJob#CRON} property. The
* should register a Cron service with the {@value CronJob#CRON} property. The
* value is according to the {link http://en.wikipedia.org/wiki/Cron}.
* <p>
*
Expand Down Expand Up @@ -59,27 +59,27 @@
* Additionally, you can use some fixed formats:
*
* <pre>
* &#64;yearly (or @annually) Run once a year at midnight on the morning of January 1 0 0 1 1 *
* &#64;monthly Run once a month at midnight on the morning of the first day of the month 0 0 1 * *
* &#64;weekly Run once a week at midnight on Sunday morning 0 0 * * 0
* &#64;daily Run once a day at midnight 0 0 * * *
* &#64;hourly Run once an hour at the beginning of the hour 0 * * * *
*
* &#64;yearly (or @annually) Run once a year at midnight on the morning of January 0 0 0 1 JAN *
* &#64;monthly Run once a month at midnight on the morning of the first day of the month 0 0 0 1 * *
* &#64;weekly Run once a week at midnight on Sunday morning 0 0 0 * * 7
* &#64;daily Run once a day at midnight 0 0 0 * * *
* &#64;hourly Run once an hour at the beginning of the hour 0 * * * * *
* &#64;reboot Run at startup @reboot (at service registration time)
* </pre>
* <p>
* Please not that for the constants we follow the Java 8 Date & Time constants.
* Major difference is the day number. In Quartz this is 0-6 for SAT-SUN while
* here it is 1-7 for MON-SUN.
*
* @param <T> The parameter for the cron job
*/
public interface CronJob {
/**
* The service property that specifies the cron schedule. The type is
* String+.
*/
String CRON = "cron";
String NAME = "name";
String CRON = "cron";
String NAME = "name";

/**
* Run a cron job.
Expand Down
55 changes: 41 additions & 14 deletions biz.aQute.api/src/main/java/biz/aQute/scheduler/api/Scheduler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package biz.aQute.scheduler.api;

import java.time.temporal.TemporalAdjuster;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

Expand All @@ -16,30 +17,35 @@ interface RunnableWithException {
void run() throws Exception;
}


/**
* Schedule a runnable to run periodically at a fixed rate. The schedule can
* be canceled by the returned task.
*
* @param name The name of the task
* @param runnable the task to run
* @param name
* The name of the task
* @param runnable
* the task to run
*/
Task periodic(Runnable runnable, long ms, String name);

/**
* Schedule a runnable to run after a certain time. The schedule can be
* canceled by the returned task if it was not yet canceled.
*
* @param name The name of the task
* @param runnable the task to run
* @param name
* The name of the task
* @param runnable
* the task to run
*/
Task after(Runnable runnable, long ms, String name);

/**
* Executes a task in the background, intended for short term tasks
*
* @param name The name of the task
* @param runnable the task to run
* @param name
* The name of the task
* @param runnable
* the task to run
*/
Task execute(Runnable runnable, String name);

Expand All @@ -50,11 +56,13 @@ interface RunnableWithException {
<T> Promise<T> submit(Callable<T> callable, String name);

/**
* Execute long running task and optionally restart when
* exceptions are thrown.
* Execute long running task and optionally restart when exceptions are
* thrown.
*
* @param r The body
* @param manage restart when it fails
* @param r
* The body
* @param manage
* restart when it fails
*
*/
Task deamon(RunnableWithException r, boolean manage, String name);
Expand All @@ -66,12 +74,31 @@ interface RunnableWithException {
* be used to stop scheduling. This variation does not take an environment
* object.
*
* @param r The Runnable to run
* @param name The name
* @param cronExpression A Cron Expression
* @param r
* The Runnable to run
* @param name
* The name
* @param cronExpression
* A Cron Expression
* @return A closeable to terminate the schedule
* @throws Exception
*/
Task schedule(RunnableWithException r, String cronExpression, String name) throws Exception;

/**
* Return a {@link TemporalAdjuster} based on a Cron expression. You can use
* a Temporal Adjust to calculate the time beteween a date time and the next
* trigger point.
*
* <pre>
* TemporalAdjuster cron = this.getCronAdjuster("@hourly");
* ZonedDateTime now = ZonedDateTime.now();
* ZonedDateTime next = now.with(cron);
* </pre>
*
* @param cronExpression a Cron expression as specified in {@link CronJob}
* @return a Temporal Adjuster based on a cron expression
*/
TemporalAdjuster getCronAdjuster(String cronExpression);

}
8 changes: 6 additions & 2 deletions biz.aQute.scheduler.basic.provider/bnd.bnd
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@
biz.aQute.api.scheduler,\
org.osgi.util.promise,\
aQute.libg


-testpath: \
biz.aQute.wrapper.junit,\
biz.aQute.wrapper.hamcrest,\
org.assertj.core,\
org.awaitility
org.awaitility,\
biz.aQute.launchpad,\
osgi.core, \
slf4j.api, \
slf4j.simple

Original file line number Diff line number Diff line change
@@ -1,27 +1,45 @@
package biz.aQute.scheduler.basic.provider;

import java.io.IOException;
import java.time.Clock;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.osgi.service.component.annotations.ServiceScope;
import org.osgi.util.promise.Promise;
import org.osgi.util.promise.PromiseFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(service = CentralScheduler.class, scope = ServiceScope.SINGLETON)
class CentralScheduler {
import aQute.lib.converter.Converter;
import biz.aQute.scheduler.api.CronJob;
import biz.aQute.scheduler.api.Task;
import biz.aQute.scheduler.basic.provider.SchedulerImpl.TaskImpl;

@Component(service = CentralScheduler.class, scope = ServiceScope.SINGLETON, immediate = true)
public class CentralScheduler {
final List<Cron> crons = new ArrayList<>();
final static Logger logger = LoggerFactory.getLogger(SchedulerImpl.class);
final ScheduledExecutorService scheduler ;
final ScheduledExecutorService scheduler;
final PromiseFactory factory;

Clock clock = Clock.systemDefaultZone();
long shutdownTimeout = 5000;
final SchedulerImpl frameworkTasks = new SchedulerImpl(this);

@Activate
public CentralScheduler() {
Expand All @@ -31,6 +49,7 @@ public CentralScheduler() {

@Deactivate
void deactivate() {
frameworkTasks.deactivate();
scheduler.shutdown();
try {
if (scheduler.awaitTermination(500, TimeUnit.MILLISECONDS))
Expand Down Expand Up @@ -68,4 +87,79 @@ public <T> Promise<T> submit(Callable<T> callable, String name) {
});
}

class Cron {

CronJob target;
Task schedule;

Cron(CronJob target, String cronExpression, String name) throws Exception {
this.target = target;
this.schedule = frameworkTasks.schedule(target::run, cronExpression, name);
}

void close() throws IOException {
schedule.cancel();
}
}

void schedule(TaskImpl task, CronAdjuster cron, long delay) {
synchronized (task) {
if (task.canceled) {
return;
}
ScheduledFuture<?> schedule = scheduler.schedule(() -> {
task.run();
schedule(task, cron, nextDelay(cron));
}, delay, TimeUnit.MILLISECONDS);
task.cancel = () -> schedule.cancel(true);
}
}

long nextDelay(CronAdjuster cron) {
ZonedDateTime now = ZonedDateTime.now(clock);
ZonedDateTime next = now.with(cron);
long delay = next.toInstant()
.toEpochMilli() - System.currentTimeMillis();
if (delay < 1)
delay = 1;
return delay;
}




@Reference(policy = ReferencePolicy.DYNAMIC, cardinality = ReferenceCardinality.MULTIPLE)
void addSchedule(CronJob s, Map<String, Object> map) throws Exception {
String name = Converter.cnv(String.class, map.get(CronJob.NAME));
String[] schedules = Converter.cnv(String[].class, map.get(CronJob.CRON));
if (schedules == null || schedules.length == 0)
return;

if (name == null) {
name = "unknown " + Instant.now();
}

synchronized (crons) {
for (String schedule : schedules) {
try {
Cron cron = new Cron(s, schedule, name);
crons.add(cron);
} catch (Exception e) {
logger.error("Invalid cron expression " + schedule + " from " + map, e);
}
}
}
}

void removeSchedule(CronJob s) {
synchronized (crons) {
for (Iterator<Cron> cron = crons.iterator(); cron.hasNext();) {
Cron c = cron.next();
if (c.target == s) {
cron.remove();
c.schedule.cancel();
}
}
}
}
}
Loading

0 comments on commit ed0e1bb

Please sign in to comment.