Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Evaluate Schedule initialization via Callable #4585

Merged
merged 13 commits into from
Sep 26, 2016
69 changes: 57 additions & 12 deletions src/main/java/io/reactivex/plugins/RxJavaPlugins.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package io.reactivex.plugins;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.Callable;

import org.reactivestreams.Subscriber;

Expand Down Expand Up @@ -183,52 +184,52 @@ public static Function<Scheduler, Scheduler> getSingleSchedulerHandler() {

/**
* Calls the associated hook function.
* @param defaultScheduler the hook's input value
* @param defaultScheduler a {@link Callable} which returns the hook's input value
* @return the value returned by the hook
*/
public static Scheduler initComputationScheduler(Scheduler defaultScheduler) {
public static Scheduler initComputationScheduler(Callable<Scheduler> defaultScheduler) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should not it throw on a null callable? What's the point of calling with null?

Copy link
Contributor Author

@peter-tackage peter-tackage Sep 22, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was done for consistency with the existing expectations in RxJavaPluginsTest.clearIsPassthrough(), specifically:

assertNull(RxJavaPlugins.initComputationScheduler(null));
assertNull(RxJavaPlugins.initIoScheduler(null));
assertNull(RxJavaPlugins.initNewThreadScheduler(null));
assertNull(RxJavaPlugins.initSingleScheduler(null));

Should this be changed to only return null if the Callable returns null (and throw if the Callable itself is null)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Null should not be allowed as a return value from the Callable nor from the init Function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be a change, because previously the Scheduler value for RxJavaPlugins.init* was allowed to be null, as per - assertNull(RxJavaPlugins.initSingleScheduler(null));.

I will add an additional set of tests for the new behavior (something along the lines of assemblyHookCrashes).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Function<Scheduler, Scheduler> f = onInitComputationHandler;
if (f == null) {
return defaultScheduler;
return callOrNull(defaultScheduler);
}
return apply(f, defaultScheduler); // JIT will skip this
}

/**
* Calls the associated hook function.
* @param defaultScheduler the hook's input value
* @param defaultScheduler a {@link Callable} which returns the hook's input value
* @return the value returned by the hook
*/
public static Scheduler initIoScheduler(Scheduler defaultScheduler) {
public static Scheduler initIoScheduler(Callable<Scheduler> defaultScheduler) {
Function<Scheduler, Scheduler> f = onInitIoHandler;
if (f == null) {
return defaultScheduler;
return callOrNull(defaultScheduler);
}
return apply(f, defaultScheduler);
}

/**
* Calls the associated hook function.
* @param defaultScheduler the hook's input value
* @param defaultScheduler a {@link Callable} which returns the hook's input value
* @return the value returned by the hook
*/
public static Scheduler initNewThreadScheduler(Scheduler defaultScheduler) {
public static Scheduler initNewThreadScheduler(Callable<Scheduler> defaultScheduler) {
Function<Scheduler, Scheduler> f = onInitNewThreadHandler;
if (f == null) {
return defaultScheduler;
return callOrNull(defaultScheduler);
}
return apply(f, defaultScheduler);
}

/**
* Calls the associated hook function.
* @param defaultScheduler the hook's input value
* @param defaultScheduler a {@link Callable} which returns the hook's input value
* @return the value returned by the hook
*/
public static Scheduler initSingleScheduler(Scheduler defaultScheduler) {
public static Scheduler initSingleScheduler(Callable<Scheduler> defaultScheduler) {
Function<Scheduler, Scheduler> f = onInitSingleHandler;
if (f == null) {
return defaultScheduler;
return callOrNull(defaultScheduler);
}
return apply(f, defaultScheduler);
}
Expand Down Expand Up @@ -934,6 +935,23 @@ static <T, R> R apply(Function<T, R> f, T t) {
}
}

/**
* Wraps the call to the function in try-catch and propagates thrown
* checked exceptions as RuntimeException.
* @param <T> the input type
* @param <R> the output type
* @param f the function to call, not null (not verified)
* @param t the {@link Callable} parameter value to the function
* @return the result of the function call
*/
static <T, R> R apply(Function<T, R> f, Callable<T> t) {
try {
return f.apply(t.call());
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}

/**
* Wraps the call to the function in try-catch and propagates thrown
* checked exceptions as RuntimeException.
Expand All @@ -953,6 +971,33 @@ static <T, U, R> R apply(BiFunction<T, U, R> f, T t, U u) {
}
}

/**
* Wraps the call to the callable in try-catch and propagates thrown
* checked exceptions as RuntimeException.
* @param <T> the input type
* @param <T> the output type
* @param t the callable, not null (not verified)
* @return the result of the callable call
*/
static <T> T call(Callable<T> t) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inline this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what you mean. I've tried to be consistent with the abstraction of the similar apply method. Do you think that should change?

try {
return t.call();
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}

/**
* Wraps the call to the callable in try-catch and propagates thrown
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy paste error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

* checked exceptions as RuntimeException.
* @param <T> the input and output type
* @param t the callable, nullable
* @return the callable result if the callable is nonnull, null otherwise.
*/
static <T> T callOrNull(Callable<T> t) {
return t == null ? null : call(t);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

null should never be allowed

}

/** Helper class, no instances. */
private RxJavaPlugins() {
throw new IllegalStateException("No instances!");
Expand Down
33 changes: 27 additions & 6 deletions src/main/java/io/reactivex/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.schedulers;

import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

import io.reactivex.Scheduler;
Expand Down Expand Up @@ -45,15 +46,35 @@ public final class Schedulers {
static final Scheduler NEW_THREAD;

static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleScheduler());

COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationScheduler());

IO = RxJavaPlugins.initIoScheduler(new IoScheduler());
SINGLE = RxJavaPlugins.initSingleScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return new SingleScheduler();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just in case call() is called multiple times by the hook, these could actually return an field of an inner class (singleton holder pattern or what's the name):

static final class SingleHolder {
    static final Scheduler DEFAULT = new SingleScheduler();
}

SINGLE = RxJavaPlugins.initiSingleScheduler(new Callable<Scheduler>() {
    @Override
    public Scheduler call() {
        return SingleHolder.DEFAULT;
    }
});

}
});

COMPUTATION = RxJavaPlugins.initComputationScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return new ComputationScheduler();
}
});

IO = RxJavaPlugins.initIoScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return new IoScheduler();
}
});

TRAMPOLINE = TrampolineScheduler.instance();

NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(NewThreadScheduler.instance());
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return NewThreadScheduler.instance();
}
});
}

/** Utility class. */
Expand Down
65 changes: 47 additions & 18 deletions src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -339,58 +339,82 @@ public void overrideNewThreadScheduler() {

@Test
public void overrideInitSingleScheduler() {
Scheduler s = Schedulers.single(); // make sure the Schedulers is initialized
final Scheduler s = Schedulers.single(); // make sure the Schedulers is initialized
Callable<Scheduler> c = new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return s;
}
};
try {
RxJavaPlugins.setInitSingleSchedulerHandler(replaceWithImmediate);

assertSame(ImmediateThinScheduler.INSTANCE, RxJavaPlugins.initSingleScheduler(s));
assertSame(ImmediateThinScheduler.INSTANCE, RxJavaPlugins.initSingleScheduler(c));
} finally {
RxJavaPlugins.reset();
}
// make sure the reset worked
assertSame(s, RxJavaPlugins.initSingleScheduler(s));
assertSame(s, RxJavaPlugins.initSingleScheduler(c));
}

@Test
public void overrideInitComputationScheduler() {
Scheduler s = Schedulers.computation(); // make sure the Schedulers is initialized
final Scheduler s = Schedulers.computation(); // make sure the Schedulers is initialized
Callable<Scheduler> c = new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return s;
}
};
try {
RxJavaPlugins.setInitComputationSchedulerHandler(replaceWithImmediate);

assertSame(ImmediateThinScheduler.INSTANCE, RxJavaPlugins.initComputationScheduler(s));
assertSame(ImmediateThinScheduler.INSTANCE, RxJavaPlugins.initComputationScheduler(c));
} finally {
RxJavaPlugins.reset();
}
// make sure the reset worked
assertSame(s, RxJavaPlugins.initComputationScheduler(s));
assertSame(s, RxJavaPlugins.initComputationScheduler(c));
}

@Test
public void overrideInitIoScheduler() {
Scheduler s = Schedulers.io(); // make sure the Schedulers is initialized
final Scheduler s = Schedulers.io(); // make sure the Schedulers is initialized;
Callable<Scheduler> c = new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return s;
}
};
try {
RxJavaPlugins.setInitIoSchedulerHandler(replaceWithImmediate);

assertSame(ImmediateThinScheduler.INSTANCE, RxJavaPlugins.initIoScheduler(s));
assertSame(ImmediateThinScheduler.INSTANCE, RxJavaPlugins.initIoScheduler(c));
} finally {
RxJavaPlugins.reset();
}
// make sure the reset worked
assertSame(s, RxJavaPlugins.initIoScheduler(s));
assertSame(s, RxJavaPlugins.initIoScheduler(c));
}

@Test
public void overrideInitNewThreadScheduler() {
Scheduler s = Schedulers.newThread(); // make sure the Schedulers is initialized
final Scheduler s = Schedulers.newThread(); // make sure the Schedulers is initialized;
Callable<Scheduler> c = new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return s;
}
};
try {
RxJavaPlugins.setInitNewThreadSchedulerHandler(replaceWithImmediate);

assertSame(ImmediateThinScheduler.INSTANCE, RxJavaPlugins.initNewThreadScheduler(s));
assertSame(ImmediateThinScheduler.INSTANCE, RxJavaPlugins.initNewThreadScheduler(c));
} finally {
RxJavaPlugins.reset();
}
// make sure the reset worked
assertSame(s, RxJavaPlugins.initNewThreadScheduler(s));
assertSame(s, RxJavaPlugins.initNewThreadScheduler(c));
}

@SuppressWarnings("rawtypes")
Expand Down Expand Up @@ -1153,8 +1177,13 @@ public void onComplete() {

assertNull(RxJavaPlugins.onSingleScheduler(null));

Scheduler s = ImmediateThinScheduler.INSTANCE;

final Scheduler s = ImmediateThinScheduler.INSTANCE;
Callable<Scheduler> c = new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return s;
}
};
assertSame(s, RxJavaPlugins.onComputationScheduler(s));

assertSame(s, RxJavaPlugins.onIoScheduler(s));
Expand All @@ -1172,13 +1201,13 @@ public void onComplete() {

assertNull(RxJavaPlugins.initSingleScheduler(null));

assertSame(s, RxJavaPlugins.initComputationScheduler(s));
assertSame(s, RxJavaPlugins.initComputationScheduler(c));

assertSame(s, RxJavaPlugins.initIoScheduler(s));
assertSame(s, RxJavaPlugins.initIoScheduler(c));

assertSame(s, RxJavaPlugins.initNewThreadScheduler(s));
assertSame(s, RxJavaPlugins.initNewThreadScheduler(c));

assertSame(s, RxJavaPlugins.initSingleScheduler(s));
assertSame(s, RxJavaPlugins.initSingleScheduler(c));


} finally {
Expand Down