From b5aaf82ea8c7addd2cd197f1c61ddec9ee5ee962 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 19 Jan 2015 09:10:53 +0100 Subject: [PATCH] ScheduledExecutorService: call purge periodically on JDK 6 to avoid cancelled task-retention. --- .../internal/schedulers/NewThreadWorker.java | 118 ++++++++++++++++-- .../rx/schedulers/CachedThreadScheduler.java | 1 + .../GenericScheduledExecutorService.java | 14 ++- .../schedulers/CachedThreadSchedulerTest.java | 91 +++++++------- 4 files changed, 158 insertions(+), 66 deletions(-) diff --git a/src/main/java/rx/internal/schedulers/NewThreadWorker.java b/src/main/java/rx/internal/schedulers/NewThreadWorker.java index ea80c651c3..65b8ac92a9 100644 --- a/src/main/java/rx/internal/schedulers/NewThreadWorker.java +++ b/src/main/java/rx/internal/schedulers/NewThreadWorker.java @@ -16,10 +16,14 @@ package rx.internal.schedulers; import java.lang.reflect.Method; +import java.util.Iterator; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; import rx.*; +import rx.exceptions.Exceptions; import rx.functions.Action0; +import rx.internal.util.RxThreadFactory; import rx.plugins.*; import rx.subscriptions.Subscriptions; @@ -30,24 +34,111 @@ public class NewThreadWorker extends Scheduler.Worker implements Subscription { private final ScheduledExecutorService executor; private final RxJavaSchedulersHook schedulersHook; volatile boolean isUnsubscribed; - + /** The purge frequency in milliseconds. */ + private static final String FREQUENCY_KEY = "io.reactivex.rxjava.scheduler.jdk6.purge-frequency-millis"; + /** Force the use of purge (true/false). */ + private static final String PURGE_FORCE_KEY = "io.reactivex.rxjava.scheduler.jdk6.purge-force"; + private static final String PURGE_THREAD_PREFIX = "RxSchedulerPurge-"; + /** Forces the use of purge even if setRemoveOnCancelPolicy is available. */ + private static final boolean PURGE_FORCE; + /** The purge frequency in milliseconds. */ + public static final int PURGE_FREQUENCY; + private static final ConcurrentHashMap EXECUTORS; + private static final AtomicReference PURGE; + static { + EXECUTORS = new ConcurrentHashMap(); + PURGE = new AtomicReference(); + PURGE_FORCE = Boolean.getBoolean(PURGE_FORCE_KEY); + PURGE_FREQUENCY = Integer.getInteger(FREQUENCY_KEY, 1000); + } + /** + * Registers the given executor service and starts the purge thread if not already started. + *

{@code public} visibility reason: called from other package(s) within RxJava + * @param service a scheduled thread pool executor instance + */ + public static void registerExecutor(ScheduledThreadPoolExecutor service) { + do { + ScheduledExecutorService exec = PURGE.get(); + if (exec != null) { + break; + } + exec = Executors.newScheduledThreadPool(1, new RxThreadFactory(PURGE_THREAD_PREFIX)); + if (PURGE.compareAndSet(null, exec)) { + exec.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + purgeExecutors(); + } + }, PURGE_FREQUENCY, PURGE_FREQUENCY, TimeUnit.MILLISECONDS); + + break; + } + } while (true); + + EXECUTORS.putIfAbsent(service, service); + } + /** + * Deregisters the executor service. + *

{@code public} visibility reason: called from other package(s) within RxJava + * @param service a scheduled thread pool executor instance + */ + public static void deregisterExecutor(ScheduledExecutorService service) { + EXECUTORS.remove(service); + } + /** Purges each registered executor and eagerly evicts shutdown executors. */ + static void purgeExecutors() { + try { + Iterator it = EXECUTORS.keySet().iterator(); + while (it.hasNext()) { + ScheduledThreadPoolExecutor exec = it.next(); + if (!exec.isShutdown()) { + exec.purge(); + } else { + it.remove(); + } + } + } catch (Throwable t) { + Exceptions.throwIfFatal(t); + RxJavaPlugins.getInstance().getErrorHandler().handleError(t); + } + } + + /** + * Tries to enable the Java 7+ setRemoveOnCancelPolicy. + *

{@code public} visibility reason: called from other package(s) within RxJava. + * If the method returns false, the {@link #registerExecutor(ScheduledThreadPoolExecutor)} may + * be called to enable the backup option of purging the executors. + * @param exec the executor to call setRemoveOnCaneclPolicy if available. + * @return true if the policy was successfully enabled + */ + public static boolean tryEnableCancelPolicy(ScheduledExecutorService exec) { + if (!PURGE_FORCE) { + for (Method m : exec.getClass().getMethods()) { + if (m.getName().equals("setRemoveOnCancelPolicy") + && m.getParameterTypes().length == 1 + && m.getParameterTypes()[0] == Boolean.TYPE) { + try { + m.invoke(exec, true); + return true; + } catch (Exception ex) { + RxJavaPlugins.getInstance().getErrorHandler().handleError(ex); + } + } + } + } + return false; + } + /* package */ public NewThreadWorker(ThreadFactory threadFactory) { - executor = Executors.newScheduledThreadPool(1, threadFactory); + ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory); // Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak - for (Method m : executor.getClass().getMethods()) { - if (m.getName().equals("setRemoveOnCancelPolicy") - && m.getParameterTypes().length == 1 - && m.getParameterTypes()[0] == Boolean.TYPE) { - try { - m.invoke(executor, true); - } catch (Exception ex) { - RxJavaPlugins.getInstance().getErrorHandler().handleError(ex); - } - break; - } + boolean cancelSupported = tryEnableCancelPolicy(exec); + if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) { + registerExecutor((ScheduledThreadPoolExecutor)exec); } schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook(); + executor = exec; } @Override @@ -88,6 +179,7 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time public void unsubscribe() { isUnsubscribed = true; executor.shutdownNow(); + deregisterExecutor(executor); } @Override diff --git a/src/main/java/rx/schedulers/CachedThreadScheduler.java b/src/main/java/rx/schedulers/CachedThreadScheduler.java index fe70a58a43..f1cd815b64 100644 --- a/src/main/java/rx/schedulers/CachedThreadScheduler.java +++ b/src/main/java/rx/schedulers/CachedThreadScheduler.java @@ -110,6 +110,7 @@ public Worker createWorker() { private static final class EventLoopWorker extends Scheduler.Worker { private final CompositeSubscription innerSubscription = new CompositeSubscription(); private final ThreadWorker threadWorker; + @SuppressWarnings("unused") volatile int once; static final AtomicIntegerFieldUpdater ONCE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(EventLoopWorker.class, "once"); diff --git a/src/main/java/rx/schedulers/GenericScheduledExecutorService.java b/src/main/java/rx/schedulers/GenericScheduledExecutorService.java index 2feccf92d6..ca133275e7 100644 --- a/src/main/java/rx/schedulers/GenericScheduledExecutorService.java +++ b/src/main/java/rx/schedulers/GenericScheduledExecutorService.java @@ -16,12 +16,10 @@ package rx.schedulers; import rx.Scheduler; +import rx.internal.schedulers.NewThreadWorker; import rx.internal.util.RxThreadFactory; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.*; /** * A default {@link ScheduledExecutorService} that can be used for scheduling actions when a {@link Scheduler} implementation doesn't have that ability. @@ -49,7 +47,13 @@ private GenericScheduledExecutorService() { if (count > 8) { count = 8; } - executor = Executors.newScheduledThreadPool(count, THREAD_FACTORY); + ScheduledExecutorService exec = Executors.newScheduledThreadPool(count, THREAD_FACTORY); + if (!NewThreadWorker.tryEnableCancelPolicy(exec)) { + if (exec instanceof ScheduledThreadPoolExecutor) { + NewThreadWorker.registerExecutor((ScheduledThreadPoolExecutor)exec); + } + } + executor = exec; } /** diff --git a/src/test/java/rx/schedulers/CachedThreadSchedulerTest.java b/src/test/java/rx/schedulers/CachedThreadSchedulerTest.java index 4b6b02eff2..2b22e78068 100644 --- a/src/test/java/rx/schedulers/CachedThreadSchedulerTest.java +++ b/src/test/java/rx/schedulers/CachedThreadSchedulerTest.java @@ -26,6 +26,7 @@ import rx.Observable; import rx.Scheduler; import rx.functions.*; +import rx.internal.schedulers.NewThreadWorker; import static org.junit.Assert.assertTrue; public class CachedThreadSchedulerTest extends AbstractSchedulerConcurrencyTests { @@ -73,55 +74,49 @@ public final void testHandledErrorIsNotDeliveredToThreadHandler() throws Interru @Test(timeout = 30000) public void testCancelledTaskRetention() throws InterruptedException { - try { - ScheduledThreadPoolExecutor.class.getMethod("setRemoveOnCancelPolicy", Boolean.TYPE); - - System.out.println("Wait before GC"); - Thread.sleep(1000); - - System.out.println("GC"); - System.gc(); - - Thread.sleep(1000); - - - MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); - MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage(); - long initial = memHeap.getUsed(); - - System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0); - - Scheduler.Worker w = Schedulers.io().createWorker(); - for (int i = 0; i < 750000; i++) { - if (i % 50000 == 0) { - System.out.println(" -> still scheduling: " + i); - } - w.schedule(Actions.empty(), 1, TimeUnit.DAYS); + System.out.println("Wait before GC"); + Thread.sleep(1000); + + System.out.println("GC"); + System.gc(); + + Thread.sleep(1000); + + + MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage(); + long initial = memHeap.getUsed(); + + System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0); + + Scheduler.Worker w = Schedulers.io().createWorker(); + for (int i = 0; i < 750000; i++) { + if (i % 50000 == 0) { + System.out.println(" -> still scheduling: " + i); } - - memHeap = memoryMXBean.getHeapMemoryUsage(); - long after = memHeap.getUsed(); - System.out.printf("Peak: %.3f MB%n", after / 1024.0 / 1024.0); - - w.unsubscribe(); - - System.out.println("Wait before second GC"); - Thread.sleep(1000); - - System.out.println("Second GC"); - System.gc(); - - Thread.sleep(1000); - - memHeap = memoryMXBean.getHeapMemoryUsage(); - long finish = memHeap.getUsed(); - System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0); - - if (finish > initial * 5) { - Assert.fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d)); - } - } catch (NoSuchMethodException ex) { - // not supported, no reason to test for it + w.schedule(Actions.empty(), 1, TimeUnit.DAYS); + } + + memHeap = memoryMXBean.getHeapMemoryUsage(); + long after = memHeap.getUsed(); + System.out.printf("Peak: %.3f MB%n", after / 1024.0 / 1024.0); + + w.unsubscribe(); + + System.out.println("Wait before second GC"); + Thread.sleep(NewThreadWorker.PURGE_FREQUENCY + 2000); + + System.out.println("Second GC"); + System.gc(); + + Thread.sleep(1000); + + memHeap = memoryMXBean.getHeapMemoryUsage(); + long finish = memHeap.getUsed(); + System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0); + + if (finish > initial * 5) { + Assert.fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d)); } }