-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add onDropped callback for throttleFirst - addresses #7481 #7482
Conversation
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTimed.java
Show resolved
Hide resolved
onDropped.accept(t); | ||
} catch (Throwable ex) { | ||
Exceptions.throwIfFatal(ex); | ||
upstream.dispose(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will hang the sequence because the error is swallowed. Also there is no cleanup of the worker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we have to do downstream.onError(t) to propagate this, right? And .dispose() of the worker too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be addressed now, thx for the review
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTest.java
Show resolved
Hide resolved
...est/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleFirstTest.java
Show resolved
Hide resolved
Codecov Report
@@ Coverage Diff @@
## 3.x #7482 +/- ##
============================================
+ Coverage 99.52% 99.55% +0.02%
- Complexity 6788 6790 +2
============================================
Files 752 752
Lines 47586 47611 +25
Branches 6394 6396 +2
============================================
+ Hits 47359 47398 +39
+ Misses 107 104 -3
+ Partials 120 109 -11
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
[![Mend Renovate](https://app.renovatebot.com/images/banner.svg)](https://renovatebot.com) This PR contains the following updates: | Package | Change | Age | Adoption | Passing | Confidence | |---|---|---|---|---|---| | [io.reactivex.rxjava3:rxjava](https://github.com/ReactiveX/RxJava) | `3.1.5` -> `3.1.6` | [![age](https://badges.renovateapi.com/packages/maven/io.reactivex.rxjava3:rxjava/3.1.6/age-slim)](https://docs.renovatebot.com/merge-confidence/) | [![adoption](https://badges.renovateapi.com/packages/maven/io.reactivex.rxjava3:rxjava/3.1.6/adoption-slim)](https://docs.renovatebot.com/merge-confidence/) | [![passing](https://badges.renovateapi.com/packages/maven/io.reactivex.rxjava3:rxjava/3.1.6/compatibility-slim/3.1.5)](https://docs.renovatebot.com/merge-confidence/) | [![confidence](https://badges.renovateapi.com/packages/maven/io.reactivex.rxjava3:rxjava/3.1.6/confidence-slim/3.1.5)](https://docs.renovatebot.com/merge-confidence/) | --- ### ⚠ Dependency Lookup Warnings ⚠ Warnings were logged while processing this repo. Please check the Dependency Dashboard for more information. --- ### Release Notes <details> <summary>ReactiveX/RxJava</summary> ### [`v3.1.6`](https://github.com/ReactiveX/RxJava/releases/tag/v3.1.6) [Maven](http://search.maven.org/#artifactdetails%7Cio.reactivex.rxjava3%7Crxjava%7C3.1.6%7C) [JavaDocs](http://reactivex.io/RxJava/3.x/javadoc/3.1.6) ##### API changes - Add an overload to `throttleLatest` operator with an `onDropped` callback. (<a href='https://github.com/ReactiveX/RxJava/issues/7457'>[#​7457](https://github.com/ReactiveX/RxJava/issues/7457)</a>) - Add an overload to `throttleFirst` operator with an `onDropped` callback. (<a href='https://github.com/ReactiveX/RxJava/issues/7482'>[#​7482](https://github.com/ReactiveX/RxJava/issues/7482)</a>) - Add an overload to `throttleLast` operator with an `onDropped` callback. (<a href='https://github.com/ReactiveX/RxJava/issues/7488'>[#​7488](https://github.com/ReactiveX/RxJava/issues/7488)</a>) - Add an overload to `throttleWithTimeout` operator with an `onDropped` callback. (<a href='https://github.com/ReactiveX/RxJava/issues/7510'>[#​7510](https://github.com/ReactiveX/RxJava/issues/7510)</a>) ##### Bugfixes - Fix a race condition in `Single.timeout` that prevented the timeout signal from happening. (<a href='https://github.com/ReactiveX/RxJava/issues/7515'>[#​7515](https://github.com/ReactiveX/RxJava/issues/7515)</a>) ##### Documentation - Fix formatting in `TestObserver/Consumer/Subscriber` javadoc html. (<a href='https://github.com/ReactiveX/RxJava/issues/7442'>[#​7442](https://github.com/ReactiveX/RxJava/issues/7442)</a>) - Fix typo in `BehaviorSubject.java`. (<a href='https://github.com/ReactiveX/RxJava/issues/7452'>[#​7452](https://github.com/ReactiveX/RxJava/issues/7452)</a>) - Fix grammar about cancellation in `Schedulers` javadoc. (<a href='https://github.com/ReactiveX/RxJava/issues/7453'>[#​7453](https://github.com/ReactiveX/RxJava/issues/7453)</a>) - Change `@coded` tag to `@code` tag. (<a href='https://github.com/ReactiveX/RxJava/issues/7463'>[#​7463](https://github.com/ReactiveX/RxJava/issues/7463)</a>) - Fix `fromCompletionStage` javadoc. (<a href='https://github.com/ReactiveX/RxJava/issues/7508'>[#​7508](https://github.com/ReactiveX/RxJava/issues/7508)</a>) ##### Other - Add missing `@NonNull` annotation to `Maybe` type argument. (<a href='https://github.com/ReactiveX/RxJava/issues/7436'>[#​7436](https://github.com/ReactiveX/RxJava/issues/7436)</a>) - Remove redundant interface declarations. (<a href='https://github.com/ReactiveX/RxJava/issues/7438'>[#​7438](https://github.com/ReactiveX/RxJava/issues/7438)</a>) - Standardize `MissingBackpressureException` message, introduce `QueueOverflowException`. (<a href='https://github.com/ReactiveX/RxJava/issues/7459'>[#​7459](https://github.com/ReactiveX/RxJava/issues/7459)</a>) - Update `Flowable.throttleLatest` `MissingBackpressureException` message. (<a href='https://github.com/ReactiveX/RxJava/issues/7460'>[#​7460](https://github.com/ReactiveX/RxJava/issues/7460)</a>) - Fix cancellation order in `throttleFirst`. (<a href='https://github.com/ReactiveX/RxJava/issues/7484'>[#​7484](https://github.com/ReactiveX/RxJava/issues/7484)</a>) </details> --- ### Configuration 📅 **Schedule**: Branch creation - At any time (no schedule defined), Automerge - At any time (no schedule defined). 🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied. ♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox. 🔕 **Ignore**: Close this PR and you won't be reminded about this update again. --- - [ ] <!-- rebase-check -->If you want to rebase/retry this PR, check this box --- This PR has been generated by [Mend Renovate](https://www.mend.io/free-developer-tools/renovate/). View repository job log [here](https://app.renovatebot.com/dashboard#github/Goooler/DemoApp). <!--renovate-debug:eyJjcmVhdGVkSW5WZXIiOiIzNC4xMDUuMyIsInVwZGF0ZWRJblZlciI6IjM0LjEwNS4zIn0=--> Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
This PR contains the following updates: | Package | Type | Update | Change | |---|---|---|---| | [io.reactivex.rxjava3:rxjava](https://github.com/ReactiveX/RxJava) | dependencies | patch | `3.1.5` -> `3.1.6` | --- ### Release Notes <details> <summary>ReactiveX/RxJava</summary> ### [`v3.1.6`](https://github.com/ReactiveX/RxJava/releases/tag/v3.1.6) [Maven](http://search.maven.org/#artifactdetails%7Cio.reactivex.rxjava3%7Crxjava%7C3.1.6%7C) [JavaDocs](http://reactivex.io/RxJava/3.x/javadoc/3.1.6) ##### API changes - Add an overload to `throttleLatest` operator with an `onDropped` callback. (<a href='https://github.com/ReactiveX/RxJava/issues/7457'>[#​7457](https://github.com/ReactiveX/RxJava/issues/7457)</a>) - Add an overload to `throttleFirst` operator with an `onDropped` callback. (<a href='https://github.com/ReactiveX/RxJava/issues/7482'>[#​7482](https://github.com/ReactiveX/RxJava/issues/7482)</a>) - Add an overload to `throttleLast` operator with an `onDropped` callback. (<a href='https://github.com/ReactiveX/RxJava/issues/7488'>[#​7488](https://github.com/ReactiveX/RxJava/issues/7488)</a>) - Add an overload to `throttleWithTimeout` operator with an `onDropped` callback. (<a href='https://github.com/ReactiveX/RxJava/issues/7510'>[#​7510](https://github.com/ReactiveX/RxJava/issues/7510)</a>) ##### Bugfixes - Fix a race condition in `Single.timeout` that prevented the timeout signal from happening. (<a href='https://github.com/ReactiveX/RxJava/issues/7515'>[#​7515](https://github.com/ReactiveX/RxJava/issues/7515)</a>) ##### Documentation - Fix formatting in `TestObserver/Consumer/Subscriber` javadoc html. (<a href='https://github.com/ReactiveX/RxJava/issues/7442'>[#​7442](https://github.com/ReactiveX/RxJava/issues/7442)</a>) - Fix typo in `BehaviorSubject.java`. (<a href='https://github.com/ReactiveX/RxJava/issues/7452'>[#​7452](https://github.com/ReactiveX/RxJava/issues/7452)</a>) - Fix grammar about cancellation in `Schedulers` javadoc. (<a href='https://github.com/ReactiveX/RxJava/issues/7453'>[#​7453](https://github.com/ReactiveX/RxJava/issues/7453)</a>) - Change `@coded` tag to `@code` tag. (<a href='https://github.com/ReactiveX/RxJava/issues/7463'>[#​7463](https://github.com/ReactiveX/RxJava/issues/7463)</a>) - Fix `fromCompletionStage` javadoc. (<a href='https://github.com/ReactiveX/RxJava/issues/7508'>[#​7508](https://github.com/ReactiveX/RxJava/issues/7508)</a>) ##### Other - Add missing `@NonNull` annotation to `Maybe` type argument. (<a href='https://github.com/ReactiveX/RxJava/issues/7436'>[#​7436](https://github.com/ReactiveX/RxJava/issues/7436)</a>) - Remove redundant interface declarations. (<a href='https://github.com/ReactiveX/RxJava/issues/7438'>[#​7438](https://github.com/ReactiveX/RxJava/issues/7438)</a>) - Standardize `MissingBackpressureException` message, introduce `QueueOverflowException`. (<a href='https://github.com/ReactiveX/RxJava/issues/7459'>[#​7459](https://github.com/ReactiveX/RxJava/issues/7459)</a>) - Update `Flowable.throttleLatest` `MissingBackpressureException` message. (<a href='https://github.com/ReactiveX/RxJava/issues/7460'>[#​7460](https://github.com/ReactiveX/RxJava/issues/7460)</a>) - Fix cancellation order in `throttleFirst`. (<a href='https://github.com/ReactiveX/RxJava/issues/7484'>[#​7484](https://github.com/ReactiveX/RxJava/issues/7484)</a>) </details> --- ### Configuration 📅 **Schedule**: Branch creation - At any time (no schedule defined), Automerge - At any time (no schedule defined). 🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied. ♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox. 🔕 **Ignore**: Close this PR and you won't be reminded about this update again. --- - [ ] <!-- rebase-check -->If you want to rebase/retry this PR, click this checkbox. --- This PR has been generated by [Renovate Bot](https://github.com/renovatebot/renovate). <!--renovate-debug:eyJjcmVhdGVkSW5WZXIiOiIzMi4xOTAuNiIsInVwZGF0ZWRJblZlciI6IjMyLjE5MC42In0=-->
This PR contains the following updates: | Package | Type | Update | Change | |---|---|---|---| | [io.reactivex.rxjava3:rxjava](https://github.com/ReactiveX/RxJava) | dependencies | patch | `3.1.5` -> `3.1.6` | --- ### Release Notes <details> <summary>ReactiveX/RxJava</summary> ### [`v3.1.6`](https://github.com/ReactiveX/RxJava/releases/tag/v3.1.6) [Maven](http://search.maven.org/#artifactdetails%7Cio.reactivex.rxjava3%7Crxjava%7C3.1.6%7C) [JavaDocs](http://reactivex.io/RxJava/3.x/javadoc/3.1.6) ##### API changes - Add an overload to `throttleLatest` operator with an `onDropped` callback. (<a href='https://github.com/ReactiveX/RxJava/issues/7457'>[#​7457](https://github.com/ReactiveX/RxJava/issues/7457)</a>) - Add an overload to `throttleFirst` operator with an `onDropped` callback. (<a href='https://github.com/ReactiveX/RxJava/issues/7482'>[#​7482](https://github.com/ReactiveX/RxJava/issues/7482)</a>) - Add an overload to `throttleLast` operator with an `onDropped` callback. (<a href='https://github.com/ReactiveX/RxJava/issues/7488'>[#​7488](https://github.com/ReactiveX/RxJava/issues/7488)</a>) - Add an overload to `throttleWithTimeout` operator with an `onDropped` callback. (<a href='https://github.com/ReactiveX/RxJava/issues/7510'>[#​7510](https://github.com/ReactiveX/RxJava/issues/7510)</a>) ##### Bugfixes - Fix a race condition in `Single.timeout` that prevented the timeout signal from happening. (<a href='https://github.com/ReactiveX/RxJava/issues/7515'>[#​7515](https://github.com/ReactiveX/RxJava/issues/7515)</a>) ##### Documentation - Fix formatting in `TestObserver/Consumer/Subscriber` javadoc html. (<a href='https://github.com/ReactiveX/RxJava/issues/7442'>[#​7442](https://github.com/ReactiveX/RxJava/issues/7442)</a>) - Fix typo in `BehaviorSubject.java`. (<a href='https://github.com/ReactiveX/RxJava/issues/7452'>[#​7452](https://github.com/ReactiveX/RxJava/issues/7452)</a>) - Fix grammar about cancellation in `Schedulers` javadoc. (<a href='https://github.com/ReactiveX/RxJava/issues/7453'>[#​7453](https://github.com/ReactiveX/RxJava/issues/7453)</a>) - Change `@coded` tag to `@code` tag. (<a href='https://github.com/ReactiveX/RxJava/issues/7463'>[#​7463](https://github.com/ReactiveX/RxJava/issues/7463)</a>) - Fix `fromCompletionStage` javadoc. (<a href='https://github.com/ReactiveX/RxJava/issues/7508'>[#​7508](https://github.com/ReactiveX/RxJava/issues/7508)</a>) ##### Other - Add missing `@NonNull` annotation to `Maybe` type argument. (<a href='https://github.com/ReactiveX/RxJava/issues/7436'>[#​7436](https://github.com/ReactiveX/RxJava/issues/7436)</a>) - Remove redundant interface declarations. (<a href='https://github.com/ReactiveX/RxJava/issues/7438'>[#​7438](https://github.com/ReactiveX/RxJava/issues/7438)</a>) - Standardize `MissingBackpressureException` message, introduce `QueueOverflowException`. (<a href='https://github.com/ReactiveX/RxJava/issues/7459'>[#​7459](https://github.com/ReactiveX/RxJava/issues/7459)</a>) - Update `Flowable.throttleLatest` `MissingBackpressureException` message. (<a href='https://github.com/ReactiveX/RxJava/issues/7460'>[#​7460](https://github.com/ReactiveX/RxJava/issues/7460)</a>) - Fix cancellation order in `throttleFirst`. (<a href='https://github.com/ReactiveX/RxJava/issues/7484'>[#​7484](https://github.com/ReactiveX/RxJava/issues/7484)</a>) </details> --- ### Configuration 📅 **Schedule**: Branch creation - At any time (no schedule defined), Automerge - At any time (no schedule defined). 🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied. ♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox. 🔕 **Ignore**: Close this PR and you won't be reminded about this update again. --- - [ ] <!-- rebase-check -->If you want to rebase/retry this PR, click this checkbox. --- This PR has been generated by [Renovate Bot](https://github.com/renovatebot/renovate). <!--renovate-debug:eyJjcmVhdGVkSW5WZXIiOiIzMi4xOTAuNiIsInVwZGF0ZWRJblZlciI6IjMyLjE5MC42In0=-->
This PR adds an overload to the throttleFirst operator which takes an onDropped consumer.
This consumer is called when we're still within the gated time window when any new item isn't passed to downstream.
If the onDropped crashes, the exception is either delivered to the downstream (if the sequence is still live at that point) or to the global error handler (sequence is stopped).
Related: #7458