From c5cd88d15da3c82d5890d078a0e067794a557730 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 25 Jul 2017 15:55:45 +0200 Subject: [PATCH] 2.x: add missing null check to fused Observable.fromCallable --- .../observable/ObservableFromCallable.java | 2 +- .../flowable/FlowableFromCallableTest.java | 82 ++++++ .../ObservableFromCallableTest.java | 242 ++++++++++++++++++ 3 files changed, 325 insertions(+), 1 deletion(-) create mode 100644 src/test/java/io/reactivex/internal/operators/observable/ObservableFromCallableTest.java diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFromCallable.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromCallable.java index cb2671b852..aaa3756ec2 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFromCallable.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromCallable.java @@ -54,6 +54,6 @@ public void subscribeActual(Observer s) { @Override public T call() throws Exception { - return callable.call(); + return ObjectHelper.requireNonNull(callable.call(), "The callable returned a null value"); } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFromCallableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFromCallableTest.java index 9e93e15e4d..bb51c905aa 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFromCallableTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFromCallableTest.java @@ -17,6 +17,7 @@ package io.reactivex.internal.operators.flowable; import static org.mockito.Mockito.*; +import static org.junit.Assert.*; import java.util.concurrent.*; @@ -26,6 +27,7 @@ import org.reactivestreams.*; import io.reactivex.*; +import io.reactivex.functions.Function; import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.TestSubscriber; @@ -156,4 +158,84 @@ public Object call() throws Exception { verify(observer).onError(checkedException); verifyNoMoreInteractions(observer); } + + @Test + public void fusedFlatMapExecution() { + final int[] calls = { 0 }; + + Flowable.just(1).flatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Flowable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + return ++calls[0]; + } + }); + } + }) + .test() + .assertResult(1); + + assertEquals(1, calls[0]); + } + + @Test + public void fusedFlatMapExecutionHidden() { + final int[] calls = { 0 }; + + Flowable.just(1).hide().flatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Flowable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + return ++calls[0]; + } + }); + } + }) + .test() + .assertResult(1); + + assertEquals(1, calls[0]); + } + + @Test + public void fusedFlatMapNull() { + Flowable.just(1).flatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Flowable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + return null; + } + }); + } + }) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void fusedFlatMapNullHidden() { + Flowable.just(1).hide().flatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Flowable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + return null; + } + }); + } + }) + .test() + .assertFailure(NullPointerException.class); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFromCallableTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFromCallableTest.java new file mode 100644 index 0000000000..d0f8234da6 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFromCallableTest.java @@ -0,0 +1,242 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.reactivex.internal.operators.observable; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import java.util.concurrent.*; + +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.functions.Function; +import io.reactivex.observers.TestObserver; +import io.reactivex.schedulers.Schedulers; + +public class ObservableFromCallableTest { + + @SuppressWarnings("unchecked") + @Test + public void shouldNotInvokeFuncUntilSubscription() throws Exception { + Callable func = mock(Callable.class); + + when(func.call()).thenReturn(new Object()); + + Observable fromCallableObservable = Observable.fromCallable(func); + + verifyZeroInteractions(func); + + fromCallableObservable.subscribe(); + + verify(func).call(); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldCallOnNextAndOnCompleted() throws Exception { + Callable func = mock(Callable.class); + + when(func.call()).thenReturn("test_value"); + + Observable fromCallableObservable = Observable.fromCallable(func); + + Observer observer = TestHelper.mockObserver(); + + fromCallableObservable.subscribe(observer); + + verify(observer).onNext("test_value"); + verify(observer).onComplete(); + verify(observer, never()).onError(any(Throwable.class)); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldCallOnError() throws Exception { + Callable func = mock(Callable.class); + + Throwable throwable = new IllegalStateException("Test exception"); + when(func.call()).thenThrow(throwable); + + Observable fromCallableObservable = Observable.fromCallable(func); + + Observer observer = TestHelper.mockObserver(); + + fromCallableObservable.subscribe(observer); + + verify(observer, never()).onNext(any()); + verify(observer, never()).onComplete(); + verify(observer).onError(throwable); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldNotDeliverResultIfSubscriberUnsubscribedBeforeEmission() throws Exception { + Callable func = mock(Callable.class); + + final CountDownLatch funcLatch = new CountDownLatch(1); + final CountDownLatch observerLatch = new CountDownLatch(1); + + when(func.call()).thenAnswer(new Answer() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + observerLatch.countDown(); + + try { + funcLatch.await(); + } catch (InterruptedException e) { + // It's okay, unsubscription causes Thread interruption + + // Restoring interruption status of the Thread + Thread.currentThread().interrupt(); + } + + return "should_not_be_delivered"; + } + }); + + Observable fromCallableObservable = Observable.fromCallable(func); + + Observer observer = TestHelper.mockObserver(); + + TestObserver outer = new TestObserver(observer); + + fromCallableObservable + .subscribeOn(Schedulers.computation()) + .subscribe(outer); + + // Wait until func will be invoked + observerLatch.await(); + + // Unsubscribing before emission + outer.cancel(); + + // Emitting result + funcLatch.countDown(); + + // func must be invoked + verify(func).call(); + + // Observer must not be notified at all + verify(observer).onSubscribe(any(Disposable.class)); + verifyNoMoreInteractions(observer); + } + + @Test + public void shouldAllowToThrowCheckedException() { + final Exception checkedException = new Exception("test exception"); + + Observable fromCallableObservable = Observable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + throw checkedException; + } + }); + + Observer observer = TestHelper.mockObserver(); + + fromCallableObservable.subscribe(observer); + + verify(observer).onSubscribe(any(Disposable.class)); + verify(observer).onError(checkedException); + verifyNoMoreInteractions(observer); + } + + @Test + public void fusedFlatMapExecution() { + final int[] calls = { 0 }; + + Observable.just(1).flatMap(new Function>() { + @Override + public ObservableSource apply(Integer v) + throws Exception { + return Observable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + return ++calls[0]; + } + }); + } + }) + .test() + .assertResult(1); + + assertEquals(1, calls[0]); + } + + @Test + public void fusedFlatMapExecutionHidden() { + final int[] calls = { 0 }; + + Observable.just(1).hide().flatMap(new Function>() { + @Override + public ObservableSource apply(Integer v) + throws Exception { + return Observable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + return ++calls[0]; + } + }); + } + }) + .test() + .assertResult(1); + + assertEquals(1, calls[0]); + } + + @Test + public void fusedFlatMapNull() { + Observable.just(1).flatMap(new Function>() { + @Override + public ObservableSource apply(Integer v) + throws Exception { + return Observable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + return null; + } + }); + } + }) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void fusedFlatMapNullHidden() { + Observable.just(1).hide().flatMap(new Function>() { + @Override + public ObservableSource apply(Integer v) + throws Exception { + return Observable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + return null; + } + }); + } + }) + .test() + .assertFailure(NullPointerException.class); + } +}