Skip to content

Commit

Permalink
Add Reactor-to-RxJava2 Schedulers adapters (#202)
Browse files Browse the repository at this point in the history
This allows the user to replace the standard RxJava 2 schedulers with
its Reactor-Core counterparts (and restore the defaults as well) so
that redundant worker threads could be avoided.
  • Loading branch information
akarnokd authored and simonbasle committed Apr 12, 2019
1 parent 21d31f6 commit 98f3a52
Show file tree
Hide file tree
Showing 3 changed files with 278 additions and 1 deletion.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ ext {

// Libraries
openHftChronicleVersion = '3.4.2'
rxJava2Version = '2.1.0'
rxJava2Version = '2.2.8'
akkaActorVersion = '2.4.10'

swtVersion = '4.5.2'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Copyright (c) 2019-Present Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.adapter.rxjava;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
* Replace the standard schedulers of RxJava 2 or Reactor-Core with
* the schedulers of the other library.
* <p>
* This can prevent having two sets of worker pools when
* both libraries are in use.
*/
public final class RxJava2Schedulers {

/** Utility class. */
private RxJava2Schedulers() {
throw new IllegalStateException("No instances!");
}

/**
* The RxJava 2 standard Schedulers will be replaced
* by their Reactor-Core counterparts; both libraries
* will run on the Reactor-Core schedulers.
* <p>
* Call this before interacting with RxJava 2.
*/
public static void useReactorCoreSchedulers() {
// shut down the standard schedulers
if (io.reactivex.plugins.RxJavaPlugins.getComputationSchedulerHandler() == null) {
io.reactivex.schedulers.Schedulers.computation().shutdown();
}
if (io.reactivex.plugins.RxJavaPlugins.getIoSchedulerHandler() == null) {
io.reactivex.schedulers.Schedulers.io().shutdown();
}
if (io.reactivex.plugins.RxJavaPlugins.getSingleSchedulerHandler() == null) {
io.reactivex.schedulers.Schedulers.single().shutdown();
}
if (io.reactivex.plugins.RxJavaPlugins.getNewThreadSchedulerHandler() == null) {
io.reactivex.schedulers.Schedulers.newThread().shutdown();
}

io.reactivex.Scheduler computation = ReactorCoreSchedulerWrapper.from(reactor.core.scheduler.Schedulers.parallel());
io.reactivex.plugins.RxJavaPlugins.setComputationSchedulerHandler(s -> computation);

io.reactivex.Scheduler elastic = ReactorCoreSchedulerWrapper.from(reactor.core.scheduler.Schedulers.elastic());
io.reactivex.plugins.RxJavaPlugins.setIoSchedulerHandler(s -> elastic);

io.reactivex.Scheduler single = ReactorCoreSchedulerWrapper.from(reactor.core.scheduler.Schedulers.single());
io.reactivex.plugins.RxJavaPlugins.setSingleSchedulerHandler(s -> single);

// There is no scheduler equivalent to RxJava's newThread so let's use elastic
io.reactivex.plugins.RxJavaPlugins.setNewThreadSchedulerHandler(s -> elastic);
}

/**
* Restores the RxJava 2 standard Schedulers to their
* default values.
*/
public static void resetReactorCoreSchedulers() {
io.reactivex.plugins.RxJavaPlugins.setComputationSchedulerHandler(null);

io.reactivex.plugins.RxJavaPlugins.setIoSchedulerHandler(null);

io.reactivex.plugins.RxJavaPlugins.setSingleSchedulerHandler(null);

io.reactivex.plugins.RxJavaPlugins.setNewThreadSchedulerHandler(null);

// restart the standard schedulers
io.reactivex.schedulers.Schedulers.computation().start();
io.reactivex.schedulers.Schedulers.io().start();
io.reactivex.schedulers.Schedulers.single().start();
io.reactivex.schedulers.Schedulers.newThread().start();
}

/**
* Wraps a Reactor-Core Scheduler and exposes it as an RxJava 2 Scheduler.
*/
static final class ReactorCoreSchedulerWrapper extends io.reactivex.Scheduler {

/**
* Wraps a Reactor-Core Scheduler and exposes it as an RxJava 2 Scheduler.
* @param scheduler an Reactor-Core {@link reactor.core.scheduler.Scheduler}
* @return a new {@link io.reactivex.Scheduler} instance
*/
public static io.reactivex.Scheduler from(reactor.core.scheduler.Scheduler scheduler) {
return new ReactorCoreSchedulerWrapper(scheduler);
}

final reactor.core.scheduler.Scheduler scheduler;

ReactorCoreSchedulerWrapper(reactor.core.scheduler.Scheduler scheduler) {
this.scheduler = Objects.requireNonNull(scheduler, "scheduler");
}

@Override
public io.reactivex.disposables.Disposable scheduleDirect(Runnable task) {
reactor.core.Disposable s = scheduler.schedule(task);
return io.reactivex.disposables.Disposables.fromAction(s::dispose);
}

@Override
public io.reactivex.disposables.Disposable scheduleDirect(Runnable task, long delay, TimeUnit unit) {
reactor.core.Disposable s = scheduler.schedule(task, delay, unit);
return io.reactivex.disposables.Disposables.fromAction(s::dispose);
}

@Override
public io.reactivex.disposables.Disposable schedulePeriodicallyDirect(Runnable task, long initialDelay, long period, TimeUnit unit) {
reactor.core.Disposable s =
scheduler.schedulePeriodically(task, initialDelay, period, unit);
return io.reactivex.disposables.Disposables.fromAction(s::dispose);
}

@Override
public Worker createWorker() {
return new ReactorCoreSchedulerWorker(scheduler.createWorker());
}

/**
* An RxJava 2 Worker that wraps a Reactor Core Worker.
*/
static final class ReactorCoreSchedulerWorker extends io.reactivex.Scheduler.Worker {
final reactor.core.scheduler.Scheduler.Worker w;

volatile boolean disposed;

ReactorCoreSchedulerWorker(reactor.core.scheduler.Scheduler.Worker w) {
this.w = w;
}

@Override
public io.reactivex.disposables.Disposable schedule(Runnable task) {
reactor.core.Disposable s = w.schedule(task);
return io.reactivex.disposables.Disposables.fromAction(s::dispose);
}

@Override
public void dispose() {
disposed = true;
w.dispose();
}

@Override
public boolean isDisposed() {
return disposed;
}

@Override
public io.reactivex.disposables.Disposable schedule(Runnable task, long delay, TimeUnit unit) {
reactor.core.Disposable s = w.schedule(task, delay, unit);
return io.reactivex.disposables.Disposables.fromAction(s::dispose);
}

@Override
public io.reactivex.disposables.Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit) {
reactor.core.Disposable s = w.schedulePeriodically(task, initialDelay, period, unit);
return io.reactivex.disposables.Disposables.fromAction(s::dispose);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright (c) 2019-Present Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.adapter.rxjava;

import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.junit.Test;

import io.reactivex.*;
import io.reactivex.schedulers.Schedulers;

public class RxJava2SchedulersTest {

@SuppressWarnings("unchecked")
static void runRxJavaTest(boolean expected, String name, Supplier<Scheduler> supplier) {
Flowable.fromCallable(() -> Thread.currentThread().getName().contains(name))
.subscribeOn(supplier.get())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(expected);

Single.fromCallable(() -> Thread.currentThread().getName().contains(name))
.subscribeOn(supplier.get())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(expected);

Flowable.timer(100, TimeUnit.MILLISECONDS, supplier.get())
.map(v -> Thread.currentThread().getName().contains(name))
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(expected);

Flowable.fromCallable(() -> Thread.currentThread().getName().contains(name))
.subscribeOn(supplier.get())
.delay(20, TimeUnit.MILLISECONDS, Schedulers.computation())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(expected);

Flowable.interval(20, TimeUnit.MILLISECONDS, supplier.get())
.map(v -> Thread.currentThread().getName().contains(name))
.take(5)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(expected, expected, expected, expected, expected);

Flowable.fromCallable(() -> Thread.currentThread().getName().contains(name))
.subscribeOn(supplier.get())
.buffer(10, 20, TimeUnit.MILLISECONDS, supplier.get())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(Collections.singletonList(expected));
}

@Test
public void useAndThenDisable() {

// -----------------------------------------------------------------------------------------

runRxJavaTest(true, "RxComputation", () -> Schedulers.computation());
runRxJavaTest(true, "RxSingle", () -> Schedulers.single());
runRxJavaTest(true, "RxCached", () -> Schedulers.io());
runRxJavaTest(true, "RxNewThread", () -> Schedulers.newThread());

// -----------------------------------------------------------------------------------------

RxJava2Schedulers.useReactorCoreSchedulers();

runRxJavaTest(false, "RxComputation", () -> Schedulers.computation());
runRxJavaTest(false, "RxSingle", () -> Schedulers.single());
runRxJavaTest(false, "RxCached", () -> Schedulers.io());
runRxJavaTest(false, "RxNewThread", () -> Schedulers.newThread());

// -----------------------------------------------------------------------------------------

RxJava2Schedulers.resetReactorCoreSchedulers();

runRxJavaTest(true, "RxComputation", () -> Schedulers.computation());
runRxJavaTest(true, "RxSingle", () -> Schedulers.single());
runRxJavaTest(true, "RxCached", () -> Schedulers.io());
runRxJavaTest(true, "RxNewThread", () -> Schedulers.newThread());
}
}

0 comments on commit 98f3a52

Please sign in to comment.