diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleTimeout.java b/src/main/java/io/reactivex/internal/operators/single/SingleTimeout.java index 911d7910c6..4644f1ce10 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleTimeout.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleTimeout.java @@ -14,10 +14,12 @@ package io.reactivex.internal.operators.single; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import io.reactivex.*; -import io.reactivex.disposables.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.plugins.RxJavaPlugins; public final class SingleTimeout extends Single { @@ -43,97 +45,118 @@ public SingleTimeout(SingleSource source, long timeout, TimeUnit unit, Schedu @Override protected void subscribeActual(final SingleObserver s) { - final CompositeDisposable set = new CompositeDisposable(); - s.onSubscribe(set); + TimeoutMainObserver parent = new TimeoutMainObserver(s, other); + s.onSubscribe(parent); - final AtomicBoolean once = new AtomicBoolean(); + DisposableHelper.replace(parent.task, scheduler.scheduleDirect(parent, timeout, unit)); - Disposable timer = scheduler.scheduleDirect(new TimeoutDispose(once, set, s), timeout, unit); + source.subscribe(parent); + } - set.add(timer); + static final class TimeoutMainObserver extends AtomicReference + implements SingleObserver, Runnable, Disposable { - source.subscribe(new TimeoutObserver(once, set, s)); + private static final long serialVersionUID = 37497744973048446L; - } + final SingleObserver actual; - final class TimeoutDispose implements Runnable { - private final AtomicBoolean once; - final CompositeDisposable set; - final SingleObserver s; + final AtomicReference task; - TimeoutDispose(AtomicBoolean once, CompositeDisposable set, SingleObserver s) { - this.once = once; - this.set = set; - this.s = s; - } + final TimeoutFallbackObserver fallback; - @Override - public void run() { - if (once.compareAndSet(false, true)) { - if (other != null) { - set.clear(); - other.subscribe(new TimeoutObserver()); - } else { - set.dispose(); - s.onError(new TimeoutException()); - } - } - } + SingleSource other; - final class TimeoutObserver implements SingleObserver { + static final class TimeoutFallbackObserver extends AtomicReference + implements SingleObserver { - @Override - public void onError(Throwable e) { - set.dispose(); - s.onError(e); + private static final long serialVersionUID = 2071387740092105509L; + final SingleObserver actual; + + TimeoutFallbackObserver(SingleObserver actual) { + this.actual = actual; } @Override public void onSubscribe(Disposable d) { - set.add(d); + DisposableHelper.setOnce(this, d); } @Override - public void onSuccess(T value) { - set.dispose(); - s.onSuccess(value); + public void onSuccess(T t) { + actual.onSuccess(t); } + @Override + public void onError(Throwable e) { + actual.onError(e); + } } - } - final class TimeoutObserver implements SingleObserver { + TimeoutMainObserver(SingleObserver actual, SingleSource other) { + this.actual = actual; + this.other = other; + this.task = new AtomicReference(); + if (other != null) { + this.fallback = new TimeoutFallbackObserver(actual); + } else { + this.fallback = null; + } + } - private final AtomicBoolean once; - private final CompositeDisposable set; - private final SingleObserver s; + @Override + public void run() { + Disposable d = get(); + if (d != DisposableHelper.DISPOSED && compareAndSet(d, DisposableHelper.DISPOSED)) { + if (d != null) { + d.dispose(); + } + SingleSource other = this.other; + if (other == null) { + actual.onError(new TimeoutException()); + } else { + this.other = null; + other.subscribe(fallback); + } + } + } - TimeoutObserver(AtomicBoolean once, CompositeDisposable set, SingleObserver s) { - this.once = once; - this.set = set; - this.s = s; + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); } @Override - public void onError(Throwable e) { - if (once.compareAndSet(false, true)) { - set.dispose(); - s.onError(e); + public void onSuccess(T t) { + Disposable d = get(); + if (d != DisposableHelper.DISPOSED && compareAndSet(d, DisposableHelper.DISPOSED)) { + DisposableHelper.dispose(task); + actual.onSuccess(t); } } @Override - public void onSubscribe(Disposable d) { - set.add(d); + public void onError(Throwable e) { + Disposable d = get(); + if (d != DisposableHelper.DISPOSED && compareAndSet(d, DisposableHelper.DISPOSED)) { + DisposableHelper.dispose(task); + actual.onError(e); + } else { + RxJavaPlugins.onError(e); + } } @Override - public void onSuccess(T value) { - if (once.compareAndSet(false, true)) { - set.dispose(); - s.onSuccess(value); + public void dispose() { + DisposableHelper.dispose(this); + DisposableHelper.dispose(task); + if (fallback != null) { + DisposableHelper.dispose(fallback); } } + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleTimeoutTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleTimeoutTest.java index 0a6c590e6f..132b0076d6 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleTimeoutTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleTimeoutTest.java @@ -15,15 +15,18 @@ import static org.junit.Assert.*; +import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.Test; -import io.reactivex.Single; +import io.reactivex.*; import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Action; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.TestScheduler; -import io.reactivex.subjects.PublishSubject; +import io.reactivex.subjects.*; public class SingleTimeoutTest { @@ -69,4 +72,141 @@ public void mainError() { .awaitDone(5, TimeUnit.SECONDS) .assertFailure(TestException.class); } + + @Test + public void disposeWhenFallback() { + TestScheduler sch = new TestScheduler(); + + SingleSubject subj = SingleSubject.create(); + + subj.timeout(1, TimeUnit.SECONDS, sch, Single.just(1)) + .test(true) + .assertEmpty(); + + assertFalse(subj.hasObservers()); + } + + @Test + public void isDisposed() { + TestHelper.checkDisposed(SingleSubject.create().timeout(1, TimeUnit.DAYS)); + } + + @Test + public void fallbackDispose() { + TestScheduler sch = new TestScheduler(); + + SingleSubject subj = SingleSubject.create(); + + SingleSubject fallback = SingleSubject.create(); + + TestObserver to = subj.timeout(1, TimeUnit.SECONDS, sch, fallback) + .test(); + + assertFalse(fallback.hasObservers()); + + sch.advanceTimeBy(1, TimeUnit.SECONDS); + + assertFalse(subj.hasObservers()); + assertTrue(fallback.hasObservers()); + + to.cancel(); + + assertFalse(fallback.hasObservers()); + } + + @Test + public void normalSuccessDoesntDisposeMain() { + final int[] calls = { 0 }; + + Single.just(1) + .doOnDispose(new Action() { + @Override + public void run() throws Exception { + calls[0]++; + } + }) + .timeout(1, TimeUnit.DAYS) + .test() + .assertResult(1); + + assertEquals(0, calls[0]); + } + + @Test + public void successTimeoutRace() { + for (int i = 0; i < 1000; i++) { + final SingleSubject subj = SingleSubject.create(); + SingleSubject fallback = SingleSubject.create(); + + final TestScheduler sch = new TestScheduler(); + + TestObserver to = subj.timeout(1, TimeUnit.MILLISECONDS, sch, fallback).test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + subj.onSuccess(1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + sch.advanceTimeBy(1, TimeUnit.MILLISECONDS); + } + }; + + TestHelper.race(r1, r2); + + if (!fallback.hasObservers()) { + to.assertResult(1); + } else { + to.assertEmpty(); + } + } + } + + @Test + public void errorTimeoutRace() { + final TestException ex = new TestException(); + List errors = TestHelper.trackPluginErrors(); + try { + + for (int i = 0; i < 1000; i++) { + final SingleSubject subj = SingleSubject.create(); + SingleSubject fallback = SingleSubject.create(); + + final TestScheduler sch = new TestScheduler(); + + TestObserver to = subj.timeout(1, TimeUnit.MILLISECONDS, sch, fallback).test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + subj.onError(ex); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + sch.advanceTimeBy(1, TimeUnit.MILLISECONDS); + } + }; + + TestHelper.race(r1, r2); + + if (!fallback.hasObservers()) { + to.assertFailure(TestException.class); + } else { + to.assertEmpty(); + } + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } + } finally { + RxJavaPlugins.reset(); + } + } }