Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

retry_when operator #616

Merged
merged 14 commits into from
Aug 23, 2024
Merged

retry_when operator #616

merged 14 commits into from
Aug 23, 2024

Conversation

CorentinBT
Copy link
Contributor

@CorentinBT CorentinBT commented Aug 18, 2024

Summary by CodeRabbit

  • New Features

    • Introduced a retry_when operator for improved error handling and retry logic in reactive programming.
    • Added functionality to resubscribe to observables upon error notifications, enhancing user control over data flow.
  • Tests

    • Added a comprehensive suite of unit tests for the retry_when operator to validate its functionality and ensure reliability in various scenarios.
  • Documentation

    • Created an example demonstrating the use of the retry_when operator for conditional retries based on error types, serving as a practical guide for developers.

Copy link
Contributor

coderabbitai bot commented Aug 18, 2024

Walkthrough

Walkthrough

This update introduces the retry_when operator to the ReactivePlusPlus library, enhancing error handling in reactive programming. This feature allows resubscription to observables upon error notifications, improving control flow in reactive streams. The addition includes new header files, an implementation for managing retry logic, and comprehensive unit tests to ensure reliability and performance.

Changes

Files Change Summary
src/rpp/rpp/operators.hpp, src/rpp/rpp/operators/fwd.hpp Added an include for retry_when.hpp and introduced the retry_when template function with observable constraints, enhancing error handling capabilities.
src/rpp/rpp/operators/retry_when.hpp Implemented the retry_when operator, encapsulating retry logic, observable management, and user-friendly interfaces for applying retries in error scenarios.
src/tests/rpp/test_retry_when.cpp Added unit tests for retry_when, covering various scenarios including error handling, correct subscriptions, and resource management, enhancing overall test coverage.
src/examples/rpp/doxygen/retry_when.cpp Provided an example demonstrating the use of retry_when, showcasing basic and advanced error handling and retry strategies in reactive applications.

Sequence Diagram(s)

sequenceDiagram
    participant Observable
    participant Notifier
    participant RetryLogic

    Observable->>RetryLogic: Emit value
    RetryLogic-->>Observable: Forward value
    Observable->>RetryLogic: Emit error
    RetryLogic->>Notifier: Notify error
    Notifier-->>RetryLogic: Return new observable
    RetryLogic->>Observable: Resubscribe to new observable
Loading

Poem

🐇 In the meadow where streams flow,
New retries dance, with a vibrant glow.
Errors managed, with grace so fine,
Reactive hops in perfect line!
A joyful leap through each mistake,
For every error, a fresh path we make! 🌼


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Share
Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (invoked as PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Additionally, you can add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between b8b0ed5 and 4f5f845.

Files selected for processing (4)
  • src/rpp/rpp/operators.hpp (1 hunks)
  • src/rpp/rpp/operators/fwd.hpp (1 hunks)
  • src/rpp/rpp/operators/retry_when.hpp (1 hunks)
  • src/tests/rpp/test_retry_when.cpp (1 hunks)
Additional comments not posted (14)
src/rpp/rpp/operators.hpp (1)

132-132: Include directive for retry_when.hpp is appropriate.

The addition of the #include <rpp/operators/retry_when.hpp> directive is necessary for integrating the retry_when operator into the module.

src/rpp/rpp/operators/retry_when.hpp (4)

21-53: Implementation of retry_when_impl_inner_strategy looks correct.

The retry_when_impl_inner_strategy is well-implemented with appropriate handling of on_next, on_error, and on_completed methods. The use of RPP_NO_UNIQUE_ADDRESS is suitable for optimizing memory layout.


55-97: Implementation of retry_when_impl_strategy is robust.

The retry_when_impl_strategy correctly handles the error notification and resubscription logic. The use of std::optional for the notifier observable ensures safe handling of potential exceptions.


117-135: Implementation of retry_when_t is appropriate.

The retry_when_t structure provides a clean interface for applying the retry_when operator to observables. The use of perfect forwarding ensures efficient handling of the notifier.


138-157: retry_when operator function is well-defined.

The retry_when function is correctly constrained to ensure the notifier returns an observable. The documentation provides clear guidance on its usage.

src/tests/rpp/test_retry_when.cpp (8)

25-47: Test for retry_when with no error emission is comprehensive.

The test correctly verifies that the observable behaves as expected when no errors occur. The checks for subscription count and observer notifications are appropriate.


51-77: Test for retry_when with a single error is thorough.

The test accurately simulates an error scenario and verifies that the observable resubscribes correctly. The assertions ensure the operator's behavior aligns with expectations.


80-91: Test for retry_when with multiple emissions is valid.

The test checks that the observable resubscribes only once despite multiple emissions from the notifier, confirming the operator's correct behavior.


94-108: Test for retry_when with a throwing notifier is effective.

The test ensures that if the notifier throws an exception, the observable does not resubscribe and the error is propagated correctly.


111-124: Test for retry_when with an error notifier is correct.

The test verifies that when the notifier returns an error, the observable does not resubscribe and the error is handled appropriately.


127-140: Test for retry_when with an empty notifier is accurate.

The test confirms that if the notifier returns an empty observable, the original observable completes without resubscribing.


145-157: Test for retry_when copy behavior is precise.

The test ensures that the retry_when operator does not produce unnecessary copies, maintaining efficient memory usage.


159-171: Test for retry_when disposable contracts is valid.

The test verifies that the retry_when operator adheres to disposable contracts, ensuring proper resource management.

src/rpp/rpp/operators/fwd.hpp (1)

185-187: Addition of retry_when to forward declarations is appropriate.

The retry_when function is correctly constrained and its inclusion in the forward declarations aligns with the overall design of the operators module.

Copy link
Contributor

github-actions bot commented Aug 18, 2024

BENCHMARK RESULTS (AUTOGENERATED)

ci-ubuntu-gcc

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 322.29 ns 2.20 ns 2.22 ns 0.99
Subscribe empty callbacks to empty observable via pipe operator 305.59 ns 2.16 ns 2.16 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 731.99 ns 0.31 ns 0.31 ns 1.01
from array of 1 - create + subscribe + current_thread 1083.43 ns 3.73 ns 4.02 ns 0.93
concat_as_source of just(1 immediate) create + subscribe 2306.57 ns 115.03 ns 114.56 ns 1.00
defer from array of 1 - defer + create + subscribe + immediate 865.46 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 2152.31 ns 59.23 ns 59.23 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3220.85 ns 32.42 ns 32.42 ns 1.00
from array of 1 - create + as_blocking + subscribe + new_thread 29785.00 ns 27700.68 ns 28741.19 ns 0.96
from array of 1000 - create + as_blocking + subscribe + new_thread 40466.28 ns 51358.50 ns 49464.57 ns 1.04
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 3557.33 ns 124.03 ns 132.49 ns 0.94

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1118.31 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 895.12 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1051.89 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 947.29 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+first()+subscribe 1251.39 ns 0.62 ns 0.62 ns 1.00
immediate_just(1,2)+last()+subscribe 992.22 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1156.44 ns 17.60 ns 17.60 ns 1.00
immediate_just(1,2,3)+element_at(1)+subscribe 893.23 ns 0.31 ns 0.31 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 268.43 ns 2.16 ns 2.16 ns 1.00
current_thread scheduler create worker + schedule 372.04 ns 5.86 ns 5.87 ns 1.00
current_thread scheduler create worker + schedule + recursive schedule 940.16 ns 56.88 ns 57.20 ns 0.99

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 891.83 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 923.94 ns 0.31 ns 0.31 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 2373.80 ns 163.21 ns 162.26 ns 1.01
immediate_just+buffer(2)+subscribe 1627.74 ns 13.91 ns 13.90 ns 1.00
immediate_just+window(2)+subscribe + subscsribe inner 2480.53 ns 1035.92 ns 1095.39 ns 0.95

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 878.95 ns - - 0.00
immediate_just+take_while(true)+subscribe 886.33 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 1965.68 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3422.75 ns 180.94 ns 176.98 ns 1.02
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3654.95 ns 171.77 ns 165.39 ns 1.04
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 150.80 ns 143.08 ns 1.05
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3563.77 ns 917.32 ns 973.55 ns 0.94
immediate_just(1) + zip(immediate_just(2)) + subscribe 2159.35 ns 213.81 ns 209.29 ns 1.02

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 34.61 ns 14.71 ns 14.99 ns 0.98
subscribe 100 observers to publish_subject 209795.40 ns 15538.99 ns 15549.80 ns 1.00
100 on_next to 100 observers to publish_subject 32696.68 ns 20171.48 ns 20082.22 ns 1.00

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1457.45 ns 12.66 ns 12.65 ns 1.00
basic sample with immediate scheduler 1419.92 ns 5.55 ns 5.55 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 936.30 ns 0.31 ns 0.31 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 2074.39 ns 976.24 ns 985.15 ns 0.99
create(on_error())+retry(1)+subscribe 673.09 ns 119.19 ns 119.02 ns 1.00

ci-macos

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 982.25 ns 3.92 ns 4.78 ns 0.82
Subscribe empty callbacks to empty observable via pipe operator 989.79 ns 4.02 ns 4.78 ns 0.84

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1941.74 ns 0.23 ns 0.29 ns 0.82
from array of 1 - create + subscribe + current_thread 2421.15 ns 33.57 ns 43.41 ns 0.77
concat_as_source of just(1 immediate) create + subscribe 5371.34 ns 334.48 ns 409.43 ns 0.82
defer from array of 1 - defer + create + subscribe + immediate 1943.18 ns 0.23 ns 0.29 ns 0.81
interval - interval + take(3) + subscribe + immediate 4765.79 ns 113.41 ns 141.04 ns 0.80
interval - interval + take(3) + subscribe + current_thread 5888.96 ns 94.14 ns 118.04 ns 0.80
from array of 1 - create + as_blocking + subscribe + new_thread 81811.00 ns 87220.75 ns 102220.82 ns 0.85
from array of 1000 - create + as_blocking + subscribe + new_thread 85052.07 ns 84338.77 ns 107671.22 ns 0.78
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 8124.80 ns 375.84 ns 463.93 ns 0.81

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 2847.52 ns 0.23 ns 0.29 ns 0.81
immediate_just+filter(true)+subscribe 2101.50 ns 0.23 ns 0.29 ns 0.81
immediate_just(1,2)+skip(1)+subscribe 2744.08 ns 0.23 ns 0.29 ns 0.81
immediate_just(1,1,2)+distinct_until_changed()+subscribe 2070.22 ns 0.47 ns 0.58 ns 0.81
immediate_just(1,2)+first()+subscribe 3170.61 ns 0.23 ns 0.29 ns 0.81
immediate_just(1,2)+last()+subscribe 2396.18 ns 0.23 ns 0.29 ns 0.81
immediate_just+take_last(1)+subscribe 3003.91 ns 0.23 ns 0.32 ns 0.73
immediate_just(1,2,3)+element_at(1)+subscribe 2165.75 ns 0.23 ns 0.29 ns 0.81

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 823.57 ns 4.06 ns 5.10 ns 0.79
current_thread scheduler create worker + schedule 1201.45 ns 35.81 ns 45.03 ns 0.80
current_thread scheduler create worker + schedule + recursive schedule 1968.95 ns 197.66 ns 248.11 ns 0.80

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 2237.96 ns 4.40 ns 5.44 ns 0.81
immediate_just+scan(10, std::plus)+subscribe 2452.41 ns 0.50 ns 0.57 ns 0.88
immediate_just+flat_map(immediate_just(v*2))+subscribe 5693.73 ns 439.86 ns 500.12 ns 0.88
immediate_just+buffer(2)+subscribe 2533.85 ns 64.55 ns 118.37 ns 0.55
immediate_just+window(2)+subscribe + subscsribe inner 5338.59 ns 2417.08 ns 2961.78 ns 0.82

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 2166.18 ns - - 0.00
immediate_just+take_while(true)+subscribe 2257.60 ns 0.25 ns 0.29 ns 0.88

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 4912.77 ns 4.90 ns 6.02 ns 0.81

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 7386.34 ns 442.21 ns 541.66 ns 0.82
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 8540.69 ns 441.29 ns 540.14 ns 0.82
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 480.98 ns 565.46 ns 0.85
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 8182.92 ns 1993.27 ns 2345.32 ns 0.85
immediate_just(1) + zip(immediate_just(2)) + subscribe 5284.04 ns 814.97 ns 990.04 ns 0.82

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 75.72 ns 50.13 ns 60.20 ns 0.83
subscribe 100 observers to publish_subject 345017.67 ns 40678.52 ns 50191.35 ns 0.81
100 on_next to 100 observers to publish_subject 50205.50 ns 16876.83 ns 20655.67 ns 0.82

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 2755.78 ns 67.84 ns 83.24 ns 0.81
basic sample with immediate scheduler 2757.23 ns 18.76 ns 22.98 ns 0.82

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 2378.10 ns 0.23 ns 0.29 ns 0.81

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 6436.16 ns 4058.66 ns 4982.32 ns 0.81
create(on_error())+retry(1)+subscribe 1802.36 ns 295.91 ns 364.22 ns 0.81

ci-ubuntu-clang

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 294.66 ns 1.56 ns 1.54 ns 1.01
Subscribe empty callbacks to empty observable via pipe operator 296.40 ns 1.56 ns 1.54 ns 1.01

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 611.22 ns 0.31 ns 0.31 ns 1.00
from array of 1 - create + subscribe + current_thread 839.68 ns 4.32 ns 4.32 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 2437.99 ns 134.59 ns 135.22 ns 1.00
defer from array of 1 - defer + create + subscribe + immediate 829.87 ns 0.31 ns 0.31 ns 1.01
interval - interval + take(3) + subscribe + immediate 2335.02 ns 58.27 ns 58.27 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3318.60 ns 30.88 ns 30.86 ns 1.00
from array of 1 - create + as_blocking + subscribe + new_thread 29871.14 ns 28405.29 ns 30085.74 ns 0.94
from array of 1000 - create + as_blocking + subscribe + new_thread 40799.48 ns 37398.96 ns 36005.55 ns 1.04
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 3910.18 ns 157.35 ns 155.77 ns 1.01

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1319.59 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 934.26 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1175.20 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 967.74 ns 0.62 ns 0.62 ns 1.00
immediate_just(1,2)+first()+subscribe 1532.29 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+last()+subscribe 1133.33 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1296.97 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2,3)+element_at(1)+subscribe 942.95 ns 0.31 ns 0.31 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 318.78 ns 1.56 ns 1.54 ns 1.01
current_thread scheduler create worker + schedule 455.42 ns 4.63 ns 4.79 ns 0.97
current_thread scheduler create worker + schedule + recursive schedule 950.95 ns 58.15 ns 54.78 ns 1.06

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 946.47 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 1059.76 ns 0.31 ns 0.31 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 2376.29 ns 139.31 ns 137.13 ns 1.02
immediate_just+buffer(2)+subscribe 1693.31 ns 14.04 ns 13.58 ns 1.03
immediate_just+window(2)+subscribe + subscsribe inner 2559.97 ns 1025.54 ns 921.17 ns 1.11

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 927.38 ns - - 0.00
immediate_just+take_while(true)+subscribe 922.14 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 2104.11 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3445.51 ns 158.62 ns 158.41 ns 1.00
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3958.63 ns 146.44 ns 146.37 ns 1.00
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 145.13 ns 144.05 ns 1.01
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3517.62 ns 848.45 ns 850.46 ns 1.00
immediate_just(1) + zip(immediate_just(2)) + subscribe 2475.94 ns 204.87 ns 205.94 ns 0.99

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 54.03 ns 17.56 ns 17.65 ns 0.99
subscribe 100 observers to publish_subject 225228.60 ns 16156.95 ns 15997.28 ns 1.01
100 on_next to 100 observers to publish_subject 35834.72 ns 17552.43 ns 20432.88 ns 0.86

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1389.87 ns 11.72 ns 11.73 ns 1.00
basic sample with immediate scheduler 1493.59 ns 6.17 ns 6.17 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 1092.96 ns 0.31 ns 0.31 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 2193.09 ns 1210.08 ns 1207.77 ns 1.00
create(on_error())+retry(1)+subscribe 727.10 ns 145.54 ns 146.31 ns 0.99

ci-windows

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 557.44 ns 4.32 ns 4.32 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 583.32 ns 4.32 ns 4.32 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1142.84 ns 4.63 ns 4.62 ns 1.00
from array of 1 - create + subscribe + current_thread 1415.21 ns 15.43 ns 15.44 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 3710.87 ns 174.79 ns 173.72 ns 1.01
defer from array of 1 - defer + create + subscribe + immediate 1196.62 ns 4.93 ns 4.94 ns 1.00
interval - interval + take(3) + subscribe + immediate 3047.83 ns 133.54 ns 133.45 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3702.17 ns 54.33 ns 58.94 ns 0.92
from array of 1 - create + as_blocking + subscribe + new_thread 121422.22 ns 112844.44 ns 113800.00 ns 0.99
from array of 1000 - create + as_blocking + subscribe + new_thread 128622.22 ns 129325.00 ns 131325.00 ns 0.98
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 5367.54 ns 206.87 ns 202.37 ns 1.02

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1832.77 ns 12.87 ns 12.88 ns 1.00
immediate_just+filter(true)+subscribe 1324.51 ns 11.70 ns 11.68 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1969.36 ns 13.07 ns 13.05 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 1365.76 ns 15.80 ns 15.81 ns 1.00
immediate_just(1,2)+first()+subscribe 2366.53 ns 12.63 ns 12.63 ns 1.00
immediate_just(1,2)+last()+subscribe 1803.20 ns 14.13 ns 14.13 ns 1.00
immediate_just+take_last(1)+subscribe 2035.16 ns 59.39 ns 59.02 ns 1.01
immediate_just(1,2,3)+element_at(1)+subscribe 1375.64 ns 13.79 ns 13.78 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 677.06 ns 6.17 ns 6.18 ns 1.00
current_thread scheduler create worker + schedule 654.66 ns 14.15 ns 14.20 ns 1.00
current_thread scheduler create worker + schedule + recursive schedule 1092.78 ns 101.86 ns 102.15 ns 1.00

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 1319.05 ns 11.11 ns 10.66 ns 1.04
immediate_just+scan(10, std::plus)+subscribe 1415.93 ns 21.29 ns 21.29 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 3547.64 ns 202.07 ns 200.40 ns 1.01
immediate_just+buffer(2)+subscribe 2586.55 ns 58.15 ns 57.92 ns 1.00
immediate_just+window(2)+subscribe + subscsribe inner 4231.95 ns 1302.54 ns 1306.14 ns 1.00

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 1309.46 ns 11.44 ns 11.60 ns 0.99
immediate_just+take_while(true)+subscribe 1337.79 ns 11.70 ns 11.70 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 3472.97 ns 7.40 ns 7.40 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 5153.95 ns 221.39 ns 222.32 ns 1.00
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 5490.91 ns 215.64 ns 214.85 ns 1.00
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 212.32 ns 194.78 ns 1.09
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 5481.82 ns 937.55 ns 958.71 ns 0.98
immediate_just(1) + zip(immediate_just(2)) + subscribe 3542.16 ns 554.89 ns 531.58 ns 1.04

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 37.95 ns 20.09 ns 19.74 ns 1.02
subscribe 100 observers to publish_subject 268000.00 ns 28840.54 ns 28521.05 ns 1.01
100 on_next to 100 observers to publish_subject 52215.00 ns 38723.33 ns 39975.00 ns 0.97

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1897.43 ns 57.55 ns 56.93 ns 1.01
basic sample with immediate scheduler 1911.32 ns 36.75 ns 36.72 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 1461.48 ns 19.97 ns 19.99 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 1937.25 ns 343.85 ns 331.13 ns 1.04
create(on_error())+retry(1)+subscribe 1180.02 ns 146.90 ns 146.33 ns 1.00

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 4f5f845 and 883c8a6.

Files selected for processing (2)
  • src/rpp/rpp/operators/retry_when.hpp (1 hunks)
  • src/tests/rpp/test_retry_when.cpp (1 hunks)
Files skipped from review as they are similar to previous changes (2)
  • src/rpp/rpp/operators/retry_when.hpp
  • src/tests/rpp/test_retry_when.cpp

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 883c8a6 and 23514f8.

Files selected for processing (1)
  • src/rpp/rpp/operators/retry_when.hpp (1 hunks)
Files skipped from review as they are similar to previous changes (1)
  • src/rpp/rpp/operators/retry_when.hpp

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 23514f8 and 00c028b.

Files selected for processing (2)
  • src/rpp/rpp/operators/retry_when.hpp (1 hunks)
  • src/tests/rpp/test_retry_when.cpp (1 hunks)
Files skipped from review as they are similar to previous changes (1)
  • src/tests/rpp/test_retry_when.cpp
Additional comments not posted (4)
src/rpp/rpp/operators/retry_when.hpp (4)

1-20: Header and includes are correct.

The file header and include directives are appropriate and complete.


82-119: Logic in retry_when_impl_strategy is correct.

The handling of on_next, on_error, and on_completed is well-implemented, and the use of try-catch for notifier exceptions is appropriate.


121-142: Logic in drain function is robust.

The drain function effectively manages subscription and resubscription, with appropriate exception handling.


170-189: retry_when operator and documentation are well-defined.

The retry_when operator is correctly implemented, and the accompanying documentation is clear and informative.

src/rpp/rpp/operators/retry_when.hpp Outdated Show resolved Hide resolved
src/rpp/rpp/operators/retry_when.hpp Outdated Show resolved Hide resolved
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 00c028b and 0b4d9e8.

Files selected for processing (1)
  • src/rpp/rpp/operators/retry_when.hpp (1 hunks)
Additional comments not posted (5)
src/rpp/rpp/operators/retry_when.hpp (5)

48-77: Ensure on_next is called once.

Consider adding protection to ensure that on_next is called only once, possibly with a take(1) on the notifier returned observable.


145-167: Verify notifier behavior.

Ensure that the notifier behaves as expected, especially in edge cases where it might throw or return an empty observable.

Run the following script to verify the notifier behavior:

Verification successful

Notifier behavior is adequately verified.

The retry_when operator's notifier behavior is well-tested in test_retry_when.cpp, covering scenarios such as exceptions, empty observables, and resubscription logic. No further verification is needed.

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify notifier behavior in the retry_when operator.

# Test: Check for proper notifier behavior.
rg --type cpp -A 5 $'retry_when'

Length of output: 13266


82-119: Verify error handling and resource management.

Ensure that the implementation correctly handles errors and manages resources efficiently to prevent leaks or unintended behavior.

Run the following script to verify error handling and resource management:

Verification successful

Error handling and resource management verified.

The retry_when operator's implementation is thoroughly tested in test_retry_when.cpp, covering various scenarios of error handling and resource management. The tests ensure that the operator behaves correctly under different conditions, including proper disposal and resubscription logic.

  • Test file: src/tests/rpp/test_retry_when.cpp
Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify error handling and resource management in the retry_when operator.

# Test: Check for proper error handling and resource management.
rg --type cpp -A 5 $'retry_when'

Length of output: 13266


121-142: Verify handling of infinite retries.

Ensure that the implementation correctly handles scenarios with infinite retries to prevent resource exhaustion.

Run the following script to verify the handling of infinite retries:


184-189: Verify constraints and requirements.

Ensure that the constraints and requirements for the retry_when function are correctly defined and enforced.

Run the following script to verify the constraints and requirements:

Verification successful

Verify rpp::constraint::observable Definition and Enforcement

Ensure that the constraint rpp::constraint::observable is correctly defined and enforced within the codebase to validate the retry_when function's requirements.


Verify the Definition of rpp::constraint::observable

Ensure that the rpp::constraint::observable is correctly defined to validate its use in the retry_when function.


Constraints and Requirements Verified

The constraints and requirements for the retry_when function are correctly defined and enforced. The rpp::constraint::observable ensures that the notifier returns a valid observable type, meeting the function's requirements.

  • The rpp::constraint::observable concept is defined to check if a type is derived from rpp::observable.
Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify constraints and requirements for the retry_when function.

# Test: Check for proper constraints and requirements.
rg --type cpp -A 5 $'retry_when'

Length of output: 13266


Script:

#!/bin/bash
# Description: Verify the definition and enforcement of `rpp::constraint::observable`.

# Search for the definition of `rpp::constraint::observable`.
rg --type cpp 'rpp::constraint::observable'

# Search for any related constraints or requirements in the codebase.
rg --type cpp 'constraint'

Length of output: 98707


Script:

#!/bin/bash
# Description: Search for the definition of `rpp::constraint::observable`.

# Locate the definition of `rpp::constraint::observable` in the codebase.
rg --type cpp 'namespace rpp::constraint' -A 10 | rg 'observable'

Length of output: 3803

src/rpp/rpp/operators/retry_when.hpp Show resolved Hide resolved
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 0b4d9e8 and 9e3bd72.

Files selected for processing (2)
  • src/rpp/rpp/operators/retry_when.hpp (1 hunks)
  • src/tests/rpp/test_retry_when.cpp (1 hunks)
Files skipped from review as they are similar to previous changes (2)
  • src/rpp/rpp/operators/retry_when.hpp
  • src/tests/rpp/test_retry_when.cpp

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 9e3bd72 and 868a8d6.

Files selected for processing (3)
  • src/examples/rpp/doxygen/retry_when.cpp (1 hunks)
  • src/rpp/rpp/operators.hpp (1 hunks)
  • src/rpp/rpp/operators/fwd.hpp (1 hunks)
Files skipped from review as they are similar to previous changes (2)
  • src/rpp/rpp/operators.hpp
  • src/rpp/rpp/operators/fwd.hpp
Additional comments not posted (1)
src/examples/rpp/doxygen/retry_when.cpp (1)

1-5: Ensure necessary headers are included.

The file includes <rpp/rpp.hpp>, <iostream>, and <string>. Ensure that these headers are sufficient for the functionality demonstrated and that no additional headers are needed.

Comment on lines 12 to 30
//! [retry_when delay]
size_t retry_count = 0;
rpp::source::create<std::string>([&retry_count](const auto& sub) {
if (retry_count != 4)
{
sub.on_error({});
}
else
{
sub.on_next(std::string{"success"});
sub.on_completed();
}
})
| rpp::operators::retry_when([](const std::exception_ptr&) {
return rpp::source::timer(std::chrono::seconds{5}, rpp::schedulers::current_thread{});
})
| rpp::operators::subscribe([](const std::string& v) { std::cout << v << std::endl; });
// Source observable is resubscribed after 5 seconds on each error emission
//! [retry_when delay]
Copy link
Contributor

@coderabbitai coderabbitai bot Aug 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clarify the retry logic and increment retry_count.

The example demonstrates retrying with a delay but does not increment retry_count, which could lead to infinite retries.

Apply this diff to increment retry_count:

 rpp::source::create<std::string>([&retry_count](const auto& sub) {
     if (retry_count != 4)
     {
+        ++retry_count;
         sub.on_error({});
     }
     else
     {
         sub.on_next(std::string{"success"});
         sub.on_completed();
     }
 })
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
//! [retry_when delay]
size_t retry_count = 0;
rpp::source::create<std::string>([&retry_count](const auto& sub) {
if (retry_count != 4)
{
sub.on_error({});
}
else
{
sub.on_next(std::string{"success"});
sub.on_completed();
}
})
| rpp::operators::retry_when([](const std::exception_ptr&) {
return rpp::source::timer(std::chrono::seconds{5}, rpp::schedulers::current_thread{});
})
| rpp::operators::subscribe([](const std::string& v) { std::cout << v << std::endl; });
// Source observable is resubscribed after 5 seconds on each error emission
//! [retry_when delay]
//! [retry_when delay]
size_t retry_count = 0;
rpp::source::create<std::string>([&retry_count](const auto& sub) {
if (retry_count != 4)
{
++retry_count;
sub.on_error({});
}
else
{
sub.on_next(std::string{"success"});
sub.on_completed();
}
})
| rpp::operators::retry_when([](const std::exception_ptr&) {
return rpp::source::timer(std::chrono::seconds{5}, rpp::schedulers::current_thread{});
})
| rpp::operators::subscribe([](const std::string& v) { std::cout << v << std::endl; });
// Source observable is resubscribed after 5 seconds on each error emission
//! [retry_when delay]

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wow, looks like pretty valid point

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@victimsnino, I'm glad you found the point valid! If you need any further assistance or have more questions, feel free to ask.

 (\(\ 
 ( -.-)
 o_(")(")

Learnings added
Learnt from: victimsnino
PR: victimsnino/ReactivePlusPlus#616
File: src/examples/rpp/doxygen/retry_when.cpp:12-30
Timestamp: 2024-08-21T21:06:32.053Z
Learning: In the `retry_when` example in `retry_when.cpp`, incrementing `retry_count` is necessary to prevent infinite retries.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

Comment on lines 32 to 61
//! [retry_when]
retry_count = 0;
rpp::source::create<std::string>([&retry_count](const auto& sub) {
if (retry_count != 4)
{
sub.on_error({});
}
else
{
sub.on_next(std::string{"success"});
sub.on_completed();
}
})
| rpp::operators::retry_when([](const std::exception_ptr& ep) {
try
{
std::rethrow_exception(ep);
}
catch (const std::runtime_error&)
{
return rpp::source::timer(std::chrono::seconds{5}, rpp::schedulers::current_thread{});
}
catch (...)
{
throw;
}
})
| rpp::operators::subscribe([](const std::string& v) { std::cout << v << std::endl; });
// Source observable is resubscribed after 5 seconds only on particular error emissions
//! [retry_when]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clarify the error handling logic and increment retry_count.

The example demonstrates retrying based on specific error types but does not increment retry_count, which could lead to infinite retries.

Apply this diff to increment retry_count:

 rpp::source::create<std::string>([&retry_count](const auto& sub) {
     if (retry_count != 4)
     {
+        ++retry_count;
         sub.on_error({});
     }
     else
     {
         sub.on_next(std::string{"success"});
         sub.on_completed();
     }
 })
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
//! [retry_when]
retry_count = 0;
rpp::source::create<std::string>([&retry_count](const auto& sub) {
if (retry_count != 4)
{
sub.on_error({});
}
else
{
sub.on_next(std::string{"success"});
sub.on_completed();
}
})
| rpp::operators::retry_when([](const std::exception_ptr& ep) {
try
{
std::rethrow_exception(ep);
}
catch (const std::runtime_error&)
{
return rpp::source::timer(std::chrono::seconds{5}, rpp::schedulers::current_thread{});
}
catch (...)
{
throw;
}
})
| rpp::operators::subscribe([](const std::string& v) { std::cout << v << std::endl; });
// Source observable is resubscribed after 5 seconds only on particular error emissions
//! [retry_when]
//! [retry_when]
retry_count = 0;
rpp::source::create<std::string>([&retry_count](const auto& sub) {
if (retry_count != 4)
{
++retry_count;
sub.on_error({});
}
else
{
sub.on_next(std::string{"success"});
sub.on_completed();
}
})
| rpp::operators::retry_when([](const std::exception_ptr& ep) {
try
{
std::rethrow_exception(ep);
}
catch (const std::runtime_error&)
{
return rpp::source::timer(std::chrono::seconds{5}, rpp::schedulers::current_thread{});
}
catch (...)
{
throw;
}
})
| rpp::operators::subscribe([](const std::string& v) { std::cout << v << std::endl; });
// Source observable is resubscribed after 5 seconds only on particular error emissions
//! [retry_when]

rpp::source::create<std::string>([&retry_count](const auto& sub) {
if (retry_count != 4)
{
sub.on_error({});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, the fundamental guarantees of observables are broken in this example:

After on_error, no other message must be emitted. This includes the emission of on_completed. See the observable contract for reference.

OnNext
conveys an item that is emitted by the Observable to the observer
OnCompleted
indicates that the Observable has completed successfully and that it will be emitting no further items
OnError
indicates that the Observable has terminated with a specified error condition and that it will be emitting no further items

I am having difficulty understanding how the semantics of retry_when fit into this contract. Operators usually act on the output of an observable. In this example, we have a restart functionality that backpropagates into the generator of a cold observable and acts on the observer itself. I am not happy with that design.

Please explain why we need retry_when.

Copy link
Owner

@victimsnino victimsnino Aug 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually not due to as required no any other messages emitted from this observable, but they would be emitted from new one (after resubscribe). Any observer obtained on_error wouldn't obtain any new messages.
Actually we have chain like this
observable->observer_inside_retry_when->final_observer
when observable emits error, then only observer_inside_retry_when actually obtains error. Instead of forwarding error to final_observer it does attempt to subscribe new observer_inside_retry_when (let's say observer_inside_retry_when_2) observer to new observable_2. so old chain is partially destructed
observable->observer_inside_retry_when->| final_observer
and it becomes like
observable_2->observer_inside_retry_when_2->final_observer

No any guarantees are broken in this case.

Why do we need it? For example, to implement error handling loic with custom delaying. Like this

rpp::source::create<int>([](const auto& observer)
{
    // some hard job to construct state
     while(true) {
          observer.on_next(std::rand());
          if (get_current_cpu_temp() > 95) { // emulating some issues
              observer.on_error(.....);
              break;
          }
     }
    // some hard job to destruct state
})
| rpp::operators::retry_when([](const std::exception_ptr&) {
   return rpp::source::timer(std::chrono::seconds{5}, rpp::schedulers::new_thread{}); // kind of "ok, we are obtained error, it is ok, let's wait 5 seconds to make our cpu cooler and try again"
})
| rpp::operators::subscribe(...);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no any other messages emitted from this observable, but they would be emitted from new one (after resubscribe)

That is precisely my problem with this operator. This only works and makes sense for cold observables. What happens if this operator is applied on a hot observable? It would subscribe to an observable that has been completed. Can we obtain a compile-time error to avoid this?

Copy link
Owner

@victimsnino victimsnino Aug 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case hot observable should be in “error state”. For example, subject caches last error and emits it on new subscriptions. In this case it would be infinite loop, but it is up to user to control it.
Not sure if it is possible to add compile time error. Anyway it can be easily suppressed just via converting observable to dynamic version (and losing all meta info)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given your arguments, I drop my request for a compile-time error message. Also, I accept your explanation:

The current design means that if the hot observable goes into an error state, you obtain zero emissions on consecutive subscriptions—unless you add a stateful relay like subject. This behavior follows the contract.

I recommend that the difference in behavior for hot and cold observables is explicitly mentioned and explained by example in the documentation of retry_when to avoid people running into a pitfall.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, sure, thank you!
@CorentinBT , could you please handle this documentation request?

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 868a8d6 and 523330c.

Files selected for processing (1)
  • src/examples/rpp/doxygen/retry_when.cpp (1 hunks)
Files skipped from review as they are similar to previous changes (1)
  • src/examples/rpp/doxygen/retry_when.cpp

@victimsnino
Copy link
Owner

https://sonarcloud.io/code?id=victimsnino_ReactivePlusPlus&pullRequest=616&selected=victimsnino_ReactivePlusPlus%3Asrc%2Frpp%2Frpp%2Foperators%2Fretry_when.hpp&line=62
looks like your code (same as mine) is not testing non-recursive re-subscription

template<rpp::constraint::decayed_type TNotifier>
struct retry_when_t
{
TNotifier notifier;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NO_UNIQUE

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so its UB in msvc only if there are other members?

Copy link
Owner

@victimsnino victimsnino Aug 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If inherited from base class with vtable. I think , it is partially overrides vtable in this case

Copy link

codecov bot commented Aug 22, 2024

Codecov Report

Attention: Patch coverage is 94.59459% with 2 lines in your changes missing coverage. Please review.

Project coverage is 94.87%. Comparing base (0ea16c2) to head (fef5910).
Report is 1 commits behind head on v2.

Files Patch % Lines
src/rpp/rpp/operators/retry_when.hpp 94.59% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##               v2     #616      +/-   ##
==========================================
- Coverage   94.88%   94.87%   -0.01%     
==========================================
  Files          97       98       +1     
  Lines        1857     1894      +37     
==========================================
+ Hits         1762     1797      +35     
- Misses         95       97       +2     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 523330c and fef5910.

Files selected for processing (2)
  • src/rpp/rpp/operators/retry_when.hpp (1 hunks)
  • src/tests/rpp/test_retry_when.cpp (1 hunks)
Files skipped from review as they are similar to previous changes (1)
  • src/tests/rpp/test_retry_when.cpp
Additional comments not posted (4)
src/rpp/rpp/operators/retry_when.hpp (4)

37-39: Address potential UB with RPP_NO_UNIQUE_ADDRESS.

The first member with RPP_NO_UNIQUE_ADDRESS for a class with a base class with a vtable could cause undefined behavior on MSVC. Consider placing the bool as the first member or removing RPP_NO_UNIQUE_ADDRESS for the first member.

- RPP_NO_UNIQUE_ADDRESS TObserver   observer;
+ TObserver   observer;

196-201: Verify the requirements for the retry_when function.

Ensure that the notifier passed to the retry_when function meets the requirement of returning an observable. The use of std::forward is appropriate for perfect forwarding.

Verification successful

Verified: The retry_when function's notifier requirement is satisfied.

The test cases and examples confirm that the notifiers used return observable types, meeting the function's requirements.

  • The notifiers in test_retry_when.cpp return observable types like rpp::source::just, rpp::source::empty, and rpp::source::error.
Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Verify that the notifier passed to the `retry_when` function returns an observable.
rg --type cpp -A 5 'retry_when'

Length of output: 17610


48-80: Verify the effectiveness of locally_disposed in retry_when_impl_inner_strategy.

The use of locally_disposed helps prevent multiple calls to on_next. Ensure that this mechanism is effective in all scenarios, especially with complex observables.


147-170: Verify the use of RPP_NO_UNIQUE_ADDRESS and subscription logic in retry_when_t.

Ensure that the use of RPP_NO_UNIQUE_ADDRESS for the notifier member does not cause issues. Verify that the subscription logic correctly sets up the state and initiates the drain process.

Verification successful

Verification successful for RPP_NO_UNIQUE_ADDRESS and subscription logic in retry_when_t.

The use of RPP_NO_UNIQUE_ADDRESS in the retry_when_t class is consistent with its usage across the codebase, indicating a deliberate design choice. The subscription logic correctly sets up the state and initiates the drain process, following a standard pattern.

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Verify the use of `RPP_NO_UNIQUE_ADDRESS` and the subscription logic in `retry_when_t`.
rg --type cpp -A 5 'RPP_NO_UNIQUE_ADDRESS'

Length of output: 35078

src/rpp/rpp/operators/retry_when.hpp Show resolved Hide resolved
src/rpp/rpp/operators/retry_when.hpp Show resolved Hide resolved
Copy link

sonarqubecloud bot commented Aug 23, 2024

@victimsnino victimsnino merged commit 7e811fb into victimsnino:v2 Aug 23, 2024
36 of 37 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants