Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.x: Change the workers to capture the stack trace #3937

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ ThreadWorker get() {
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
threadWorker.resetContext();
return threadWorker;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private static class EventLoopWorker extends Scheduler.Worker {

EventLoopWorker(PoolWorker poolWorker) {
this.poolWorker = poolWorker;

poolWorker.resetContext();
}

@Override
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/rx/internal/schedulers/ExecutorScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public Worker createWorker() {

/** Worker that schedules tasks on the executor indirectly through a trampoline mechanism. */
static final class ExecutorSchedulerWorker extends Scheduler.Worker implements Runnable {
private final Throwable creationContext = SchedulerContextException.create();
final Executor executor;
// TODO: use a better performing structure for task tracking
final CompositeSubscription tasks;
Expand All @@ -64,7 +65,7 @@ public Subscription schedule(Action0 action) {
if (isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
ScheduledAction ea = new ScheduledAction(action, tasks);
ScheduledAction ea = new ScheduledAction(action, tasks, creationContext);
tasks.add(ea);
queue.offer(ea);
if (wip.getAndIncrement() == 0) {
Expand Down Expand Up @@ -146,7 +147,7 @@ public void call() {
((ScheduledAction)s2).add(removeMas);
}
}
});
}, this.creationContext);
// This will make sure if ea.call() gets executed before this line
// we don't override the current task in mas.
first.set(ea);
Expand Down
11 changes: 8 additions & 3 deletions src/main/java/rx/internal/schedulers/NewThreadWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
* @warn class description missing
*/
public class NewThreadWorker extends Scheduler.Worker implements Subscription {
private Throwable creationContext = SchedulerContextException.create();
private final ScheduledExecutorService executor;
private final RxJavaSchedulersHook schedulersHook;
volatile boolean isUnsubscribed;
Expand Down Expand Up @@ -234,7 +235,7 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
*/
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
ScheduledAction run = new ScheduledAction(decoratedAction, creationContext);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
Expand All @@ -247,7 +248,7 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time
}
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, CompositeSubscription parent) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
ScheduledAction run = new ScheduledAction(decoratedAction, parent, creationContext);
parent.add(run);

Future<?> f;
Expand All @@ -263,7 +264,7 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time

public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, SubscriptionList parent) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
ScheduledAction run = new ScheduledAction(decoratedAction, parent, creationContext);
parent.add(run);

Future<?> f;
Expand All @@ -288,4 +289,8 @@ public void unsubscribe() {
public boolean isUnsubscribed() {
return isUnsubscribed;
}

public void resetContext() {
creationContext = SchedulerContextException.create();
}
}
12 changes: 9 additions & 3 deletions src/main/java/rx/internal/schedulers/ScheduledAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.atomic.*;

import rx.Subscription;
import rx.exceptions.Exceptions;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action0;
import rx.internal.util.SubscriptionList;
Expand All @@ -34,18 +35,22 @@ public final class ScheduledAction extends AtomicReference<Thread> implements Ru
private static final long serialVersionUID = -3962399486978279857L;
final SubscriptionList cancel;
final Action0 action;
final Throwable creationContext;

public ScheduledAction(Action0 action) {
public ScheduledAction(Action0 action, Throwable creationContext) {
this.action = action;
this.cancel = new SubscriptionList();
this.creationContext = creationContext;
}
public ScheduledAction(Action0 action, CompositeSubscription parent) {
public ScheduledAction(Action0 action, CompositeSubscription parent, Throwable creationContext) {
this.action = action;
this.cancel = new SubscriptionList(new Remover(this, parent));
this.creationContext = creationContext;
}
public ScheduledAction(Action0 action, SubscriptionList parent) {
public ScheduledAction(Action0 action, SubscriptionList parent, Throwable creationContext) {
this.action = action;
this.cancel = new SubscriptionList(new Remover2(this, parent));
this.creationContext = creationContext;
}

@Override
Expand All @@ -61,6 +66,7 @@ public void run() {
} else {
ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
}
Exceptions.addCause(ie, creationContext);
RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.schedulers;

/**
* Used only for providing context around where work was scheduled should an error occur in a different thread.
*/
public class SchedulerContextException extends Exception {
/**
* Constant to use when disabled
*/
private static final Throwable CONTEXT_MISSING = new SchedulerContextException("Missing context. Enable by setting the system property \"rxjava.captureSchedulerContext=true\"");

static {
CONTEXT_MISSING.setStackTrace(new StackTraceElement[0]);
}

/**
* @return a {@link Throwable} that captures the stack trace or a {@link Throwable} that documents how to enable the feature if needed.
*/
public static Throwable create() {
String def = "false";
String setTo = System.getProperty("rxjava.captureSchedulerContext", def);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not experienced with it, but AccessController.doPrivileged could be used for defending the setting of the volatile boolean field.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this about setting up a public API to toggle the features? I am considering adding the method to the plugin class or the execution hooks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doPreveleged only penalizes the mode change and not this create() method.

return setTo != def && "true".equals(setTo) ? new SchedulerContextException("Asynchronous work scheduled at") : CONTEXT_MISSING;
}

private SchedulerContextException(String message) {
super(message);
}

private static final long serialVersionUID = 1L;
}
52 changes: 52 additions & 0 deletions src/test/java/rx/schedulers/AbstractSchedulerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaPlugins;

/**
* Base tests for all schedulers including Immediate/Current.
Expand Down Expand Up @@ -502,4 +504,54 @@ public void onNext(T args) {

}

@Test
public final void testStackTraceAcrossThreads() throws Throwable {
final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
final CountDownLatch done = new CountDownLatch(1);
System.setProperty("rxjava.captureSchedulerContext", "true");

try {

RxJavaPlugins.getInstance().reset();
RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() {
@Override
public void handleError(Throwable e) {
exceptionRef.set(e);
done.countDown();
}
});

try {
getScheduler().createWorker().schedule(new Action0() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This leaks the worker.

@Override
public void call() {
throw new RuntimeException();
}
});
} catch (Exception e) {
exceptionRef.set(e);
done.countDown();
}

done.await();

Throwable exception = exceptionRef.get();
Throwable e = exception;
while (e.getCause() != null) {
e = e.getCause();
}

StackTraceElement[] st = e.getStackTrace();
for (StackTraceElement stackTraceElement : st) {
if (stackTraceElement.getMethodName().equals("testStackTraceAcrossThreads")) {
// pass we found this class in the stack trace.
return;
}
}

throw exception;
} finally {
System.setProperty("rxjava.captureSchedulerContext", "false");
}
}
}