Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for thread interruption #593

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 100 additions & 2 deletions hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,18 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.Notification;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Producer;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
Expand Down Expand Up @@ -528,7 +532,12 @@ public void call(Subscriber<? super R> s) {
}
}

}).subscribeOn(threadPool.getScheduler());
});

//I believe there is a bug in RxJava's {@link Observable#subscribeOn} method that doesn't properly hook up unsubscription.
//In place of that method, I'm using a custom operator ({@link OperatorSubscribeOnThatAllowsUnsubscribe}) that properly hooks these up.
//Once I get the work done in Rx to apply the fix, this should return to use {@link Observable#susbcribeOn}
run = run.nest().lift(new OperatorSubscribeOnThatAllowsUnsubscribe<R>(threadPool.getScheduler(), properties));
} else {
// semaphore isolated
executionHook.onRunStart(_self);
Expand Down Expand Up @@ -937,7 +946,6 @@ public void tick() {

timeoutRunnable.run();
}

}

@Override
Expand Down Expand Up @@ -997,6 +1005,96 @@ private boolean isNotTimedOut() {

}

public static class OperatorSubscribeOnThatAllowsUnsubscribe<T> implements Operator<T, Observable<T>> {

private final Scheduler scheduler;
private final HystrixCommandProperties properties;

public OperatorSubscribeOnThatAllowsUnsubscribe(Scheduler scheduler, HystrixCommandProperties properties) {
this.scheduler = scheduler;
this.properties = properties;
}

@Override
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
boolean shouldInterrupt = properties.executionIsolationThreadInterruptOnTimeout().get();
final Scheduler.Worker inner = ((HystrixContextScheduler) scheduler).createWorker(shouldInterrupt);
subscriber.add(inner);
return new Subscriber<Observable<T>>(subscriber) {

@Override
public void onCompleted() {
// ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}

@Override
public void onNext(final Observable<T> o) {
/**
* This is the diff from OperatorSubscribeOn. In that version, inner.schedule is called and then
* the Subscription is lost. Here we call subscriber.add on it.
*/
Subscription subscription = inner.schedule(new Action0() {

@Override
public void call() {
final Thread t = Thread.currentThread();
o.unsafeSubscribe(new Subscriber<T>(subscriber) {

@Override
public void onCompleted() {
subscriber.onCompleted();
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}

@Override
public void onNext(T t) {
subscriber.onNext(t);
}

@Override
public void setProducer(final Producer producer) {
subscriber.setProducer(new Producer() {

@Override
public void request(final long n) {
if (Thread.currentThread() == t) {
// don't schedule if we're already on the thread (primarily for first setProducer call)
// see unit test 'testSetProducerSynchronousRequest' for more context on this
producer.request(n);
} else {
inner.schedule(new Action0() {

@Override
public void call() {
producer.request(n);
}
});
}
}

});
}

});
}
});
subscriber.add(subscription);
}

};
}
}


private static void setRequestContextIfNeeded(final HystrixRequestContext currentRequestContext) {
if (!HystrixRequestContext.isCurrentThreadInitialized()) {
// even if the user Observable doesn't have context we want it set for chained operators
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
*/
package com.netflix.hystrix.strategy.concurrency;

import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import com.netflix.hystrix.HystrixCommandProperties;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
Expand Down Expand Up @@ -62,6 +64,14 @@ public Worker createWorker() {
return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}

public Worker createWorker(boolean shouldInterrupt) {
if (actualScheduler instanceof ThreadPoolScheduler) {
return new HystrixContextSchedulerWorker(((ThreadPoolScheduler) actualScheduler).createWorker(shouldInterrupt));
} else {
return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}
}

private class HystrixContextSchedulerWorker extends Worker {

private BooleanSubscription s = new BooleanSubscription();
Expand Down Expand Up @@ -103,7 +113,7 @@ public Subscription schedule(Action0 action) {

}

private static class ThreadPoolScheduler extends Scheduler {
public static class ThreadPoolScheduler extends Scheduler {

private final HystrixThreadPool threadPool;

Expand All @@ -113,9 +123,12 @@ public ThreadPoolScheduler(HystrixThreadPool threadPool) {

@Override
public Worker createWorker() {
return new ThreadPoolWorker(threadPool);
return new ThreadPoolWorker(threadPool, true);
}

public Worker createWorker(boolean shouldInterrupt) {
return new ThreadPoolWorker(threadPool, shouldInterrupt);
}
}

/**
Expand All @@ -130,10 +143,12 @@ public Worker createWorker() {
private static class ThreadPoolWorker extends Worker {

private final HystrixThreadPool threadPool;
private final boolean shouldInterrupt;
private final CompositeSubscription subscription = new CompositeSubscription();

public ThreadPoolWorker(HystrixThreadPool threadPool) {
public ThreadPoolWorker(HystrixThreadPool threadPool, boolean shouldInterrupt) {
this.threadPool = threadPool;
this.shouldInterrupt = shouldInterrupt;
}

@Override
Expand All @@ -154,7 +169,7 @@ public Subscription schedule(final Action0 action) {
}

final AtomicReference<Subscription> sf = new AtomicReference<>();
Subscription s = Subscriptions.from(threadPool.getExecutor().submit(new Runnable() {
Future<?> submittedTask = threadPool.getExecutor().submit(new Runnable() {

@Override
public void run() {
Expand All @@ -165,13 +180,15 @@ public void run() {
action.call();
} finally {
// remove the subscription now that we're completed
Subscription s = sf.get();
if (s != null) {
subscription.remove(s);
Subscription completed = sf.get();
if (completed != null) {
subscription.remove(completed);
}
}
}
}));
});

Subscription s = new FutureSubscriptionWithConfigurableInterrupt(submittedTask, shouldInterrupt);

sf.set(s);
subscription.add(s);
Expand All @@ -183,6 +200,32 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
throw new IllegalStateException("Hystrix does not support delayed scheduling");
}

/**
* Experimentally copied over from RxJava {@link rx.subscriptions.Subscriptions)}
* (https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/subscriptions/Subscriptions.java)
*
* If this proves worthwhile, will work to get this functionality in RxJava proper and depend on it
*/
private static final class FutureSubscriptionWithConfigurableInterrupt implements Subscription {
final Future<?> f;
final boolean shouldInterruptOnCancel;

public FutureSubscriptionWithConfigurableInterrupt(Future<?> f, boolean shouldInterruptOnCancel) {
this.f = f;
this.shouldInterruptOnCancel = shouldInterruptOnCancel;
}
@Override
public void unsubscribe() {
f.cancel(shouldInterruptOnCancel);
}

@Override
public boolean isUnsubscribed() {
return f.isCancelled();
}
}


}

}
Loading