Skip to content

Commit

Permalink
1.x: fix reset() shutting down everything other than the schedulers (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Jun 7, 2016
1 parent f78c0d4 commit 08a2130
Showing 1 changed file with 41 additions and 23 deletions.
64 changes: 41 additions & 23 deletions src/main/java/rx/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ private static Schedulers getInstance() {
if (INSTANCE.compareAndSet(null, current)) {
return current;
} else {
shutdown();
current.shutdownInstance();
}
}
}
Expand Down Expand Up @@ -168,8 +168,10 @@ public static Scheduler from(Executor executor) {
*/
@Experimental
public static void reset() {
shutdown();
INSTANCE.set(null);
Schedulers s = INSTANCE.getAndSet(null);
if (s != null) {
s.shutdownInstance();
}
}

/**
Expand All @@ -178,16 +180,10 @@ public static void reset() {
*/
/* public test only */ static void start() {
Schedulers s = getInstance();

s.startInstance();

synchronized (s) {
if (s.computationScheduler instanceof SchedulerLifecycle) {
((SchedulerLifecycle) s.computationScheduler).start();
}
if (s.ioScheduler instanceof SchedulerLifecycle) {
((SchedulerLifecycle) s.ioScheduler).start();
}
if (s.newThreadScheduler instanceof SchedulerLifecycle) {
((SchedulerLifecycle) s.newThreadScheduler).start();
}
GenericScheduledExecutorService.INSTANCE.start();

RxRingBuffer.SPSC_POOL.start();
Expand All @@ -201,22 +197,44 @@ public static void reset() {
*/
public static void shutdown() {
Schedulers s = getInstance();
synchronized (s) {
if (s.computationScheduler instanceof SchedulerLifecycle) {
((SchedulerLifecycle) s.computationScheduler).shutdown();
}
if (s.ioScheduler instanceof SchedulerLifecycle) {
((SchedulerLifecycle) s.ioScheduler).shutdown();
}
if (s.newThreadScheduler instanceof SchedulerLifecycle) {
((SchedulerLifecycle) s.newThreadScheduler).shutdown();
}
s.shutdownInstance();

synchronized (s) {
GenericScheduledExecutorService.INSTANCE.shutdown();

RxRingBuffer.SPSC_POOL.shutdown();

RxRingBuffer.SPMC_POOL.shutdown();
}
}

/**
* Start the instance-specific schedulers.
*/
synchronized void startInstance() {
if (computationScheduler instanceof SchedulerLifecycle) {
((SchedulerLifecycle) computationScheduler).start();
}
if (ioScheduler instanceof SchedulerLifecycle) {
((SchedulerLifecycle) ioScheduler).start();
}
if (newThreadScheduler instanceof SchedulerLifecycle) {
((SchedulerLifecycle) newThreadScheduler).start();
}
}

/**
* Start the instance-specific schedulers.
*/
synchronized void shutdownInstance() {
if (computationScheduler instanceof SchedulerLifecycle) {
((SchedulerLifecycle) computationScheduler).shutdown();
}
if (ioScheduler instanceof SchedulerLifecycle) {
((SchedulerLifecycle) ioScheduler).shutdown();
}
if (newThreadScheduler instanceof SchedulerLifecycle) {
((SchedulerLifecycle) newThreadScheduler).shutdown();
}
}
}

0 comments on commit 08a2130

Please sign in to comment.