From c6ada18be720fb9367c6c8a2b4fef176ae388c19 Mon Sep 17 00:00:00 2001 From: George Campbell Date: Thu, 12 May 2016 15:13:14 -0700 Subject: [PATCH] Change the workers to capture the stack trace for all subsequent scheduled actions to increase the readability of uncaught and fatal exceptions that bubble up to the schedulers. --- .../schedulers/CachedThreadScheduler.java | 1 + .../schedulers/EventLoopsScheduler.java | 2 +- .../schedulers/ExecutorScheduler.java | 5 +- .../internal/schedulers/NewThreadWorker.java | 11 ++-- .../internal/schedulers/ScheduledAction.java | 12 +++-- .../schedulers/SchedulerContextException.java | 45 ++++++++++++++++ .../rx/schedulers/AbstractSchedulerTests.java | 52 +++++++++++++++++++ 7 files changed, 119 insertions(+), 9 deletions(-) create mode 100644 src/main/java/rx/internal/schedulers/SchedulerContextException.java diff --git a/src/main/java/rx/internal/schedulers/CachedThreadScheduler.java b/src/main/java/rx/internal/schedulers/CachedThreadScheduler.java index c1655f0aa3..6b8bfcd9b1 100644 --- a/src/main/java/rx/internal/schedulers/CachedThreadScheduler.java +++ b/src/main/java/rx/internal/schedulers/CachedThreadScheduler.java @@ -78,6 +78,7 @@ ThreadWorker get() { while (!expiringWorkerQueue.isEmpty()) { ThreadWorker threadWorker = expiringWorkerQueue.poll(); if (threadWorker != null) { + threadWorker.resetContext(); return threadWorker; } } diff --git a/src/main/java/rx/internal/schedulers/EventLoopsScheduler.java b/src/main/java/rx/internal/schedulers/EventLoopsScheduler.java index fb813412db..afd4451a59 100644 --- a/src/main/java/rx/internal/schedulers/EventLoopsScheduler.java +++ b/src/main/java/rx/internal/schedulers/EventLoopsScheduler.java @@ -142,7 +142,7 @@ private static class EventLoopWorker extends Scheduler.Worker { EventLoopWorker(PoolWorker poolWorker) { this.poolWorker = poolWorker; - + poolWorker.resetContext(); } @Override diff --git a/src/main/java/rx/internal/schedulers/ExecutorScheduler.java b/src/main/java/rx/internal/schedulers/ExecutorScheduler.java index a57ee8c938..5dd0bd08e8 100644 --- a/src/main/java/rx/internal/schedulers/ExecutorScheduler.java +++ b/src/main/java/rx/internal/schedulers/ExecutorScheduler.java @@ -42,6 +42,7 @@ public Worker createWorker() { /** Worker that schedules tasks on the executor indirectly through a trampoline mechanism. */ static final class ExecutorSchedulerWorker extends Scheduler.Worker implements Runnable { + private final Throwable creationContext = SchedulerContextException.create(); final Executor executor; // TODO: use a better performing structure for task tracking final CompositeSubscription tasks; @@ -64,7 +65,7 @@ public Subscription schedule(Action0 action) { if (isUnsubscribed()) { return Subscriptions.unsubscribed(); } - ScheduledAction ea = new ScheduledAction(action, tasks); + ScheduledAction ea = new ScheduledAction(action, tasks, creationContext); tasks.add(ea); queue.offer(ea); if (wip.getAndIncrement() == 0) { @@ -146,7 +147,7 @@ public void call() { ((ScheduledAction)s2).add(removeMas); } } - }); + }, this.creationContext); // This will make sure if ea.call() gets executed before this line // we don't override the current task in mas. first.set(ea); diff --git a/src/main/java/rx/internal/schedulers/NewThreadWorker.java b/src/main/java/rx/internal/schedulers/NewThreadWorker.java index 585d27b3b6..4c4acbbee9 100644 --- a/src/main/java/rx/internal/schedulers/NewThreadWorker.java +++ b/src/main/java/rx/internal/schedulers/NewThreadWorker.java @@ -33,6 +33,7 @@ * @warn class description missing */ public class NewThreadWorker extends Scheduler.Worker implements Subscription { + private Throwable creationContext = SchedulerContextException.create(); private final ScheduledExecutorService executor; private final RxJavaSchedulersHook schedulersHook; volatile boolean isUnsubscribed; @@ -234,7 +235,7 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit */ public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) { Action0 decoratedAction = schedulersHook.onSchedule(action); - ScheduledAction run = new ScheduledAction(decoratedAction); + ScheduledAction run = new ScheduledAction(decoratedAction, creationContext); Future f; if (delayTime <= 0) { f = executor.submit(run); @@ -247,7 +248,7 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time } public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, CompositeSubscription parent) { Action0 decoratedAction = schedulersHook.onSchedule(action); - ScheduledAction run = new ScheduledAction(decoratedAction, parent); + ScheduledAction run = new ScheduledAction(decoratedAction, parent, creationContext); parent.add(run); Future f; @@ -263,7 +264,7 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, SubscriptionList parent) { Action0 decoratedAction = schedulersHook.onSchedule(action); - ScheduledAction run = new ScheduledAction(decoratedAction, parent); + ScheduledAction run = new ScheduledAction(decoratedAction, parent, creationContext); parent.add(run); Future f; @@ -288,4 +289,8 @@ public void unsubscribe() { public boolean isUnsubscribed() { return isUnsubscribed; } + + public void resetContext() { + creationContext = SchedulerContextException.create(); + } } diff --git a/src/main/java/rx/internal/schedulers/ScheduledAction.java b/src/main/java/rx/internal/schedulers/ScheduledAction.java index 0f7d145a20..e298ad2cc8 100644 --- a/src/main/java/rx/internal/schedulers/ScheduledAction.java +++ b/src/main/java/rx/internal/schedulers/ScheduledAction.java @@ -19,6 +19,7 @@ import java.util.concurrent.atomic.*; import rx.Subscription; +import rx.exceptions.Exceptions; import rx.exceptions.OnErrorNotImplementedException; import rx.functions.Action0; import rx.internal.util.SubscriptionList; @@ -34,18 +35,22 @@ public final class ScheduledAction extends AtomicReference implements Ru private static final long serialVersionUID = -3962399486978279857L; final SubscriptionList cancel; final Action0 action; + final Throwable creationContext; - public ScheduledAction(Action0 action) { + public ScheduledAction(Action0 action, Throwable creationContext) { this.action = action; this.cancel = new SubscriptionList(); + this.creationContext = creationContext; } - public ScheduledAction(Action0 action, CompositeSubscription parent) { + public ScheduledAction(Action0 action, CompositeSubscription parent, Throwable creationContext) { this.action = action; this.cancel = new SubscriptionList(new Remover(this, parent)); + this.creationContext = creationContext; } - public ScheduledAction(Action0 action, SubscriptionList parent) { + public ScheduledAction(Action0 action, SubscriptionList parent, Throwable creationContext) { this.action = action; this.cancel = new SubscriptionList(new Remover2(this, parent)); + this.creationContext = creationContext; } @Override @@ -61,6 +66,7 @@ public void run() { } else { ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e); } + Exceptions.addCause(ie, creationContext); RxJavaPlugins.getInstance().getErrorHandler().handleError(ie); Thread thread = Thread.currentThread(); thread.getUncaughtExceptionHandler().uncaughtException(thread, ie); diff --git a/src/main/java/rx/internal/schedulers/SchedulerContextException.java b/src/main/java/rx/internal/schedulers/SchedulerContextException.java new file mode 100644 index 0000000000..182ce58d3f --- /dev/null +++ b/src/main/java/rx/internal/schedulers/SchedulerContextException.java @@ -0,0 +1,45 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.schedulers; + +/** + * Used only for providing context around where work was scheduled should an error occur in a different thread. + */ +public class SchedulerContextException extends Exception { + /** + * Constant to use when disabled + */ + private static final Throwable CONTEXT_MISSING = new SchedulerContextException("Missing context. Enable by setting the system property \"rxjava.captureSchedulerContext=true\""); + + static { + CONTEXT_MISSING.setStackTrace(new StackTraceElement[0]); + } + + /** + * @return a {@link Throwable} that captures the stack trace or a {@link Throwable} that documents how to enable the feature if needed. + */ + public static Throwable create() { + String def = "false"; + String setTo = System.getProperty("rxjava.captureSchedulerContext", def); + return setTo != def && "true".equals(setTo) ? new SchedulerContextException("Asynchronous work scheduled at") : CONTEXT_MISSING; + } + + private SchedulerContextException(String message) { + super(message); + } + + private static final long serialVersionUID = 1L; +} diff --git a/src/test/java/rx/schedulers/AbstractSchedulerTests.java b/src/test/java/rx/schedulers/AbstractSchedulerTests.java index f27c43807c..453b3f91ca 100644 --- a/src/test/java/rx/schedulers/AbstractSchedulerTests.java +++ b/src/test/java/rx/schedulers/AbstractSchedulerTests.java @@ -43,6 +43,8 @@ import rx.functions.Action0; import rx.functions.Action1; import rx.functions.Func1; +import rx.plugins.RxJavaErrorHandler; +import rx.plugins.RxJavaPlugins; /** * Base tests for all schedulers including Immediate/Current. @@ -502,4 +504,54 @@ public void onNext(T args) { } + @Test + public final void testStackTraceAcrossThreads() throws Throwable { + final AtomicReference exceptionRef = new AtomicReference(); + final CountDownLatch done = new CountDownLatch(1); + System.setProperty("rxjava.captureSchedulerContext", "true"); + + try { + + RxJavaPlugins.getInstance().reset(); + RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() { + @Override + public void handleError(Throwable e) { + exceptionRef.set(e); + done.countDown(); + } + }); + + try { + getScheduler().createWorker().schedule(new Action0() { + @Override + public void call() { + throw new RuntimeException(); + } + }); + } catch (Exception e) { + exceptionRef.set(e); + done.countDown(); + } + + done.await(); + + Throwable exception = exceptionRef.get(); + Throwable e = exception; + while (e.getCause() != null) { + e = e.getCause(); + } + + StackTraceElement[] st = e.getStackTrace(); + for (StackTraceElement stackTraceElement : st) { + if (stackTraceElement.getMethodName().equals("testStackTraceAcrossThreads")) { + // pass we found this class in the stack trace. + return; + } + } + + throw exception; + } finally { + System.setProperty("rxjava.captureSchedulerContext", "false"); + } + } }