From 7834e8a181bea6947f48575464aa80408c3ce15c Mon Sep 17 00:00:00 2001 From: jmhofer Date: Tue, 9 Apr 2013 15:55:04 +0200 Subject: [PATCH] Trying to extend the Scheduler interface according to the comments at #19. --- rxjava-core/src/main/java/rx/Scheduler.java | 40 ++++++++++++++++++ .../rx/concurrency/AbstractScheduler.java | 42 +++++++++++++++++++ .../src/main/java/rx/operators/Tester.java | 21 ++++++++++ 3 files changed, 103 insertions(+) diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index 74fe274b3ac..e1e7c1e8060 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -19,12 +19,31 @@ import rx.util.functions.Action0; import rx.util.functions.Func0; +import rx.util.functions.Func1; +import rx.util.functions.Func2; /** * Represents an object that schedules units of work. */ public interface Scheduler { + /** + * Schedules a cancelable action to be executed. + * + * @param state State to pass into the action. + * @param action Action to schedule. + * @return a subscription to be able to unsubscribe from action. + */ + Subscription schedule(T state, Func2 action); + + /** + * Schedules a cancelable action to be executed. + * + * @param action Action to schedule. + * @return a subscription to be able to unsubscribe from action. + */ + Subscription schedule(Func1 action); + /** * Schedules a cancelable action to be executed. * @@ -43,6 +62,27 @@ public interface Scheduler { */ Subscription schedule(Action0 action); + /** + * Schedules a cancelable action to be executed in dueTime. + * + * @param state State to pass into the action. + * @param action Action to schedule. + * @param dueTime Time the action is due for executing. + * @param unit Time unit of the due time. + * @return a subscription to be able to unsubscribe from action. + */ + Subscription schedule(T state, Func2 action, long dueTime, TimeUnit unit); + + /** + * Schedules a cancelable action to be executed in dueTime. + * + * @param action Action to schedule. + * @param dueTime Time the action is due for executing. + * @param unit Time unit of the due time. + * @return a subscription to be able to unsubscribe from action. + */ + Subscription schedule(Func1 action, long dueTime, TimeUnit unit); + /** * Schedules an action to be executed in dueTime. * diff --git a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java index e6fc87ebdb4..d15e8e184a6 100644 --- a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java @@ -22,6 +22,8 @@ import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; import rx.util.functions.Func0; +import rx.util.functions.Func1; +import rx.util.functions.Func2; /* package */abstract class AbstractScheduler implements Scheduler { @@ -30,11 +32,51 @@ public Subscription schedule(Action0 action) { return schedule(asFunc0(action)); } + @Override + public Subscription schedule(final Func1 action) { + return schedule(new Func0() { + @Override + public Subscription call() { + return action.call(AbstractScheduler.this); + } + }); + } + + @Override + public Subscription schedule(final T state, final Func2 action) { + return schedule(new Func0() { + @Override + public Subscription call() { + return action.call(AbstractScheduler.this, state); + } + }); + } + @Override public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) { return schedule(asFunc0(action), dueTime, unit); } + @Override + public Subscription schedule(final Func1 action, long dueTime, TimeUnit unit) { + return schedule(new Func0() { + @Override + public Subscription call() { + return action.call(AbstractScheduler.this); + } + }, dueTime, unit); + } + + @Override + public Subscription schedule(final T state, final Func2 action, long dueTime, TimeUnit unit) { + return schedule(new Func0() { + @Override + public Subscription call() { + return action.call(AbstractScheduler.this, state); + } + }, dueTime, unit); + } + @Override public long now() { return System.nanoTime(); diff --git a/rxjava-core/src/main/java/rx/operators/Tester.java b/rxjava-core/src/main/java/rx/operators/Tester.java index bc042428466..11b2c8a798f 100644 --- a/rxjava-core/src/main/java/rx/operators/Tester.java +++ b/rxjava-core/src/main/java/rx/operators/Tester.java @@ -19,6 +19,7 @@ import rx.util.functions.Action0; import rx.util.functions.Func0; import rx.util.functions.Func1; +import rx.util.functions.Func2; /** * Common utility functions for testing operator implementations. @@ -289,6 +290,16 @@ public Subscription schedule(Func0 action) { return underlying.schedule(action); } + @Override + public Subscription schedule(Func1 action) { + return underlying.schedule(action); + } + + @Override + public Subscription schedule(T state, Func2 action) { + return underlying.schedule(state, action); + } + @Override public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) { return underlying.schedule(action, dueTime, unit); @@ -299,6 +310,16 @@ public Subscription schedule(Func0 action, long dueTime, TimeUnit return underlying.schedule(action, dueTime, unit); } + @Override + public Subscription schedule(Func1 action, long dueTime, TimeUnit unit) { + return underlying.schedule(action, dueTime, unit); + } + + @Override + public Subscription schedule(T state, Func2 action, long dueTime, TimeUnit unit) { + return underlying.schedule(state, action, dueTime, unit); + } + @Override public long now() { return underlying.now();