Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.x: new hook management proposal #4007

Merged
merged 2 commits into from
Jun 23, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 24 additions & 30 deletions src/main/java/rx/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@
*/
@Experimental
public class Completable {
/** The error handler instance. */
static final RxJavaErrorHandler ERROR_HANDLER = RxJavaPlugins.getInstance().getErrorHandler();

/** The completable hook. */
static RxJavaCompletableExecutionHook HOOK = RxJavaPlugins.getInstance().getCompletableExecutionHook();

/**
* Callback used for building deferred computations that takes a CompletableSubscriber.
*/
Expand Down Expand Up @@ -146,7 +140,7 @@ public void onError(Throwable e) {
set.unsubscribe();
s.onError(e);
} else {
ERROR_HANDLER.handleError(e);
RxJavaHooks.onError(e);
}
}

Expand All @@ -167,7 +161,7 @@ public void onSubscribe(Subscription d) {
set.unsubscribe();
s.onError(npe);
} else {
ERROR_HANDLER.handleError(npe);
RxJavaHooks.onError(npe);
}
return;
}
Expand Down Expand Up @@ -215,7 +209,7 @@ public void onError(Throwable e) {
set.unsubscribe();
s.onError(e);
} else {
ERROR_HANDLER.handleError(e);
RxJavaHooks.onError(e);
}
}

Expand Down Expand Up @@ -256,7 +250,7 @@ public void onSubscribe(Subscription d) {
set.unsubscribe();
s.onError(e);
} else {
ERROR_HANDLER.handleError(e);
RxJavaHooks.onError(e);
}
return;
}
Expand All @@ -283,7 +277,7 @@ public void onSubscribe(Subscription d) {
set.unsubscribe();
s.onError(e);
} else {
ERROR_HANDLER.handleError(e);
RxJavaHooks.onError(e);
}
return;
}
Expand All @@ -294,7 +288,7 @@ public void onSubscribe(Subscription d) {
set.unsubscribe();
s.onError(npe);
} else {
ERROR_HANDLER.handleError(npe);
RxJavaHooks.onError(npe);
}
return;
}
Expand Down Expand Up @@ -386,7 +380,7 @@ public static Completable create(CompletableOnSubscribe onSubscribe) {
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
ERROR_HANDLER.handleError(ex);
RxJavaHooks.onError(ex);
throw toNpe(ex);
}
}
Expand Down Expand Up @@ -908,7 +902,7 @@ void dispose() {
try {
disposer.call(resource);
} catch (Throwable ex) {
ERROR_HANDLER.handleError(ex);
RxJavaHooks.onError(ex);
}
}
}
Expand Down Expand Up @@ -976,7 +970,7 @@ public void call() {
* not null (not verified)
*/
protected Completable(CompletableOnSubscribe onSubscribe) {
this.onSubscribe = HOOK.onCreate(onSubscribe);
this.onSubscribe = RxJavaHooks.onCreate(onSubscribe);
}

/**
Expand Down Expand Up @@ -1332,7 +1326,7 @@ public void onCompleted() {
try {
onAfterComplete.call();
} catch (Throwable e) {
ERROR_HANDLER.handleError(e);
RxJavaHooks.onError(e);
}
}

Expand Down Expand Up @@ -1365,7 +1359,7 @@ public void call() {
try {
onUnsubscribe.call();
} catch (Throwable e) {
ERROR_HANDLER.handleError(e);
RxJavaHooks.onError(e);
}
d.unsubscribe();
}
Expand Down Expand Up @@ -1543,7 +1537,7 @@ public final Completable lift(final CompletableOperator onLift) {
@Override
public void call(CompletableSubscriber s) {
try {
CompletableOperator onLiftDecorated = HOOK.onLift(onLift);
CompletableOperator onLiftDecorated = RxJavaHooks.onCompletableLift(onLift);
CompletableSubscriber sw = onLiftDecorated.call(s);

unsafeSubscribe(sw);
Expand Down Expand Up @@ -1868,7 +1862,7 @@ public void onCompleted() {

@Override
public void onError(Throwable e) {
ERROR_HANDLER.handleError(e);
RxJavaHooks.onError(e);
mad.unsubscribe();
deliverUncaughtException(e);
}
Expand All @@ -1885,7 +1879,7 @@ public void onSubscribe(Subscription d) {
* Subscribes to this Completable and calls the given Action0 when this Completable
* completes normally.
* <p>
* If this Completable emits an error, it is sent to ERROR_HANDLER.handleError and gets swallowed.
* If this Completable emits an error, it is sent to RxJavaHooks.onError and gets swallowed.
* @param onComplete the runnable called when this Completable completes normally
* @return the Subscription that allows cancelling the subscription
*/
Expand All @@ -1902,7 +1896,7 @@ public void onCompleted() {
try {
onComplete.call();
} catch (Throwable e) {
ERROR_HANDLER.handleError(e);
RxJavaHooks.onError(e);
deliverUncaughtException(e);
} finally {
mad.unsubscribe();
Expand All @@ -1912,7 +1906,7 @@ public void onCompleted() {

@Override
public void onError(Throwable e) {
ERROR_HANDLER.handleError(e);
RxJavaHooks.onError(e);
mad.unsubscribe();
deliverUncaughtException(e);
}
Expand Down Expand Up @@ -1961,7 +1955,7 @@ public void onError(Throwable e) {
done = true;
callOnError(e);
} else {
ERROR_HANDLER.handleError(e);
RxJavaHooks.onError(e);
deliverUncaughtException(e);
}
}
Expand All @@ -1971,7 +1965,7 @@ void callOnError(Throwable e) {
onError.call(e);
} catch (Throwable ex) {
e = new CompositeException(Arrays.asList(e, ex));
ERROR_HANDLER.handleError(e);
RxJavaHooks.onError(e);
deliverUncaughtException(e);
} finally {
mad.unsubscribe();
Expand Down Expand Up @@ -2000,15 +1994,15 @@ private static void deliverUncaughtException(Throwable e) {
public final void unsafeSubscribe(CompletableSubscriber s) {
requireNonNull(s);
try {
CompletableOnSubscribe onSubscribeDecorated = HOOK.onSubscribeStart(this, this.onSubscribe);
CompletableOnSubscribe onSubscribeDecorated = RxJavaHooks.onCompletableStart(this, this.onSubscribe);

onSubscribeDecorated.call(s);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
ex = HOOK.onSubscribeError(ex);
ERROR_HANDLER.handleError(ex);
ex = RxJavaHooks.onCompletableError(ex);
RxJavaHooks.onError(ex);
throw toNpe(ex);
}
}
Expand Down Expand Up @@ -2066,13 +2060,13 @@ public void onSubscribe(Subscription d) {
s.add(d);
}
});
RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeReturn(s);
RxJavaHooks.onObservableReturn(s);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
ex = HOOK.onSubscribeError(ex);
ERROR_HANDLER.handleError(ex);
ex = RxJavaHooks.onObservableError(ex);
RxJavaHooks.onError(ex);
throw toNpe(ex);
}
}
Expand Down
28 changes: 13 additions & 15 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}

static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

/**
* Returns an Observable that will execute the specified function when a {@link Subscriber} subscribes to
* it.
Expand Down Expand Up @@ -91,7 +89,7 @@ protected Observable(OnSubscribe<T> f) {
* @see <a href="http://reactivex.io/documentation/operators/create.html">ReactiveX operators documentation: Create</a>
*/
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
return new Observable<T>(RxJavaHooks.onCreate(f));
}

/**
Expand Down Expand Up @@ -128,7 +126,7 @@ public static <T> Observable<T> create(OnSubscribe<T> f) {
*/
@Beta
public static <S, T> Observable<T> create(SyncOnSubscribe<S, T> syncOnSubscribe) {
return new Observable<T>(hook.onCreate(syncOnSubscribe));
return create((OnSubscribe<T>)syncOnSubscribe);
}

/**
Expand Down Expand Up @@ -164,7 +162,7 @@ public static <S, T> Observable<T> create(SyncOnSubscribe<S, T> syncOnSubscribe)
*/
@Experimental
public static <S, T> Observable<T> create(AsyncOnSubscribe<S, T> asyncOnSubscribe) {
return new Observable<T>(hook.onCreate(asyncOnSubscribe));
return create((OnSubscribe<T>)asyncOnSubscribe);
}

/**
Expand Down Expand Up @@ -238,7 +236,7 @@ public void call(Subscriber<? super T> subscriber) {
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
*/
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

/**
Expand Down Expand Up @@ -8663,21 +8661,21 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
// new Subscriber so onStart it
subscriber.onStart();
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
subscriber.onError(RxJavaHooks.onObservableError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
RxJavaHooks.onObservableError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
Expand Down Expand Up @@ -8756,25 +8754,25 @@ static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// in case the subscriber can't listen to exceptions anymore
if (subscriber.isUnsubscribed()) {
RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
RxJavaPluginUtils.handleException(RxJavaHooks.onObservableError(e));
} else {
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
subscriber.onError(RxJavaHooks.onObservableError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
RxJavaHooks.onObservableError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
Expand Down
Loading