Skip to content

Commit

Permalink
2.x: wrap undeliverable errors (#5080)
Browse files Browse the repository at this point in the history
* 2.x: wrap undeliverable errors

* Add CompositeException to isBug, add test for isBug
  • Loading branch information
akarnokd authored Feb 8, 2017
1 parent 66fbf5a commit 8819cc9
Show file tree
Hide file tree
Showing 154 changed files with 447 additions and 255 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.exceptions;

import io.reactivex.annotations.Experimental;

/**
* Explicitly named exception to indicate a Reactive-Streams
* protocol violation.
* @since 2.0.6 - experimental
*/
@Experimental
public final class ProtocolViolationException extends IllegalStateException {

private static final long serialVersionUID = 1644750035281290266L;

/**
* Creates an instance with the given message.
* @param message the message
*/
public ProtocolViolationException(String message) {
super(message);
}
}
35 changes: 35 additions & 0 deletions src/main/java/io/reactivex/exceptions/UndeliverableException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.exceptions;

import io.reactivex.annotations.Experimental;

/**
* Wrapper for Throwable errors that are sent to `RxJavaPlugins.onError`.
* @since 2.0.6 - experimental
*/
@Experimental
public final class UndeliverableException extends IllegalStateException {

private static final long serialVersionUID = 1644750035281290266L;

/**
* Construct an instance by wrapping the given, non-null
* cause Throwable.
* @param cause the cause, not null
*/
public UndeliverableException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.ProtocolViolationException;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.plugins.RxJavaPlugins;

Expand Down Expand Up @@ -152,7 +153,7 @@ public static boolean validate(Disposable current, Disposable next) {
* Reports that the disposable is already set to the RxJavaPlugins error handler.
*/
public static void reportDisposableSet() {
RxJavaPlugins.onError(new IllegalStateException("Disposable already set!"));
RxJavaPlugins.onError(new ProtocolViolationException("Disposable already set!"));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import org.reactivestreams.Subscription;

import io.reactivex.exceptions.ProtocolViolationException;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.util.BackpressureHelper;
import io.reactivex.plugins.RxJavaPlugins;
Expand Down Expand Up @@ -67,7 +68,7 @@ public static boolean validate(Subscription current, Subscription next) {
* which is an indication of a onSubscribe management bug.
*/
public static void reportSubscriptionSet() {
RxJavaPlugins.onError(new IllegalStateException("Subscription already set!"));
RxJavaPlugins.onError(new ProtocolViolationException("Subscription already set!"));
}

/**
Expand All @@ -89,7 +90,7 @@ public static boolean validate(long n) {
* @param n the overproduction amount
*/
public static void reportMoreProduced(long n) {
RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + n));
RxJavaPlugins.onError(new ProtocolViolationException("More produced than requested: " + n));
}
/**
* Check if the given subscription is the common cancelled subscription.
Expand Down
45 changes: 45 additions & 0 deletions src/main/java/io/reactivex/plugins/RxJavaPlugins.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.reactivex.annotations.Experimental;
import io.reactivex.annotations.NonNull;
import io.reactivex.annotations.Nullable;
import io.reactivex.exceptions.*;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.BooleanSupplier;
Expand Down Expand Up @@ -360,6 +361,10 @@ public static void onError(@NonNull Throwable error) {

if (error == null) {
error = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
} else {
if (!isBug(error)) {
error = new UndeliverableException(error);
}
}

if (f != null) {
Expand All @@ -377,6 +382,46 @@ public static void onError(@NonNull Throwable error) {
uncaught(error);
}

/**
* Checks if the given error is one of the already named
* bug cases that should pass through {@link #onError(Throwable)}
* as is.
* @param error the error to check
* @return true if the error should pass throug, false if
* it may be wrapped into an UndeliverableException
*/
static boolean isBug(Throwable error) {
// user forgot to add the onError handler in subscribe
if (error instanceof OnErrorNotImplementedException) {
return true;
}
// the sender didn't honor the request amount
// it's either due to an operator bug or concurrent onNext
if (error instanceof MissingBackpressureException) {
return true;
}
// general protocol violations
// it's either due to an operator bug or concurrent onNext
if (error instanceof IllegalStateException) {
return true;
}
// nulls are generally not allowed
// likely an operator bug or missing null-check
if (error instanceof NullPointerException) {
return true;
}
// bad arguments, likely invalid user input
if (error instanceof IllegalArgumentException) {
return true;
}
// Crash while handling an exception
if (error instanceof CompositeException) {
return true;
}
// everything else is probably due to lifecycle limits
return false;
}

static void uncaught(@NonNull Throwable error) {
Thread currentThread = Thread.currentThread();
UncaughtExceptionHandler handler = currentThread.getUncaughtExceptionHandler();
Expand Down
42 changes: 40 additions & 2 deletions src/test/java/io/reactivex/TestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,21 @@ public static void assertError(List<Throwable> list, int index, Class<? extends
}
}

public static void assertUndeliverable(List<Throwable> list, int index, Class<? extends Throwable> clazz) {
Throwable ex = list.get(index);
if (!(ex instanceof UndeliverableException)) {
AssertionError err = new AssertionError("Outer exception UndeliverableException expected but got " + list.get(index));
err.initCause(list.get(index));
throw err;
}
ex = ex.getCause();
if (!clazz.isInstance(ex)) {
AssertionError err = new AssertionError("Inner exception " + clazz + " expected but got " + list.get(index));
err.initCause(list.get(index));
throw err;
}
}

public static void assertError(List<Throwable> list, int index, Class<? extends Throwable> clazz, String message) {
Throwable ex = list.get(index);
if (!clazz.isInstance(ex)) {
Expand All @@ -168,6 +183,26 @@ public static void assertError(List<Throwable> list, int index, Class<? extends
}
}

public static void assertUndeliverable(List<Throwable> list, int index, Class<? extends Throwable> clazz, String message) {
Throwable ex = list.get(index);
if (!(ex instanceof UndeliverableException)) {
AssertionError err = new AssertionError("Outer exception UndeliverableException expected but got " + list.get(index));
err.initCause(list.get(index));
throw err;
}
ex = ex.getCause();
if (!clazz.isInstance(ex)) {
AssertionError err = new AssertionError("Inner exception " + clazz + " expected but got " + list.get(index));
err.initCause(list.get(index));
throw err;
}
if (!ObjectHelper.equals(message, ex.getMessage())) {
AssertionError err = new AssertionError("Message " + message + " expected but got " + ex.getMessage());
err.initCause(ex);
throw err;
}
}

public static void assertError(TestObserver<?> ts, int index, Class<? extends Throwable> clazz) {
Throwable ex = ts.errors().get(0);
try {
Expand Down Expand Up @@ -386,6 +421,9 @@ public void run() {
* @return the list of Throwables
*/
public static List<Throwable> compositeList(Throwable ex) {
if (ex instanceof UndeliverableException) {
ex = ex.getCause();
}
return ((CompositeException)ex).getExceptions();
}

Expand Down Expand Up @@ -2428,7 +2466,7 @@ protected void subscribeActual(Observer<? super T> observer) {
}
}

assertError(errors, 0, TestException.class, "second");
assertUndeliverable(errors, 0, TestException.class, "second");
} catch (AssertionError ex) {
throw ex;
} catch (Throwable ex) {
Expand Down Expand Up @@ -2587,7 +2625,7 @@ protected void subscribeActual(Subscriber<? super T> observer) {
}
}

assertError(errors, 0, TestException.class, "second");
assertUndeliverable(errors, 0, TestException.class, "second");
} catch (AssertionError ex) {
throw ex;
} catch (Throwable ex) {
Expand Down
8 changes: 6 additions & 2 deletions src/test/java/io/reactivex/flowable/FlowableCollectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ public void testCollectorFailureDoesNotResultInTwoErrorEmissionsFlowable() {
.test() //
.assertError(e1) //
.assertNotComplete();
assertEquals(Arrays.asList(e2), list);

assertEquals(1, list.size());
assertEquals(e2, list.get(0).getCause());
} finally {
RxJavaPlugins.reset();
}
Expand Down Expand Up @@ -272,7 +274,9 @@ public void testCollectorFailureDoesNotResultInTwoErrorEmissions() {
.test() //
.assertError(e1) //
.assertNotComplete();
assertEquals(Arrays.asList(e2), list);

assertEquals(1, list.size());
assertEquals(e2, list.get(0).getCause());
} finally {
RxJavaPlugins.reset();
}
Expand Down
15 changes: 11 additions & 4 deletions src/test/java/io/reactivex/flowable/FlowableScanTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@

package io.reactivex.flowable;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import io.reactivex.Flowable;
import io.reactivex.*;
import io.reactivex.exceptions.UndeliverableException;
import io.reactivex.flowable.FlowableEventStream.Event;
import io.reactivex.functions.*;
import io.reactivex.plugins.RxJavaPlugins;
Expand Down Expand Up @@ -65,7 +66,10 @@ public void accept(Throwable t) throws Exception {
.test()
.assertNoValues()
.assertError(e);
assertEquals(Arrays.asList(e2), list);

assertEquals("" + list, 1, list.size());
assertTrue("" + list, list.get(0) instanceof UndeliverableException);
assertEquals(e2, list.get(0).getCause());
} finally {
RxJavaPlugins.reset();
}
Expand Down Expand Up @@ -142,7 +146,10 @@ public void accept(Throwable t) throws Exception {
.test()
.assertValue(1)
.assertError(e);
assertEquals(Arrays.asList(e2), list);

assertEquals("" + list, 1, list.size());
assertTrue("" + list, list.get(0) instanceof UndeliverableException);
assertEquals(e2, list.get(0).getCause());
} finally {
RxJavaPlugins.reset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ public void run() throws Exception {

s.onComplete();

TestHelper.assertError(list, 0, TestException.class, "Inner");
TestHelper.assertUndeliverable(list, 0, TestException.class, "Inner");
} finally {
RxJavaPlugins.reset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void cancel() throws Exception {
cd.dispose();
cd.dispose();

TestHelper.assertError(list, 0, TestException.class);
TestHelper.assertUndeliverable(list, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void errorAfterCancel() {
try {
fa.onError(new TestException(), bs);

TestHelper.assertError(errors, 0, TestException.class);
TestHelper.assertUndeliverable(errors, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
}
Expand All @@ -90,7 +90,7 @@ public void cancelAfterError() {
fa.dispose();

fa.drain();
TestHelper.assertError(errors, 0, TestException.class);
TestHelper.assertUndeliverable(errors, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void accept(Disposable s) throws Exception {

assertTrue(o.isDisposed());

TestHelper.assertError(errors, 0, TestException.class);
TestHelper.assertUndeliverable(errors, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void run() {
to.assertFailure(TestException.class);

if (!errors.isEmpty()) {
TestHelper.assertError(errors, 0, TestException.class);
TestHelper.assertUndeliverable(errors, 0, TestException.class);
}
} finally {
RxJavaPlugins.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void run() {
to.assertFailure(TestException.class);

if (!errors.isEmpty()) {
TestHelper.assertError(errors, 0, TestException.class);
TestHelper.assertUndeliverable(errors, 0, TestException.class);
}
} finally {
RxJavaPlugins.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void errorAfterCancel() {

to.assertEmpty();

TestHelper.assertError(errors, 0, TestException.class);
TestHelper.assertUndeliverable(errors, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
}
Expand Down
Loading

0 comments on commit 8819cc9

Please sign in to comment.