diff --git a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java index e9114317d4..d3a0d7d5e6 100644 --- a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java +++ b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java @@ -13,7 +13,9 @@ package io.reactivex.plugins; import java.lang.Thread.UncaughtExceptionHandler; +import java.util.concurrent.Callable; +import io.reactivex.internal.functions.ObjectHelper; import org.reactivestreams.Subscriber; import io.reactivex.*; @@ -31,13 +33,13 @@ public final class RxJavaPlugins { static volatile Function onScheduleHandler; - static volatile Function onInitComputationHandler; + static volatile Function, Scheduler> onInitComputationHandler; - static volatile Function onInitSingleHandler; + static volatile Function, Scheduler> onInitSingleHandler; - static volatile Function onInitIoHandler; + static volatile Function, Scheduler> onInitIoHandler; - static volatile Function onInitNewThreadHandler; + static volatile Function, Scheduler> onInitNewThreadHandler; static volatile Function onComputationHandler; @@ -121,7 +123,7 @@ public static Consumer getErrorHandler() { * Returns the current hook function. * @return the hook function, may be null */ - public static Function getInitComputationSchedulerHandler() { + public static Function, Scheduler> getInitComputationSchedulerHandler() { return onInitComputationHandler; } @@ -129,7 +131,7 @@ public static Function getInitComputationSchedulerHandler( * Returns the current hook function. * @return the hook function, may be null */ - public static Function getInitIoSchedulerHandler() { + public static Function, Scheduler> getInitIoSchedulerHandler() { return onInitIoHandler; } @@ -137,7 +139,7 @@ public static Function getInitIoSchedulerHandler() { * Returns the current hook function. * @return the hook function, may be null */ - public static Function getInitNewThreadSchedulerHandler() { + public static Function, Scheduler> getInitNewThreadSchedulerHandler() { return onInitNewThreadHandler; } @@ -145,7 +147,7 @@ public static Function getInitNewThreadSchedulerHandler() * Returns the current hook function. * @return the hook function, may be null */ - public static Function getInitSingleSchedulerHandler() { + public static Function, Scheduler> getInitSingleSchedulerHandler() { return onInitSingleHandler; } @@ -183,54 +185,62 @@ public static Function getSingleSchedulerHandler() { /** * Calls the associated hook function. - * @param defaultScheduler the hook's input value - * @return the value returned by the hook + * @param defaultScheduler a {@link Callable} which returns the hook's input value + * @return the value returned by the hook, not null + * @throws NullPointerException if the callable parameter or its result are null */ - public static Scheduler initComputationScheduler(Scheduler defaultScheduler) { - Function f = onInitComputationHandler; + public static Scheduler initComputationScheduler(Callable defaultScheduler) { + ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null"); + Function, Scheduler> f = onInitComputationHandler; if (f == null) { - return defaultScheduler; + return callRequireNonNull(defaultScheduler); } - return apply(f, defaultScheduler); // JIT will skip this + return applyRequireNonNull(f, defaultScheduler); // JIT will skip this } /** * Calls the associated hook function. - * @param defaultScheduler the hook's input value - * @return the value returned by the hook + * @param defaultScheduler a {@link Callable} which returns the hook's input value + * @return the value returned by the hook, not null + * @throws NullPointerException if the callable parameter or its result are null */ - public static Scheduler initIoScheduler(Scheduler defaultScheduler) { - Function f = onInitIoHandler; + public static Scheduler initIoScheduler(Callable defaultScheduler) { + ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null"); + Function, Scheduler> f = onInitIoHandler; if (f == null) { - return defaultScheduler; + return callRequireNonNull(defaultScheduler); } - return apply(f, defaultScheduler); + return applyRequireNonNull(f, defaultScheduler); } /** * Calls the associated hook function. - * @param defaultScheduler the hook's input value - * @return the value returned by the hook + * @param defaultScheduler a {@link Callable} which returns the hook's input value + * @return the value returned by the hook, not null + * @throws NullPointerException if the callable parameter or its result are null */ - public static Scheduler initNewThreadScheduler(Scheduler defaultScheduler) { - Function f = onInitNewThreadHandler; + public static Scheduler initNewThreadScheduler(Callable defaultScheduler) { + ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null"); + Function, Scheduler> f = onInitNewThreadHandler; if (f == null) { - return defaultScheduler; + return callRequireNonNull(defaultScheduler); } - return apply(f, defaultScheduler); + return applyRequireNonNull(f, defaultScheduler); } /** * Calls the associated hook function. - * @param defaultScheduler the hook's input value - * @return the value returned by the hook + * @param defaultScheduler a {@link Callable} which returns the hook's input value + * @return the value returned by the hook, not null + * @throws NullPointerException if the callable parameter or its result are null */ - public static Scheduler initSingleScheduler(Scheduler defaultScheduler) { - Function f = onInitSingleHandler; + public static Scheduler initSingleScheduler(Callable defaultScheduler) { + ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null"); + Function, Scheduler> f = onInitSingleHandler; if (f == null) { - return defaultScheduler; + return callRequireNonNull(defaultScheduler); } - return apply(f, defaultScheduler); + return applyRequireNonNull(f, defaultScheduler); } /** @@ -392,9 +402,9 @@ public static void setErrorHandler(Consumer handler) { /** * Sets the specific hook function. - * @param handler the hook function to set, null allowed + * @param handler the hook function to set, null allowed, but the function may not return null */ - public static void setInitComputationSchedulerHandler(Function handler) { + public static void setInitComputationSchedulerHandler(Function, Scheduler> handler) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -403,9 +413,9 @@ public static void setInitComputationSchedulerHandler(Function handler) { + public static void setInitIoSchedulerHandler(Function, Scheduler> handler) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -414,9 +424,9 @@ public static void setInitIoSchedulerHandler(Function hand /** * Sets the specific hook function. - * @param handler the hook function to set, null allowed + * @param handler the hook function to set, null allowed, but the function may not return null */ - public static void setInitNewThreadSchedulerHandler(Function handler) { + public static void setInitNewThreadSchedulerHandler(Function, Scheduler> handler) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -425,9 +435,9 @@ public static void setInitNewThreadSchedulerHandler(Function handler) { + public static void setInitSingleSchedulerHandler(Function, Scheduler> handler) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -952,6 +962,33 @@ static R apply(BiFunction f, T t, U u) { } } + /** + * Wraps the call to the Scheduler creation callable in try-catch and propagates thrown + * checked exceptions as RuntimeException and enforces that result is not null. + * @param s the {@link Callable} which returns a {@link Scheduler}, not null (not verified). Cannot return null + * @return the result of the callable call, not null + * @throws NullPointerException if the callable parameter returns null + */ + static Scheduler callRequireNonNull(Callable s) { + try { + return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + } + + /** + * Wraps the call to the Scheduler creation function in try-catch and propagates thrown + * checked exceptions as RuntimeException and enforces that result is not null. + * @param f the function to call, not null (not verified). Cannot return null + * @param s the parameter value to the function + * @return the result of the function call, not null + * @throws NullPointerException if the function parameter returns null + */ + static Scheduler applyRequireNonNull(Function, Scheduler> f, Callable s) { + return ObjectHelper.requireNonNull(apply(f, s), "Scheduler Callable result can't be null"); + } + /** Helper class, no instances. */ private RxJavaPlugins() { throw new IllegalStateException("No instances!"); diff --git a/src/main/java/io/reactivex/schedulers/Schedulers.java b/src/main/java/io/reactivex/schedulers/Schedulers.java index a645ab5fe9..9f5fd75038 100644 --- a/src/main/java/io/reactivex/schedulers/Schedulers.java +++ b/src/main/java/io/reactivex/schedulers/Schedulers.java @@ -13,6 +13,7 @@ package io.reactivex.schedulers; +import java.util.concurrent.Callable; import java.util.concurrent.Executor; import io.reactivex.Scheduler; @@ -44,16 +45,52 @@ public final class Schedulers { static final Scheduler NEW_THREAD; - static { - SINGLE = RxJavaPlugins.initSingleScheduler(new SingleScheduler()); + static final class SingleHolder { + static final Scheduler DEFAULT = new SingleScheduler(); + } + + static final class ComputationHolder { + static final Scheduler DEFAULT = new ComputationScheduler(); + } - COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationScheduler()); + static final class IoHolder { + static final Scheduler DEFAULT = new IoScheduler(); + } + + static final class NewThreadHolder { + static final Scheduler DEFAULT = NewThreadScheduler.instance(); + } - IO = RxJavaPlugins.initIoScheduler(new IoScheduler()); + static { + SINGLE = RxJavaPlugins.initSingleScheduler(new Callable() { + @Override + public Scheduler call() throws Exception { + return SingleHolder.DEFAULT; + } + }); + + COMPUTATION = RxJavaPlugins.initComputationScheduler(new Callable() { + @Override + public Scheduler call() throws Exception { + return ComputationHolder.DEFAULT; + } + }); + + IO = RxJavaPlugins.initIoScheduler(new Callable() { + @Override + public Scheduler call() throws Exception { + return IoHolder.DEFAULT; + } + }); TRAMPOLINE = TrampolineScheduler.instance(); - NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(NewThreadScheduler.instance()); + NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new Callable() { + @Override + public Scheduler call() throws Exception { + return NewThreadHolder.DEFAULT; + } + }); } /** Utility class. */ diff --git a/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java b/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java index e14bf2c47f..89b7567375 100644 --- a/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java +++ b/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java @@ -296,7 +296,7 @@ public void overrideSingleScheduler() { RxJavaPlugins.reset(); } // make sure the reset worked - assertNotSame(ImmediateThinScheduler.INSTANCE, Schedulers.computation()); + assertNotSame(ImmediateThinScheduler.INSTANCE, Schedulers.single()); } @Test @@ -338,60 +338,238 @@ public void overrideNewThreadScheduler() { assertNotSame(ImmediateThinScheduler.INSTANCE, Schedulers.newThread()); } + Function, Scheduler> initReplaceWithImmediate = new Function, Scheduler>() { + @Override + public Scheduler apply(Callable t) { + return ImmediateThinScheduler.INSTANCE; + } + }; + @Test public void overrideInitSingleScheduler() { - Scheduler s = Schedulers.single(); // make sure the Schedulers is initialized + final Scheduler s = Schedulers.single(); // make sure the Schedulers is initialized + Callable c = new Callable() { + @Override + public Scheduler call() throws Exception { + return s; + } + }; try { - RxJavaPlugins.setInitSingleSchedulerHandler(replaceWithImmediate); + RxJavaPlugins.setInitSingleSchedulerHandler(initReplaceWithImmediate); - assertSame(ImmediateThinScheduler.INSTANCE, RxJavaPlugins.initSingleScheduler(s)); + assertSame(ImmediateThinScheduler.INSTANCE, RxJavaPlugins.initSingleScheduler(c)); } finally { RxJavaPlugins.reset(); } // make sure the reset worked - assertSame(s, RxJavaPlugins.initSingleScheduler(s)); + assertSame(s, RxJavaPlugins.initSingleScheduler(c)); } @Test public void overrideInitComputationScheduler() { - Scheduler s = Schedulers.computation(); // make sure the Schedulers is initialized + final Scheduler s = Schedulers.computation(); // make sure the Schedulers is initialized + Callable c = new Callable() { + @Override + public Scheduler call() throws Exception { + return s; + } + }; try { - RxJavaPlugins.setInitComputationSchedulerHandler(replaceWithImmediate); + RxJavaPlugins.setInitComputationSchedulerHandler(initReplaceWithImmediate); - assertSame(ImmediateThinScheduler.INSTANCE, RxJavaPlugins.initComputationScheduler(s)); + assertSame(ImmediateThinScheduler.INSTANCE, RxJavaPlugins.initComputationScheduler(c)); } finally { RxJavaPlugins.reset(); } // make sure the reset worked - assertSame(s, RxJavaPlugins.initComputationScheduler(s)); + assertSame(s, RxJavaPlugins.initComputationScheduler(c)); } @Test public void overrideInitIoScheduler() { - Scheduler s = Schedulers.io(); // make sure the Schedulers is initialized + final Scheduler s = Schedulers.io(); // make sure the Schedulers is initialized; + Callable c = new Callable() { + @Override + public Scheduler call() throws Exception { + return s; + } + }; try { - RxJavaPlugins.setInitIoSchedulerHandler(replaceWithImmediate); + RxJavaPlugins.setInitIoSchedulerHandler(initReplaceWithImmediate); - assertSame(ImmediateThinScheduler.INSTANCE, RxJavaPlugins.initIoScheduler(s)); + assertSame(ImmediateThinScheduler.INSTANCE, RxJavaPlugins.initIoScheduler(c)); } finally { RxJavaPlugins.reset(); } // make sure the reset worked - assertSame(s, RxJavaPlugins.initIoScheduler(s)); + assertSame(s, RxJavaPlugins.initIoScheduler(c)); } @Test public void overrideInitNewThreadScheduler() { - Scheduler s = Schedulers.newThread(); // make sure the Schedulers is initialized + final Scheduler s = Schedulers.newThread(); // make sure the Schedulers is initialized; + Callable c = new Callable() { + @Override + public Scheduler call() throws Exception { + return s; + } + }; try { - RxJavaPlugins.setInitNewThreadSchedulerHandler(replaceWithImmediate); + RxJavaPlugins.setInitNewThreadSchedulerHandler(initReplaceWithImmediate); - assertSame(ImmediateThinScheduler.INSTANCE, RxJavaPlugins.initNewThreadScheduler(s)); + assertSame(ImmediateThinScheduler.INSTANCE, RxJavaPlugins.initNewThreadScheduler(c)); } finally { RxJavaPlugins.reset(); } // make sure the reset worked - assertSame(s, RxJavaPlugins.initNewThreadScheduler(s)); + assertSame(s, RxJavaPlugins.initNewThreadScheduler(c)); + } + + Callable nullResultCallable = new Callable() { + @Override + public Scheduler call() throws Exception { + return null; + } + }; + + @Test + public void overrideInitSingleSchedulerCrashes() { + // fail when Callable is null + try { + RxJavaPlugins.initSingleScheduler(null); + fail("Should have thrown NullPointerException"); + } catch (NullPointerException npe) { + assertEquals("Scheduler Callable can't be null", npe.getMessage()); + } + + // fail when Callable result is null + try { + RxJavaPlugins.initSingleScheduler(nullResultCallable); + fail("Should have thrown NullPointerException"); + } catch (NullPointerException npe) { + assertEquals("Scheduler Callable result can't be null", npe.getMessage()); + } + } + + @Test + public void overrideInitComputationSchedulerCrashes() { + // fail when Callable is null + try { + RxJavaPlugins.initComputationScheduler(null); + fail("Should have thrown NullPointerException"); + } catch (NullPointerException npe) { + assertEquals("Scheduler Callable can't be null", npe.getMessage()); + } + + // fail when Callable result is null + try { + RxJavaPlugins.initComputationScheduler(nullResultCallable); + fail("Should have thrown NullPointerException"); + } catch (NullPointerException npe) { + assertEquals("Scheduler Callable result can't be null", npe.getMessage()); + } + } + + @Test + public void overrideInitIoSchedulerCrashes() { + // fail when Callable is null + try { + RxJavaPlugins.initIoScheduler(null); + fail("Should have thrown NullPointerException"); + } catch (NullPointerException npe) { + assertEquals("Scheduler Callable can't be null", npe.getMessage()); + } + + // fail when Callable result is null + try { + RxJavaPlugins.initIoScheduler(nullResultCallable); + fail("Should have thrown NullPointerException"); + } catch (NullPointerException npe) { + assertEquals("Scheduler Callable result can't be null", npe.getMessage()); + } + } + + @Test + public void overrideInitNewThreadSchedulerCrashes() { + // fail when Callable is null + try { + RxJavaPlugins.initNewThreadScheduler(null); + fail("Should have thrown NullPointerException"); + } catch (NullPointerException npe) { + // expected + assertEquals("Scheduler Callable can't be null", npe.getMessage()); + } + + // fail when Callable result is null + try { + RxJavaPlugins.initNewThreadScheduler(nullResultCallable); + fail("Should have thrown NullPointerException"); + } catch (NullPointerException npe) { + assertEquals("Scheduler Callable result can't be null", npe.getMessage()); + } + } + + Callable unsafeDefault = new Callable() { + @Override + public Scheduler call() throws Exception { + throw new AssertionError("Default Scheduler instance should not have been evaluated"); + } + }; + + @Test + public void testDefaultSingleSchedulerIsInitializedLazily() { + // unsafe default Scheduler Callable should not be evaluated + try { + RxJavaPlugins.setInitSingleSchedulerHandler(initReplaceWithImmediate); + RxJavaPlugins.initSingleScheduler(unsafeDefault); + } finally { + RxJavaPlugins.reset(); + } + + // make sure the reset worked + assertNotSame(ImmediateThinScheduler.INSTANCE, Schedulers.single()); + } + + @Test + public void testDefaultIoSchedulerIsInitializedLazily() { + // unsafe default Scheduler Callable should not be evaluated + try { + RxJavaPlugins.setInitIoSchedulerHandler(initReplaceWithImmediate); + RxJavaPlugins.initIoScheduler(unsafeDefault); + } finally { + RxJavaPlugins.reset(); + } + + // make sure the reset worked + assertNotSame(ImmediateThinScheduler.INSTANCE, Schedulers.io()); + } + + @Test + public void testDefaultComputationSchedulerIsInitializedLazily() { + // unsafe default Scheduler Callable should not be evaluated + try { + RxJavaPlugins.setInitComputationSchedulerHandler(initReplaceWithImmediate); + RxJavaPlugins.initComputationScheduler(unsafeDefault); + } finally { + RxJavaPlugins.reset(); + } + + // make sure the reset worked + assertNotSame(ImmediateThinScheduler.INSTANCE, Schedulers.computation()); + } + + @Test + public void testDefaultNewThreadSchedulerIsInitializedLazily() { + // unsafe default Scheduler Callable should not be evaluated + try { + RxJavaPlugins.setInitNewThreadSchedulerHandler(initReplaceWithImmediate); + RxJavaPlugins.initNewThreadScheduler(unsafeDefault); + } finally { + RxJavaPlugins.reset(); + } + + // make sure the reset worked + assertNotSame(ImmediateThinScheduler.INSTANCE, Schedulers.newThread()); } @SuppressWarnings("rawtypes") @@ -1154,8 +1332,13 @@ public void onComplete() { assertNull(RxJavaPlugins.onSingleScheduler(null)); - Scheduler s = ImmediateThinScheduler.INSTANCE; - + final Scheduler s = ImmediateThinScheduler.INSTANCE; + Callable c = new Callable() { + @Override + public Scheduler call() throws Exception { + return s; + } + }; assertSame(s, RxJavaPlugins.onComputationScheduler(s)); assertSame(s, RxJavaPlugins.onIoScheduler(s)); @@ -1164,23 +1347,13 @@ public void onComplete() { assertSame(s, RxJavaPlugins.onSingleScheduler(s)); + assertSame(s, RxJavaPlugins.initComputationScheduler(c)); - assertNull(RxJavaPlugins.initComputationScheduler(null)); - - assertNull(RxJavaPlugins.initIoScheduler(null)); - - assertNull(RxJavaPlugins.initNewThreadScheduler(null)); - - assertNull(RxJavaPlugins.initSingleScheduler(null)); - - assertSame(s, RxJavaPlugins.initComputationScheduler(s)); - - assertSame(s, RxJavaPlugins.initIoScheduler(s)); - - assertSame(s, RxJavaPlugins.initNewThreadScheduler(s)); + assertSame(s, RxJavaPlugins.initIoScheduler(c)); - assertSame(s, RxJavaPlugins.initSingleScheduler(s)); + assertSame(s, RxJavaPlugins.initNewThreadScheduler(c)); + assertSame(s, RxJavaPlugins.initSingleScheduler(c)); } finally { RxJavaPlugins.reset();