Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: fix Single.timeout unnecessary dispose calls #5586

Merged
merged 2 commits into from
Sep 10, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
package io.reactivex.internal.operators.single;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.*;
import io.reactivex.disposables.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;

public final class SingleTimeout<T> extends Single<T> {

Expand All @@ -43,97 +44,116 @@ public SingleTimeout(SingleSource<T> source, long timeout, TimeUnit unit, Schedu
@Override
protected void subscribeActual(final SingleObserver<? super T> s) {

final CompositeDisposable set = new CompositeDisposable();
s.onSubscribe(set);
TimeoutMainObserver<T> parent = new TimeoutMainObserver<T>(s, other);
s.onSubscribe(parent);

final AtomicBoolean once = new AtomicBoolean();
DisposableHelper.replace(parent.task, scheduler.scheduleDirect(parent, timeout, unit));

Disposable timer = scheduler.scheduleDirect(new TimeoutDispose(once, set, s), timeout, unit);
source.subscribe(parent);
}

set.add(timer);
static final class TimeoutMainObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Runnable, Disposable {

source.subscribe(new TimeoutObserver(once, set, s));
private static final long serialVersionUID = 37497744973048446L;

}
final SingleObserver<? super T> actual;

final class TimeoutDispose implements Runnable {
private final AtomicBoolean once;
final CompositeDisposable set;
final SingleObserver<? super T> s;
final AtomicReference<Disposable> task;

TimeoutDispose(AtomicBoolean once, CompositeDisposable set, SingleObserver<? super T> s) {
this.once = once;
this.set = set;
this.s = s;
}
final TimeoutFallbackObserver<T> fallback;

@Override
public void run() {
if (once.compareAndSet(false, true)) {
if (other != null) {
set.clear();
other.subscribe(new TimeoutObserver());
} else {
set.dispose();
s.onError(new TimeoutException());
}
}
}
SingleSource<? extends T> other;

final class TimeoutObserver implements SingleObserver<T> {
static final class TimeoutFallbackObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T> {

@Override
public void onError(Throwable e) {
set.dispose();
s.onError(e);
private static final long serialVersionUID = 2071387740092105509L;
final SingleObserver<? super T> actual;

TimeoutFallbackObserver(SingleObserver<? super T> actual) {
this.actual = actual;
}

@Override
public void onSubscribe(Disposable d) {
set.add(d);
DisposableHelper.setOnce(this, d);
}

@Override
public void onSuccess(T value) {
set.dispose();
s.onSuccess(value);
public void onSuccess(T t) {
actual.onSuccess(t);
}

@Override
public void onError(Throwable e) {
actual.onError(e);
}
}
}

final class TimeoutObserver implements SingleObserver<T> {
TimeoutMainObserver(SingleObserver<? super T> actual, SingleSource<? extends T> other) {
this.actual = actual;
this.other = other;
this.task = new AtomicReference<Disposable>();
if (other != null) {
this.fallback = new TimeoutFallbackObserver<T>(actual);
} else {
this.fallback = null;
}
}

private final AtomicBoolean once;
private final CompositeDisposable set;
private final SingleObserver<? super T> s;
@Override
public void run() {
Disposable d = get();
if (d != DisposableHelper.DISPOSED && compareAndSet(d, DisposableHelper.DISPOSED)) {
if (d != null) {
d.dispose();
}
SingleSource<? extends T> other = this.other;
if (other == null) {
actual.onError(new TimeoutException());
} else {
this.other = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, are you trying to avoid memory leak here? Reference to other is still held as a final field in SingleTimeout

I mean, final reference to other in TimeoutMainObserver will make code slightly more readable

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The outer class has to reference other because of multiple subscribers. The inner observer has no use to the other once it has switched to it. There was an issue that some time ago that other was some cached resource that didn't go away when the user wanted it. This will somewhat help in that situation.

other.subscribe(fallback);
}
}
}

TimeoutObserver(AtomicBoolean once, CompositeDisposable set, SingleObserver<? super T> s) {
this.once = once;
this.set = set;
this.s = s;
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this, d);
}

@Override
public void onError(Throwable e) {
if (once.compareAndSet(false, true)) {
set.dispose();
s.onError(e);
public void onSuccess(T t) {
Disposable d = get();
if (d != DisposableHelper.DISPOSED && compareAndSet(d, DisposableHelper.DISPOSED)) {
DisposableHelper.dispose(task);
actual.onSuccess(t);
}
}

@Override
public void onSubscribe(Disposable d) {
set.add(d);
public void onError(Throwable e) {
Disposable d = get();
if (d != DisposableHelper.DISPOSED && compareAndSet(d, DisposableHelper.DISPOSED)) {
DisposableHelper.dispose(task);
actual.onError(e);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we deliver error to RxJavaPlugins.onError() in else block?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed. Fixed.

}

@Override
public void onSuccess(T value) {
if (once.compareAndSet(false, true)) {
set.dispose();
s.onSuccess(value);
public void dispose() {
DisposableHelper.dispose(this);
DisposableHelper.dispose(task);
if (fallback != null) {
DisposableHelper.dispose(fallback);
}
}

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

import org.junit.Test;

import io.reactivex.Single;
import io.reactivex.*;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.Action;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.*;

public class SingleTimeoutTest {

Expand Down Expand Up @@ -69,4 +70,133 @@ public void mainError() {
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TestException.class);
}

@Test
public void disposeWhenFallback() {
TestScheduler sch = new TestScheduler();

SingleSubject<Integer> subj = SingleSubject.create();

subj.timeout(1, TimeUnit.SECONDS, sch, Single.just(1))
.test(true)
.assertEmpty();

assertFalse(subj.hasObservers());
}

@Test
public void isDisposed() {
TestHelper.checkDisposed(SingleSubject.create().timeout(1, TimeUnit.DAYS));
}

@Test
public void fallbackDispose() {
TestScheduler sch = new TestScheduler();

SingleSubject<Integer> subj = SingleSubject.create();

SingleSubject<Integer> fallback = SingleSubject.create();

TestObserver<Integer> to = subj.timeout(1, TimeUnit.SECONDS, sch, fallback)
.test();

assertFalse(fallback.hasObservers());

sch.advanceTimeBy(1, TimeUnit.SECONDS);

assertFalse(subj.hasObservers());
assertTrue(fallback.hasObservers());

to.cancel();

assertFalse(fallback.hasObservers());
}

@Test
public void normalSuccessDoesntDisposeMain() {
final int[] calls = { 0 };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I know you often use arrays for that, but AtomicInteger kinda much more readable when you need a reference to single "mutable" int :)


Single.just(1)
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
calls[0]++;
}
})
.timeout(1, TimeUnit.DAYS)
.test()
.assertResult(1);

assertEquals(0, calls[0]);
}

@Test
public void successTimeoutRace() {
for (int i = 0; i < 1000; i++) {
final SingleSubject<Integer> subj = SingleSubject.create();
SingleSubject<Integer> fallback = SingleSubject.create();

final TestScheduler sch = new TestScheduler();

TestObserver<Integer> to = subj.timeout(1, TimeUnit.MILLISECONDS, sch, fallback).test();

Runnable r1 = new Runnable() {
@Override
public void run() {
subj.onSuccess(1);
}
};

Runnable r2 = new Runnable() {
@Override
public void run() {
sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);
}
};

TestHelper.race(r1, r2);

if (!fallback.hasObservers()) {
to.assertResult(1);
} else {
to.assertEmpty();
}
}
}

@Test
public void errorTimeoutRace() {
final TestException ex = new TestException();

for (int i = 0; i < 1000; i++) {
final SingleSubject<Integer> subj = SingleSubject.create();
SingleSubject<Integer> fallback = SingleSubject.create();

final TestScheduler sch = new TestScheduler();

TestObserver<Integer> to = subj.timeout(1, TimeUnit.MILLISECONDS, sch, fallback).test();

Runnable r1 = new Runnable() {
@Override
public void run() {
subj.onError(ex);
}
};

Runnable r2 = new Runnable() {
@Override
public void run() {
sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);
}
};

TestHelper.race(r1, r2);

if (!fallback.hasObservers()) {
to.assertFailure(TestException.class);
} else {
to.assertEmpty();
}
}
}
}