Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add onDropped callback for throttleFirst - addresses #7481 #7482

Merged
merged 1 commit into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -17096,7 +17096,48 @@ public final Flowable<T> throttleFirst(long windowDuration, @NonNull TimeUnit un
public final Flowable<T> throttleFirst(long skipDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableThrottleFirstTimed<>(this, skipDuration, unit, scheduler));
return RxJavaPlugins.onAssembly(new FlowableThrottleFirstTimed<>(this, skipDuration, unit, scheduler, null));
}

/**
* Returns a {@code Flowable} that emits only the first item emitted by the current {@code Flowable} during sequential
* time windows of a specified duration, where the windows are managed by a specified {@link Scheduler}.
* <p>
* This differs from {@link #throttleLast} in that this only tracks the passage of time whereas
* {@link #throttleLast} ticks at scheduled intervals.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleFirst.s.v3.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
* </dl>
*
* @param skipDuration
* time to wait before emitting another item after emitting the last item
* @param unit
* the unit of time of {@code skipDuration}
* @param scheduler
* the {@code Scheduler} to use internally to manage the timers that handle timeout for each
* event
* @param onDropped
* called when an item doesn't get delivered to the downstream
Desislav-Petrov marked this conversation as resolved.
Show resolved Hide resolved
*
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code unit} or {@code scheduler} or {@code onDropped} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Flowable<T> throttleFirst(long skipDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<? super T> onDropped) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
Objects.requireNonNull(onDropped, "onDropped is null");
return RxJavaPlugins.onAssembly(new FlowableThrottleFirstTimed<>(this, skipDuration, unit, scheduler, onDropped));
}

/**
Expand Down
39 changes: 38 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -14163,7 +14163,44 @@ public final Observable<T> throttleFirst(long windowDuration, @NonNull TimeUnit
public final Observable<T> throttleFirst(long skipDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableThrottleFirstTimed<>(this, skipDuration, unit, scheduler));
return RxJavaPlugins.onAssembly(new ObservableThrottleFirstTimed<>(this, skipDuration, unit, scheduler, null));
}

/**
* Returns an {@code Observable} that emits only the first item emitted by the current {@code Observable} during sequential
* time windows of a specified duration, where the windows are managed by a specified {@link Scheduler}.
* <p>
* This differs from {@link #throttleLast} in that this only tracks passage of time whereas
* {@code throttleLast} ticks at scheduled intervals.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleFirst.s.v3.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
* </dl>
*
* @param skipDuration
* time to wait before emitting another item after emitting the last item
* @param unit
* the unit of time of {@code skipDuration}
* @param scheduler
* the {@code Scheduler} to use internally to manage the timers that handle timeout for each
* event
* @param onDropped
* called when an item doesn't get delivered to the downstream
*
* @return the new {@code Observable} instance
* @throws NullPointerException if {@code unit} or {@code scheduler} or {@code onDropped} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Observable<T> throttleFirst(long skipDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<? super T> onDropped) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
Objects.requireNonNull(onDropped, "onDropped is null");
return RxJavaPlugins.onAssembly(new ObservableThrottleFirstTimed<>(this, skipDuration, unit, scheduler, onDropped));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Consumer;
import org.reactivestreams.*;

import io.reactivex.rxjava3.core.*;
Expand All @@ -32,44 +34,53 @@ public final class FlowableThrottleFirstTimed<T> extends AbstractFlowableWithUps
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
final Consumer<? super T> onDropped;

public FlowableThrottleFirstTimed(Flowable<T> source, long timeout, TimeUnit unit, Scheduler scheduler) {
public FlowableThrottleFirstTimed(Flowable<T> source,
long timeout,
TimeUnit unit,
Scheduler scheduler,
Consumer<? super T> onDropped) {
super(source);
this.timeout = timeout;
this.unit = unit;
this.scheduler = scheduler;
this.onDropped = onDropped;
}

@Override
protected void subscribeActual(Subscriber<? super T> s) {
source.subscribe(new DebounceTimedSubscriber<>(
new SerializedSubscriber<>(s),
timeout, unit, scheduler.createWorker()));
timeout, unit, scheduler.createWorker(),
onDropped));
}

static final class DebounceTimedSubscriber<T>
extends AtomicLong
implements FlowableSubscriber<T>, Subscription, Runnable {

private static final long serialVersionUID = -9102637559663639004L;

final Subscriber<? super T> downstream;
final long timeout;
final TimeUnit unit;
final Scheduler.Worker worker;

final Consumer<? super T> onDropped;
Subscription upstream;

final SequentialDisposable timer = new SequentialDisposable();

volatile boolean gate;

boolean done;

DebounceTimedSubscriber(Subscriber<? super T> actual, long timeout, TimeUnit unit, Worker worker) {
DebounceTimedSubscriber(Subscriber<? super T> actual,
long timeout,
TimeUnit unit,
Worker worker,
Consumer<? super T> onDropped) {
this.downstream = actual;
this.timeout = timeout;
this.unit = unit;
this.worker = worker;
this.onDropped = onDropped;
}

@Override
Expand Down Expand Up @@ -106,6 +117,16 @@ public void onNext(T t) {
}

timer.replace(worker.schedule(this, timeout, unit));
} else if (onDropped != null) {
try {
onDropped.accept(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
worker.dispose();
upstream.cancel();
done = true;
Desislav-Petrov marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,36 @@
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.observers.SerializedObserver;

public final class ObservableThrottleFirstTimed<T> extends AbstractObservableWithUpstream<T, T> {
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;

public ObservableThrottleFirstTimed(ObservableSource<T> source,
long timeout, TimeUnit unit, Scheduler scheduler) {
final Consumer<? super T> onDropped;

public ObservableThrottleFirstTimed(
ObservableSource<T> source,
long timeout,
TimeUnit unit,
Scheduler scheduler,
Consumer<? super T> onDropped) {
super(source);
this.timeout = timeout;
this.unit = unit;
this.scheduler = scheduler;
this.onDropped = onDropped;
}

@Override
public void subscribeActual(Observer<? super T> t) {
source.subscribe(new DebounceTimedObserver<>(
new SerializedObserver<>(t),
timeout, unit, scheduler.createWorker()));
timeout, unit, scheduler.createWorker(),
onDropped));
}

static final class DebounceTimedObserver<T>
Expand All @@ -51,16 +60,21 @@ static final class DebounceTimedObserver<T>
final long timeout;
final TimeUnit unit;
final Scheduler.Worker worker;

final Consumer<? super T> onDropped;
Disposable upstream;

volatile boolean gate;

DebounceTimedObserver(Observer<? super T> actual, long timeout, TimeUnit unit, Worker worker) {
DebounceTimedObserver(
Observer<? super T> actual,
long timeout,
TimeUnit unit,
Worker worker,
Consumer<? super T> onDropped) {
this.downstream = actual;
this.timeout = timeout;
this.unit = unit;
this.worker = worker;
this.onDropped = onDropped;
}

@Override
Expand All @@ -83,6 +97,15 @@ public void onNext(T t) {
d.dispose();
}
DisposableHelper.replace(this, worker.schedule(this, timeout, unit));
} else if (onDropped != null) {
try {
onDropped.accept(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
worker.dispose();
upstream.dispose();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will hang the sequence because the error is swallowed. Also there is no cleanup of the worker.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we have to do downstream.onError(t) to propagate this, right? And .dispose() of the worker too?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be addressed now, thx for the review

}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import io.reactivex.rxjava3.functions.Action;
import org.junit.*;
import org.mockito.InOrder;
import org.reactivestreams.*;
Expand All @@ -44,6 +45,77 @@ public void before() {
subscriber = TestHelper.mockSubscriber();
}

@Test
public void throttlingWithDropCallbackCrashes() throws Throwable {
Flowable<String> source = Flowable.unsafeCreate(new Publisher<String>() {
@Override
public void subscribe(Subscriber<? super String> subscriber) {
subscriber.onSubscribe(new BooleanSubscription());
publishNext(subscriber, 100, "one"); // publish as it's first
publishNext(subscriber, 300, "two"); // skip as it's last within the first 400
publishNext(subscriber, 900, "three"); // publish
publishNext(subscriber, 905, "four"); // skip
publishCompleted(subscriber, 1000); // Should be published as soon as the timeout expires.
}
});

Action whenDisposed = mock(Action.class);

Flowable<String> sampled = source
.doOnCancel(whenDisposed)
.throttleFirst(400, TimeUnit.MILLISECONDS, scheduler, e -> {
if ("two".equals(e)) {
throw new TestException("forced");
}
});
sampled.subscribe(subscriber);

InOrder inOrder = inOrder(subscriber);

scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
inOrder.verify(subscriber, times(1)).onNext("one");
inOrder.verify(subscriber, times(1)).onError(any(TestException.class));
inOrder.verify(subscriber, times(0)).onNext("two");
inOrder.verify(subscriber, times(0)).onNext("three");
inOrder.verify(subscriber, times(0)).onNext("four");
inOrder.verify(subscriber, times(0)).onComplete();
inOrder.verifyNoMoreInteractions();
verify(whenDisposed).run();
}

@Test
public void throttlingWithDropCallback() {
Flowable<String> source = Flowable.unsafeCreate(new Publisher<String>() {
@Override
public void subscribe(Subscriber<? super String> subscriber) {
subscriber.onSubscribe(new BooleanSubscription());
publishNext(subscriber, 100, "one"); // publish as it's first
publishNext(subscriber, 300, "two"); // skip as it's last within the first 400
publishNext(subscriber, 900, "three"); // publish
publishNext(subscriber, 905, "four"); // skip
publishCompleted(subscriber, 1000); // Should be published as soon as the timeout expires.
}
});

Observer<Object> dropCallbackObserver = TestHelper.mockObserver();
Flowable<String> sampled = source.throttleFirst(400, TimeUnit.MILLISECONDS, scheduler, dropCallbackObserver::onNext);
sampled.subscribe(subscriber);

InOrder inOrder = inOrder(subscriber);
InOrder dropCallbackOrder = inOrder(dropCallbackObserver);

scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
inOrder.verify(subscriber, times(1)).onNext("one");
inOrder.verify(subscriber, times(0)).onNext("two");
dropCallbackOrder.verify(dropCallbackObserver, times(1)).onNext("two");
inOrder.verify(subscriber, times(1)).onNext("three");
inOrder.verify(subscriber, times(0)).onNext("four");
dropCallbackOrder.verify(dropCallbackObserver, times(1)).onNext("four");
inOrder.verify(subscriber, times(1)).onComplete();
inOrder.verifyNoMoreInteractions();
dropCallbackOrder.verifyNoMoreInteractions();
}

Desislav-Petrov marked this conversation as resolved.
Show resolved Hide resolved
@Test
public void throttlingWithCompleted() {
Flowable<String> source = Flowable.unsafeCreate(new Publisher<String>() {
Expand Down
Loading