-
Notifications
You must be signed in to change notification settings - Fork 31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Try to stabilize disposables logic #673
Conversation
📝 Walkthrough📝 Walkthrough📝 Walkthrough📝 Walkthrough📝 Walkthrough📝 Walkthrough📝 Walkthrough📝 Walkthrough📝 Walkthrough📝 Walkthrough📝 WalkthroughWalkthroughThis pull request introduces significant modifications across multiple files in the ReactivePlusPlus library, primarily focusing on the restructuring of classes and methods related to disposables. Key changes include renaming various state classes to disposable classes, simplifying method signatures, and enhancing the management of disposables within the operators. The modifications aim to streamline the reference counting and disposal logic, ensuring a more intuitive handling of resources throughout the library. Changes
Possibly related PRs
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 7
🧹 Outside diff range and nitpick comments (9)
src/rpp/rpp/disposables/refcount_disposable.hpp (2)
Line range hint
94-110
: LGTM with minor documentation suggestion.The implementation is thread-safe and correctly handles reference counting and disposal. Consider improving the comment on line 100 to better explain the atomicity requirements.
- // just need atomicity, not guarding anything + // Atomic increment required to prevent race conditions in reference counting
Line range hint
94-110
: Good architectural direction.The simplification of the reference counting logic and removal of the Mode enum aligns well with the single responsibility principle. The changes make the disposable management more straightforward and less error-prone.
Consider documenting these architectural decisions in the codebase, perhaps in a design.md file, to help future maintainers understand the rationale behind the simplified disposable management approach.
src/rpp/rpp/operators/retry_when.hpp (2)
59-59
: LGTM! Consider adding const qualifier to set_upstream parameter.The refactoring to use composite_disposable methods is clean and consistent. However, since the disposable_wrapper parameter in set_upstream is passed by const reference and only used for adding to the state, consider marking it as const:
-void set_upstream(const disposable_wrapper& d) const { state->add(d); } +void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); }Also applies to: 79-79, 81-81
116-116
: Ensure consistency in const qualifiers.The
set_upstream
method is non-const while other methods in the structure are const. Consider making it const for consistency:-void set_upstream(const disposable_wrapper& d) { state->add(d); } +void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); }Also applies to: 118-118
src/rpp/rpp/operators/take_until.hpp (2)
26-34
: Refactor constructors to use delegation to reduce code duplicationBoth constructors of
take_until_disposable
initializem_observer_with_mutex
withobserver
. Consider using constructor delegation to streamline the code.Apply this refactor to delegate one constructor to the other:
-class take_until_disposable final : public rpp::composite_disposable +class take_until_disposable final : public rpp::composite_disposable { public: - take_until_disposable(TObserver&& observer) - : m_observer_with_mutex(std::move(observer)) {} - - take_until_disposable(const TObserver& observer) - : m_observer_with_mutex(observer) {} + take_until_disposable(const TObserver& observer) + : m_observer_with_mutex(observer) {} + take_until_disposable(TObserver&& observer) + : take_until_disposable(static_cast<const TObserver&>(observer)) {}
Line range hint
112-118
: Properly initializestate
in observer strategies withinlift
methodIn the
lift
method,state
should be initialized and passed to the observer strategies to prevent null references.Modify the construction of observer strategies to include the initialized
state
:const auto d = disposable_wrapper_impl<take_until_disposable<std::decay_t<Observer>>>::make(std::forward<Observer>(observer)); auto ptr = d.lock(); ptr->get_observer()->set_upstream(d.as_weak()); observable.subscribe(take_until_throttle_observer_strategy<std::decay_t<Observer>>{ptr}); -return rpp::observer<Type, take_until_observer_strategy<std::decay_t<Observer>>>(std::move(ptr)); +return rpp::observer<Type, take_until_observer_strategy<std::decay_t<Observer>>>(take_until_observer_strategy<std::decay_t<Observer>>{ptr});Ensure that
take_until_observer_strategy
is constructed withptr
to initializestate
:+struct take_until_observer_strategy : public take_until_observer_strategy_base<TObserver> { + take_until_observer_strategy(const std::shared_ptr<take_until_disposable<TObserver>>& s) + : take_until_observer_strategy_base<TObserver>{s} {} template<typename T> void on_next(T&& v) const { if (!take_until_observer_strategy_base<TObserver>::state->is_stopped()) take_until_observer_strategy_base<TObserver>::state->get_observer()->on_next(std::forward<T>(v)); } };src/rpp/rpp/operators/combine_latest.hpp (1)
Line range hint
1-1
: Consider updating the documentation to reflect the changes in thecombine_latest
operator.The refactoring from state-based to disposable-based management is a significant change in the library's design. Updating the documentation, including the marble diagrams, performance notes, and examples, will help users understand the new structure and behavior of the
combine_latest
operator.Do you want me to open a GitHub issue to track this documentation update task?
src/rpp/rpp/sources/concat.hpp (1)
25-25
: Inheriting fromrpp::composite_disposable
The
concat_state_t
struct now inherits fromrpp::composite_disposable
. This change streamlines disposable management by allowingconcat_state_t
to directly manage disposables without an additional member variable. Ensure that this inheritance does not introduce any unintended side effects or conflicts with existing functionality.src/rpp/rpp/operators/merge.hpp (1)
Line range hint
1-250
: Consider adding unit tests for the refactored code.While the refactoring looks good overall, it's important to ensure that the behavior of the
merge
andmerge_with
operators remains unchanged after the transition from state management to disposable management. Consider adding or updating unit tests to cover the refactored code and verify that the operators still function as expected.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (14)
src/rpp/rpp/disposables/refcount_disposable.hpp
(3 hunks)src/rpp/rpp/operators/combine_latest.hpp
(2 hunks)src/rpp/rpp/operators/concat.hpp
(5 hunks)src/rpp/rpp/operators/debounce.hpp
(4 hunks)src/rpp/rpp/operators/delay.hpp
(5 hunks)src/rpp/rpp/operators/details/combining_strategy.hpp
(5 hunks)src/rpp/rpp/operators/merge.hpp
(5 hunks)src/rpp/rpp/operators/on_error_resume_next.hpp
(2 hunks)src/rpp/rpp/operators/retry.hpp
(4 hunks)src/rpp/rpp/operators/retry_when.hpp
(5 hunks)src/rpp/rpp/operators/take_until.hpp
(4 hunks)src/rpp/rpp/operators/with_latest_from.hpp
(5 hunks)src/rpp/rpp/operators/zip.hpp
(3 hunks)src/rpp/rpp/sources/concat.hpp
(4 hunks)
🔇 Additional comments (69)
src/rpp/rpp/disposables/refcount_disposable.hpp (1)
57-57
: Verify impact of breaking API change.
The simplified method signature improves clarity but represents a breaking change that could affect existing code.
Let's verify the usage of this method with the old signature:
✅ Verification successful
Breaking change is safe to proceed
The removal of the Mode
parameter from add_ref()
is a safe breaking change because:
- All current usages in the codebase call
add_ref()
without any parameters - The
Mode
enum (found ininterface_disposable.hpp
) is not used withadd_ref()
anywhere in the codebase - The change aligns with the documented behavior in
docs/readme.md
which describesadd_ref()
without mentioning any mode parameter
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for any remaining calls using the old Mode parameter
rg -l "add_ref\s*\([^)]*Mode"
# Search for any test cases that might need updating
rg -l "add_ref.*Mode" --glob "test/**/*"
Length of output: 223
Script:
#!/bin/bash
# First, let's find all test files to ensure we're searching in the right places
fd -t f "test.*\.(cpp|hpp)$"
# Then, let's do a broader search for add_ref usage
rg -A 2 "add_ref\("
# Also search for Mode enum or type to understand the previous API
rg -A 2 "enum.*Mode|class.*Mode|struct.*Mode"
Length of output: 7150
src/rpp/rpp/operators/retry_when.hpp (2)
26-33
: LGTM! Good refactoring of disposable management.
The inheritance from rpp::composite_disposable
is a solid improvement that provides built-in disposable management functionality, aligning well with the PR's objective of stabilizing disposables logic.
124-124
: LGTM! Consistent use of disposable management.
The change to use state->is_disposed()
aligns well with the new disposable management approach while maintaining the existing thread-safety guarantees.
src/rpp/rpp/operators/with_latest_from.hpp (5)
26-45
: LGTM! Clean refactoring of state management to disposable management.
The renaming and restructuring of the class aligns well with the PR's objective of stabilizing disposables logic. The inheritance from composite_disposable
provides a solid foundation for disposable management.
Line range hint 50-77
: LGTM! Simplified disposable management in inner observer strategy.
The changes streamline the disposable management by using direct calls and appropriate disposables mode. The code is now more maintainable and follows a consistent pattern.
Line range hint 82-121
: LGTM! Well-structured observer strategy with proper disposable management.
The observer strategy correctly handles all reactive events while maintaining clean disposable management. The type aliases and member variables are appropriately updated to reflect the new disposable-focused approach.
141-141
: LGTM! Optimized disposables strategy.
The change to fixed_disposables_strategy<1>
provides more precise control over disposable resources and better reflects the actual requirements of the operator.
153-166
: LGTM! Clean subscription implementation with proper disposable management.
The subscription logic maintains thread safety and correctly handles the disposable lifecycle. The implementation is consistent with reactive programming patterns and the overall disposable-focused approach.
src/rpp/rpp/operators/details/combining_strategy.hpp (4)
16-16
: Ensure Correct Inclusion of composite_disposable
Header
Including <rpp/disposables/composite_disposable.hpp>
is necessary for the use of composite_disposable
. Verify that this header file is available and correctly included to prevent compilation issues.
Line range hint 26-33
: Verify Proper Inheritance and Initialization in combining_disposable
The combining_disposable
class now inherits from composite_disposable
. Ensure that this inheritance is appropriate and that all required virtual functions from the base class are correctly overridden if necessary. Additionally, confirm that the constructor initializes all member variables properly.
95-95
: Review the Use of fixed_disposables_strategy<0>
The updated_optimal_disposables_strategy
is set to fixed_disposables_strategy<0>
. Verify that setting it to zero aligns with the intended disposable management strategy. This could impact how disposables are handled upstream and downstream.
119-121
: Confirm Correct Variadic Subscription Logic
In the subscribe
function, the code subscribes to each observable using a variadic parameter pack expansion. Ensure that the indices I
correctly align with each TObservables
type, and that TStrategy<I + 1, ...>
correctly instantiates the strategy for each observable.
src/rpp/rpp/operators/on_error_resume_next.hpp (4)
20-30
: New disposable struct is correctly implemented
The on_error_resume_next_disposable
struct correctly encapsulates the observer and inherits from rpp::composite_disposable
. The implementation ensures proper ownership and resource management.
67-70
: State initialization is properly performed
The constructor of on_error_resume_next_observer_strategy
correctly initializes the state
using init_state
, ensuring that the observer is properly wrapped within the disposable structure.
85-91
: Verify disposal timing after error handling
In the on_error
method, state->dispose()
is called after subscribing to the new observable returned by the selector
. Please verify that disposing of state
at this point does not interfere with the lifecycle of the new subscription or lead to unintended disposal of shared resources.
106-112
: Proper setup of observer's upstream disposable
The init_state
method correctly sets up the observer's upstream disposable using d.as_weak()
. This ensures that the observer maintains a weak reference to the disposable, facilitating appropriate resource management without preventing disposal when no longer needed.
src/rpp/rpp/operators/zip.hpp (4)
24-28
: Refactoring to zip_disposable
enhances resource management
The change from zip_state
to zip_disposable
, along with the updated inheritance from combining_disposable<Observer>
, is appropriate and aligns with the overall shift towards a disposable-based approach in the codebase.
45-47
: zip_observer_strategy
adaptation is consistent with the refactoring
Updating the inheritance and using declaration to reference zip_disposable
ensures consistency with the new disposable management system.
52-55
: Correct usage of disposable
in the on_next
method
The modifications in the on_next
method correctly utilize disposable
to access the observer and pending values, reflecting the refactored structure.
71-71
: zip_t
struct inheritance updated appropriately
Changing the inheritance to combining_operator_t<zip_disposable, zip_observer_strategy, TSelector, TObservables...>
aligns with the disposable-centric design and is correctly implemented.
src/rpp/rpp/operators/retry.hpp (6)
21-21
: Inheritance from rpp::composite_disposable
The retry_state_t
struct now inherits from rpp::composite_disposable
, which streamlines disposable management by allowing direct use of disposable methods. This change enhances code clarity and reduces complexity.
64-64
: Updated disposal clearing in on_error
Replaced state->disposable.clear()
with state->clear()
, leveraging the inherited clear
method from composite_disposable
. This ensures that all disposables managed by the state are properly cleared upon error.
80-80
: Simplified disposable addition in set_upstream
Changed state->disposable.add(d)
to state->add(d)
, utilizing inheritance to directly manage disposables. This improves code readability and aligns with the updated disposable management strategy.
83-83
: Updated disposal check in is_disposed
Modified the disposal check from state->disposable.is_disposed()
to state->is_disposed()
. This correctly reflects the disposal state using the inherited method from composite_disposable
.
89-89
: Adjusted loop condition in drain
function
Updated the loop condition to !state->is_disposed()
from !state->disposable.is_disposed()
. This change ensures the loop correctly checks the disposal state of the entire state
object.
126-129
: Subscription setup using disposable_wrapper_impl
The creation of the disposable d
using disposable_wrapper_impl
and locking it to obtain ptr
is appropriate. Setting the upstream disposable via ptr->observer.set_upstream(d.as_weak())
ensures that the observer holds a weak reference, preventing cyclic references and allowing proper disposal. Invoking drain(ptr)
afterwards correctly initiates the retry logic.
src/rpp/rpp/operators/take_until.hpp (1)
Line range hint 86-90
: Ensure thread-safe access to state
in on_next
In take_until_observer_strategy
, the on_next
method accesses state
without synchronization. Since state
could be modified from other threads, ensure that all accesses are thread-safe to prevent data races.
Run the following script to check for potential data races involving state
:
src/rpp/rpp/operators/combine_latest.hpp (6)
25-26
: LGTM!
The constructor has been updated to initialize combining_disposable
instead of combining_state
, reflecting the transition to disposable-based management.
43-45
: LGTM!
The base class and type aliases have been updated to use combine_latest_disposable
, aligning with the broader refactoring effort.
51-54
: LGTM!
The on_next
method has been adjusted to interact with the disposable
object, ensuring that the new class structure is utilized correctly.
58-59
: LGTM!
The apply_impl
function has been updated to reflect the new type, changing from TState
to TDisposable
, aligning with the transition to disposable-based management.
67-67
: LGTM!
The combine_latest_t
struct now inherits from combining_operator_t
using combine_latest_disposable
, reflecting the shift in how the combining operator is structured and utilized.
22-22
: Verify the renaming of combine_latest_state
to combine_latest_disposable
across the codebase.
The renaming aligns with the broader refactoring effort to transition from state-based to disposable-based management. However, ensure that all references to combine_latest_state
have been updated accordingly.
Run the following script to verify the usage of combine_latest_state
:
✅ Verification successful
Renaming from combine_latest_state
to combine_latest_disposable
is verified and complete
The git history shows that the renaming was done as part of a recent commit (e3706c8), where all occurrences of combine_latest_state
were properly updated to combine_latest_disposable
. This includes:
- The class name
- Template parameters in
combine_latest_observer_strategy
- Template parameters in
combine_latest_t
- All related member references
No remaining instances of combine_latest_state
exist in the codebase, confirming the complete transition from state-based to disposable-based management.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify all references to `combine_latest_state` have been updated.
# Test: Search for occurrences of `combine_latest_state`. Expect: No results found.
rg $'combine_latest_state'
Length of output: 25
Script:
#!/bin/bash
# Let's verify this more thoroughly by:
# 1. Checking for any variations of the name
# 2. Looking at git history to confirm the rename
# 3. Checking the combine_latest implementation
# Check for any case variations or partial matches
rg -i 'combine.*state'
# Look at git history for this file
git log -p src/rpp/rpp/operators/combine_latest.hpp
# Check the combine_latest implementation
ast-grep --pattern 'class combine_latest_disposable $_'
Length of output: 10935
src/rpp/rpp/operators/debounce.hpp (5)
15-15
: Appropriate inclusion of composite_disposable header.
The addition of <rpp/disposables/composite_disposable.hpp>
is necessary for debounce_disposable
to inherit from composite_disposable_impl
.
35-36
: Correct inheritance in debounce_disposable class.
The debounce_disposable
class properly inherits from composite_disposable_impl<Container>
and enable_wrapper_from_this
, ensuring correct disposable management and the ability to create weak references to itself.
141-143
: Thread safety in on_completed() method.
The on_completed()
method correctly locks and extracts any pending value to emit before signaling completion. This ensures that the last value is not lost if it was emitted shortly before completion.
170-173
: Proper setup of upstream disposable and observer creation.
The lift_with_disposable_strategy
method correctly:
- Creates a
debounce_disposable
instance. - Locks the weak pointer to get a
shared_ptr
. - Sets the upstream disposable to manage disposables correctly.
- Returns an observer with the
debounce_observer_strategy
.
120-120
: Ensure proper addition of upstream disposable.
In the set_upstream
method:
void set_upstream(const rpp::disposable_wrapper& d) const
{
disposable->add(d);
}
Make sure that adding the upstream disposable does not introduce circular references that could prevent disposables from being released.
Please verify the disposal chain to ensure there are no memory leaks. You can use the following script:
This script searches for all instances where disposables are added to ensure they are correctly managed.
✅ Verification successful
Let me analyze the code further to understand the disposal chain implementation.
After analyzing the implementation, I can now provide the final response:
The disposable chain implementation is safe and prevents circular references
The implementation in composite_disposable
has proper safeguards:
- It prevents self-referential disposables by checking
disposable.lock().get() == this
in theadd
method - Uses atomic state management to prevent race conditions during disposal
- Properly cleans up resources when disposed
- Has thread-safe implementation for adding/removing disposables
The set_upstream
implementation in debounce.hpp is correct and follows the established pattern used throughout the codebase for safe disposal chain management.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for potential circular references in disposables.
# Since the codebase is complex, ensure that there are no self-referential disposables.
grep -rn "add(" src/ | grep "disposable"
Length of output: 5050
Script:
#!/bin/bash
# Check the implementation of disposable_wrapper and composite_disposable
cat src/rpp/rpp/disposables/disposable_wrapper.hpp src/rpp/rpp/disposables/composite_disposable.hpp
Length of output: 14750
src/rpp/rpp/sources/concat.hpp (5)
72-72
: Ensuring Accurate Disposal Status in is_disposed
The is_disposed()
method now returns the logical OR of locally_disposed
and state->is_disposed()
. This ensures that the disposal status accurately reflects both local and state-level disposals. Review this change to confirm that it correctly represents the disposal state without introducing redundancy or race conditions.
Consider verifying thread safety and ensuring atomic operations protect shared variables if necessary.
77-77
: Clearing State Disposables on Completion
In the on_completed()
method, calling state->clear();
ensures that all disposables are released when the sequence completes. This is vital for preventing resource leaks. Validate that this operation does not interfere with other disposables or lead to premature disposal in concurrent environments.
134-137
: 🛠️ Refactor suggestion
Refactoring Initialization and Subscription Logic
Lines 134-137 refactor the way the disposable wrapper and state are created and managed:
- Line 134: Creates a disposable wrapper
d
, encapsulating the observer and container. - Line 135: Acquires a lock on the disposable wrapper to access the state.
- Line 136: Sets the upstream disposable of the observer to a weak reference of
d
. - Line 137: Initiates the
drain
process with the current state.
This refactoring enhances resource management and aligns with the updated disposable handling. Verify that the locking mechanism and weak references are correctly implemented to avoid deadlocks or dangling pointers.
Ensure that d.lock()
and d.as_weak()
are used appropriately, and consider adding thread safety measures if concurrent access is possible.
89-89
: Properly Checking Disposal Status in drain
Loop
The drain
function's loop now checks while (!state->is_disposed())
, which prevents further processing if the state has been disposed. This change enhances the robustness of the loop by preventing unnecessary operations after disposal. Ensure that disposals trigger as expected and halt the loop appropriately.
To verify the disposal mechanism, you can use the following script:
#!/bin/bash
# Description: Find all instances where `state->dispose()` is called to confirm it affects the `drain` loop.
# Test: Search for calls to `state->dispose()`
rg 'state->dispose\(\)'
70-70
: Updating set_upstream
to Use State's add
Method
The set_upstream
method now calls state->add(d);
, leveraging the inherited add
method from rpp::composite_disposable
. This modification simplifies the disposable addition process. Confirm that all upstream disposables are correctly managed with this new implementation.
You can verify the correct addition of upstream disposables with the following script:
✅ Verification successful
The previous search pattern was too strict. Let's broaden it to understand the implementation context better and verify the changes in the concat source file.
Based on the search results, I can now generate the final response as we have sufficient evidence. The search shows that the pattern of using state->add(d)
in set_upstream
is consistently used across the codebase, including in subjects (behavior_subject, publish_subject, replay_subject) and operators (take_until, retry_when). The implementation in concat.hpp follows this established pattern.
Implementation Verified: Correct Usage of state->add(d)
in set_upstream
The modification to use state->add(d)
in set_upstream
aligns with the codebase's standard pattern for disposable management. This implementation is consistent with other reactive components in the codebase that handle upstream disposables.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Ensure that all calls to `set_upstream` add disposables via `state->add(d);`
# Test: Search for `set_upstream` implementations using `state->add(d);`
rg 'void set_upstream\(.*\)\s*\{\s*state->add\('
Length of output: 196
Script:
#!/bin/bash
# Let's try a broader search pattern for set_upstream implementations
rg "set_upstream.*\{" -A 2
# Also search for the specific file and its context
rg -p "class.*concat" src/rpp/rpp/sources/concat.hpp -A 20
Length of output: 17429
src/rpp/rpp/operators/concat.hpp (8)
37-37
: Class renaming and inheritance change approved
The class concat_state_t
has been renamed to concat_disposable
, inheriting from rpp::refcount_disposable
. This improves clarity and aligns with the disposable-based management approach in the library.
40-42
: Observer initialization is correct
The constructor appropriately initializes m_observer
with the moved observer
, ensuring efficient resource management.
52-59
: Disposal and completion logic approved
The drain
method correctly handles disposal and notifies the observer upon completion. The use of is_disposed()
within the loop ensures proper termination.
108-120
: Constructor and member initialization approved
Constructors in concat_observer_strategy_base
properly initialize disposable
and refcounted
members using std::move
, efficiently managing resources.
143-143
: Forwarding value to observer is appropriate
The on_next
method correctly forwards the received value to the observer using std::forward
, maintaining perfect forwarding semantics.
165-165
: Initialization using init_state
is correct
The concat_observer_strategy
constructor appropriately initializes its base class with init_state
, encapsulating initialization logic.
173-176
: Stage management logic appears correct
The on_next
method effectively manages the stage
using atomic operations, ensuring thread-safe transitions between states and correct handling of observables.
182-184
: Ensure observer is notified upon disposal
After disposing refcounted
, the conditional check and subsequent call to on_completed()
ensure that the observer is properly notified when the disposable is disposed.
src/rpp/rpp/operators/delay.hpp (13)
16-16
: Inclusion of composite_disposable.hpp
is appropriate
The addition of #include <rpp/disposables/composite_disposable.hpp>
ensures that the composite_disposable_impl
class is available, which is necessary for the updated disposables management.
38-39
: Refactored delay_state
to delay_disposable
with disposables management
The refactoring to delay_disposable
inheriting from rpp::composite_disposable_impl<Container>
aligns with the new disposables approach. This change enhances consistency in disposable handling across the library.
Line range hint 43-50
: Proper initialization using move semantics
The constructor for delay_disposable
correctly utilizes move semantics for in_observer
and in_worker
, ensuring efficient resource management.
59-66
: Introduction of delay_disposable_wrapper
with correct interface
The delay_disposable_wrapper
encapsulates the shared pointer to delay_disposable
and provides necessary methods like is_disposed()
and on_error()
, forwarding calls appropriately to the underlying observer.
69-73
: Updated delay_observer_strategy
to utilize new disposables
The delay_observer_strategy
now holds a shared pointer to delay_disposable
and sets preferred_disposables_mode
to Auto
. This ensures disposables are managed consistently and efficiently.
77-77
: Ensuring upstream disposable is added to composite disposable
The call to disposable->add(d);
correctly adds the upstream disposable to the composite disposable, ensuring proper disposal of upstream resources.
82-82
: Correct disposal status check
The is_disposed()
method accurately reflects the disposal status by querying the underlying disposable
.
117-121
: Appropriate handling of errors with ClearOnError
When ClearOnError
is true
, the queue is cleared upon an error, and on_error
is forwarded to the observer. This ensures that no delayed emissions occur after an error, which is the desired behavior.
126-130
: Correct scheduling of delayed emissions
The time point for delayed emission is correctly calculated, and the emission is enqueued. If the disposable was not active, it becomes active and schedules the drain_queue
function.
137-159
: Efficient and thread-safe queue draining
The drain_queue
function efficiently processes the emission queue with proper locking. It handles scheduling delays, checks for completion, and dispatches the appropriate observer methods (on_next
, on_error
, on_completed
).
176-176
: Use of fixed_disposables_strategy<1>
optimizes disposables management
Changing to fixed_disposables_strategy<1>
simplifies the disposables container to a fixed size, which can improve performance and reduce overhead.
181-190
: Updated lift_with_disposable_strategy
for correct observer creation
The method now correctly creates the delay_disposable
, sets the upstream disposable, and returns a properly constructed observer with the updated strategy.
107-110
: Verify potential for cyclic references in scheduled tasks
The lambda function passed to schedule
captures disposable
. Ensure that this does not create a cyclic reference preventing the delay_disposable
from being destructed.
To confirm that there are no cyclic references affecting disposal, you can run the following script:
This script looks for patterns where std::shared_ptr
is captured in lambdas, which could create cyclic references. Review any matches to ensure that disposables can be properly released.
✅ Verification successful
The previous search wasn't specific enough. Let's analyze the delay operator's implementation to understand the ownership structure and potential cyclic references. This will help us verify if the lambda capture could create a cyclic reference.
Based on the code analysis, I can now generate the final response:
No cyclic reference issue in delay operator's scheduled task
The implementation is safe because:
- The
delay_disposable_wrapper
only holds a non-owning reference to the disposable throughstd::shared_ptr
- The lambda capture in
schedule()
is passed by value through the wrapper, not creating any circular ownership - The
drain_queue
function receives the disposable as a const reference parameter - The disposable inherits from
composite_disposable_impl
which properly manages the lifecycle
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for potential cyclic references involving std::shared_ptr in lambdas.
# Search for lambdas capturing shared_ptr that may lead to cyclic dependencies.
rg -A 3 'std::shared_ptr<[^>]+>\s+\w+\s*=\s*[^;]+;' -g '*.hpp' -g '*.cpp'
# Alternatively, specifically check for captures of shared_ptr in lambdas.
rg -A 3 '\[.*\]\s*\([^)]*\)\s*{[^}]*shared_ptr' -g '*.hpp' -g '*.cpp'
Length of output: 3159
Script:
#!/bin/bash
# Let's examine the delay operator implementation to understand the ownership structure
ast-grep --pattern 'class delay_disposable {
$$$
}'
# Also check the delay_disposable_wrapper structure
ast-grep --pattern 'struct delay_disposable_wrapper {
$$$
}'
# And check how the disposable is stored/managed
rg -A 5 "delay_disposable" --type cpp
Length of output: 3585
src/rpp/rpp/operators/merge.hpp (5)
Line range hint 27-45
: Refactoring looks good!
The renaming of merge_state
to merge_disposable
and the inheritance from composite_disposable
align with the overall refactoring goal of transitioning from state management to disposable management. The updated member functions and variables reflect this change appropriately.
64-65
: Refactoring looks good!
The updates to the set_upstream
, is_disposed
, on_error
, and on_completed
methods to operate on the new merge_disposable
type are consistent with the overall refactoring. The changes in the on_completed
method, including the usage of decrement_on_completed
and the removal of disposables, align with the new disposable management approach.
Also applies to: 70-71, 75-76, 80-90, 94-95
106-107
: Refactoring looks good!
The update to the on_next
method in merge_observer_inner_strategy
to use the new merge_disposable
type is consistent with the overall refactoring.
115-115
: Refactoring looks good!
The changes in the merge_observer_strategy
class, including the initialization with merge_disposable
and the new private static method init_state
, align with the transition to disposable management. The updates to the on_next
method to call increment_on_completed
on merge_disposable
are consistent with the new approach.
Also applies to: 122-124, 127-133
52-53
: Verify the impact of the constructor signature change.
The constructor signature has been updated to accept merge_disposable
instead of merge_state
. Ensure that all instances of merge_observer_base_strategy
are initialized with the correct type.
Run the following script to verify the constructor usage:
Also applies to: 57-58
✅ Verification successful
Let me search for any usage of this constructor to verify the impact.
Let me check for any derived classes or usage patterns to ensure completeness.
Constructor signature change is correctly implemented and used
The verification shows that:
- The base class
merge_observer_base_strategy
has two constructors (move and const reference) that both correctly usemerge_disposable<TObserver>
- The derived class
merge_observer_inner_strategy
inherits from it and uses the constructor correctly through initialization - All usages of the constructor are consistent with the new signature, and there are no remaining references to the old
merge_state
type
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify all constructor calls to `merge_observer_base_strategy` match the new signature.
# Test: Search for the constructor usage. Expect: Only occurrences of the new signature.
rg -A 5 $'merge_observer_base_strategy\('
Length of output: 788
Script:
#!/bin/bash
# Search for any instantiation or inheritance of merge_observer_base_strategy
ast-grep --pattern 'merge_observer_base_strategy<$_>'
# Also search for any direct constructor calls
rg "merge_observer_base_strategy\s*\([^)]*\)" --type cpp
Length of output: 375
Script:
#!/bin/bash
# Search for classes that inherit from merge_observer_base_strategy
ast-grep --pattern 'class $_ : $$$merge_observer_base_strategy<$_>$$$'
# Search for the merge_disposable type to understand the relationship
rg "merge_disposable" -A 5 -B 5 --type cpp
Length of output: 3879
const auto d = disposable_wrapper_impl<retry_when_state<std::decay_t<TObserver>, std::decay_t<TObservable>, std::decay_t<TNotifier>>>::make(std::forward<TObserver>(observer), std::forward<TObservable>(observable), notifier); | ||
auto ptr = d.lock(); | ||
|
||
ptr->observer.set_upstream(d.as_weak()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider adding error handling for lock() failure.
While the disposable creation and initialization look good, the lock()
call on line 161 could return a null pointer if the weak_ptr is already expired. Consider adding a check:
const auto d = disposable_wrapper_impl<retry_when_state<std::decay_t<TObserver>, std::decay_t<TObservable>, std::decay_t<TNotifier>>>::make(std::forward<TObserver>(observer), std::forward<TObservable>(observable), notifier);
auto ptr = d.lock();
+if (!ptr)
+ return;
ptr->observer.set_upstream(d.as_weak());
drain(ptr);
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
const auto d = disposable_wrapper_impl<retry_when_state<std::decay_t<TObserver>, std::decay_t<TObservable>, std::decay_t<TNotifier>>>::make(std::forward<TObserver>(observer), std::forward<TObservable>(observable), notifier); | |
auto ptr = d.lock(); | |
ptr->observer.set_upstream(d.as_weak()); | |
const auto d = disposable_wrapper_impl<retry_when_state<std::decay_t<TObserver>, std::decay_t<TObservable>, std::decay_t<TNotifier>>>::make(std::forward<TObserver>(observer), std::forward<TObservable>(observable), notifier); | |
auto ptr = d.lock(); | |
if (!ptr) | |
return; | |
ptr->observer.set_upstream(d.as_weak()); |
template<typename TDisposable> | ||
struct combining_observer_strategy | ||
{ | ||
static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; | ||
|
||
std::shared_ptr<TState> state{}; | ||
std::shared_ptr<TDisposable> disposable{}; | ||
|
||
void set_upstream(const rpp::disposable_wrapper& d) const | ||
{ | ||
state->get_observer_under_lock()->set_upstream(d); | ||
disposable->add(d); | ||
} | ||
|
||
bool is_disposed() const | ||
{ | ||
return state->is_disposed(); | ||
return disposable->is_disposed(); | ||
} | ||
|
||
void on_error(const std::exception_ptr& err) const | ||
{ | ||
state->get_observer_under_lock()->on_error(err); | ||
disposable->get_observer_under_lock()->on_error(err); | ||
} | ||
|
||
void on_completed() const | ||
{ | ||
if (state->decrement_on_completed()) | ||
state->get_observer_under_lock()->on_completed(); | ||
if (disposable->decrement_on_completed()) | ||
disposable->get_observer_under_lock()->on_completed(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Potential Thread Safety Issues with Shared disposable
Member
In combining_observer_strategy
, the disposable
member is a std::shared_ptr<TDisposable>
. The methods set_upstream
, is_disposed
, on_error
, and on_completed
access or modify disposable
. Since these methods are marked const
, and disposable
may be shared across threads, ensure that access to disposable
is thread-safe to prevent data races.
Consider employing mutexes or atomic operations to safeguard access to disposable
, or adjust the constness of the methods if mutability is required.
using Disposable = TDisposable<Observer, TSelector, Type, rpp::utils::extract_observable_type_t<TObservables>...>; | ||
|
||
const auto d = rpp::disposable_wrapper_impl<State>::make(std::forward<Observer>(observer), selector); | ||
auto state = d.lock(); | ||
state->get_observer_under_lock()->set_upstream(d.as_weak()); | ||
const auto disposable = disposable_wrapper_impl<Disposable>::make(std::forward<Observer>(observer), selector); | ||
auto locked = disposable.lock(); | ||
locked->get_observer_under_lock()->set_upstream(disposable.as_weak()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prevent Potential Memory Leaks Due to Shared Ownership
In subscribe_impl
, a shared_ptr
to Disposable
is created, and methods involving locking and setting upstream disposables are called. There is a risk of creating cyclic references if Disposable
holds shared_ptr
s to objects that, directly or indirectly, reference back to it.
To mitigate memory leaks:
- Use
std::weak_ptr
where appropriate to break strong reference cycles. - Ensure that any callbacks or stored observers within
Disposable
do not create cycles.
@@ -48,7 +49,7 @@ namespace rpp::operators::details | |||
{ | |||
static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; | |||
|
|||
std::shared_ptr<take_until_state<TObserver>> state; | |||
std::shared_ptr<take_until_disposable<TObserver>> state; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initialize state
member to prevent null pointer dereference
The state
member in take_until_observer_strategy_base
is used without initialization, potentially leading to null pointer dereferences in methods like on_error
, on_completed
, set_upstream
, and is_disposed
.
Ensure that state
is properly initialized before use. You can add a constructor to take_until_observer_strategy_base
to initialize state
:
struct take_until_observer_strategy_base {
static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
+ take_until_observer_strategy_base(const std::shared_ptr<take_until_disposable<TObserver>>& s)
+ : state(s) {}
+
std::shared_ptr<take_until_disposable<TObserver>> state;
Committable suggestion skipped: line range outside the PR's diff.
[](const debounce_disposable_wrapper<Observer, Worker, Container>& handler) -> schedulers::optional_delay_to { | ||
auto value_or_duration = handler.disposable->extract_value_or_time(); | ||
if (auto* timepoint = std::get_if<schedulers::time_point>(&value_or_duration)) | ||
return schedulers::optional_delay_to{*timepoint}; | ||
|
||
if (auto* value = std::get_if<T>(&value_or_duration)) | ||
handler.state->get_observer_under_lock()->on_next(std::move(*value)); | ||
handler.disposable->get_observer_under_lock()->on_next(std::move(*value)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential null shared_ptr dereference in scheduled lambda.
In the lambda function within the schedule()
method, handler.disposable
might be a null shared_ptr
if the debounce_disposable
has been disposed before the scheduled action executes. Accessing it without a null check could lead to undefined behavior.
Please add a null check before accessing handler.disposable
:
+if (!handler.disposable)
+ return std::nullopt;
auto value_or_duration = handler.disposable->extract_value_or_time();
This ensures that the lambda exits gracefully if the disposable is no longer valid.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
[](const debounce_disposable_wrapper<Observer, Worker, Container>& handler) -> schedulers::optional_delay_to { | |
auto value_or_duration = handler.disposable->extract_value_or_time(); | |
if (auto* timepoint = std::get_if<schedulers::time_point>(&value_or_duration)) | |
return schedulers::optional_delay_to{*timepoint}; | |
if (auto* value = std::get_if<T>(&value_or_duration)) | |
handler.state->get_observer_under_lock()->on_next(std::move(*value)); | |
handler.disposable->get_observer_under_lock()->on_next(std::move(*value)); | |
[](const debounce_disposable_wrapper<Observer, Worker, Container>& handler) -> schedulers::optional_delay_to { | |
if (!handler.disposable) | |
return std::nullopt; | |
auto value_or_duration = handler.disposable->extract_value_or_time(); | |
if (auto* timepoint = std::get_if<schedulers::time_point>(&value_or_duration)) | |
return schedulers::optional_delay_to{*timepoint}; | |
if (auto* value = std::get_if<T>(&value_or_duration)) | |
handler.disposable->get_observer_under_lock()->on_next(std::move(*value)); |
private: | ||
bool handle_observable_impl(const rpp::constraint::decayed_same_as<TObservable> auto& observable, rpp::composite_disposable_wrapper refcounted) | ||
{ | ||
stage().store(ConcatStage::Draining, std::memory_order::relaxed); | ||
refcounted.clear(); | ||
observable.subscribe(concat_inner_observer_strategy<TObservable, TObserver>{this->shared_from_this(), std::move(refcounted)}); | ||
observable.subscribe(concat_inner_observer_strategy<TObservable, TObserver>{disposable_wrapper_impl<concat_disposable>{wrapper_from_this()}.lock(), std::move(refcounted)}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Verify safe usage of wrapper_from_this()
Ensure that wrapper_from_this()
is called only when this
is managed by a shared_ptr
. Using it otherwise can lead to undefined behavior. Confirm that instances of concat_disposable
are always owned by a shared_ptr
.
static std::shared_ptr<concat_disposable<TObservable, TObserver>> init_state(TObserver&& observer) | ||
{ | ||
const auto d = disposable_wrapper_impl<concat_disposable<TObservable, TObserver>>::make(std::move(observer)); | ||
auto ptr = d.lock(); | ||
ptr->get_observer()->set_upstream(d.as_weak()); | ||
return ptr; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensure TObserver
supports set_upstream
method
In init_state
, ptr->get_observer()->set_upstream(d.as_weak());
assumes that TObserver
has a set_upstream
method. Verify that all possible TObserver
types support this method, or consider adding a constraint to enforce this requirement.
BENCHMARK RESULTS (AUTOGENERATED)
|
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
Subscribe empty callbacks to empty observable | 300.97 ns | 1.86 ns | 1.91 ns | 0.98 |
Subscribe empty callbacks to empty observable via pipe operator | 302.13 ns | 1.85 ns | 1.85 ns | 1.00 |
Sources
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
from array of 1 - create + subscribe + immediate | 692.72 ns | 0.31 ns | 0.31 ns | 1.00 |
from array of 1 - create + subscribe + current_thread | 1029.04 ns | 3.42 ns | 3.71 ns | 0.92 |
concat_as_source of just(1 immediate) create + subscribe | 2215.90 ns | 94.95 ns | 143.09 ns | 0.66 |
defer from array of 1 - defer + create + subscribe + immediate | 731.78 ns | 0.32 ns | 0.31 ns | 1.04 |
interval - interval + take(3) + subscribe + immediate | 2183.02 ns | 59.19 ns | 59.19 ns | 1.00 |
interval - interval + take(3) + subscribe + current_thread | 3008.75 ns | 32.43 ns | 32.40 ns | 1.00 |
from array of 1 - create + as_blocking + subscribe + new_thread | 30715.03 ns | 28893.20 ns | 35689.73 ns | 0.81 |
from array of 1000 - create + as_blocking + subscribe + new_thread | 48122.12 ns | 53439.47 ns | 48367.14 ns | 1.10 |
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 3514.38 ns | 114.04 ns | 213.15 ns | 0.54 |
Filtering Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+take(1)+subscribe | 1084.75 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just+filter(true)+subscribe | 834.21 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just(1,2)+skip(1)+subscribe | 1001.38 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just(1,1,2)+distinct_until_changed()+subscribe | 886.83 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just(1,2)+first()+subscribe | 1233.65 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just(1,2)+last()+subscribe | 902.41 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just+take_last(1)+subscribe | 1115.41 ns | 18.21 ns | 17.90 ns | 1.02 |
immediate_just(1,2,3)+element_at(1)+subscribe | 831.48 ns | 0.31 ns | 0.31 ns | 1.00 |
Schedulers
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate scheduler create worker + schedule | 267.20 ns | 1.54 ns | 0.46 ns | 3.33 |
current_thread scheduler create worker + schedule | 388.07 ns | 4.64 ns | 4.32 ns | 1.07 |
current_thread scheduler create worker + schedule + recursive schedule | 812.46 ns | 60.61 ns | 62.03 ns | 0.98 |
Transforming Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+map(v*2)+subscribe | 874.49 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just+scan(10, std::plus)+subscribe | 912.34 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just+flat_map(immediate_just(v*2))+subscribe | 2322.34 ns | 131.31 ns | 181.15 ns | 0.72 |
immediate_just+buffer(2)+subscribe | 1536.69 ns | 13.90 ns | 13.58 ns | 1.02 |
immediate_just+window(2)+subscribe + subscsribe inner | 2381.94 ns | 1086.36 ns | 1296.56 ns | 0.84 |
Conditional Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+take_while(false)+subscribe | 848.86 ns | - | - | 0.00 |
immediate_just+take_while(true)+subscribe | 854.79 ns | 0.31 ns | 0.31 ns | 1.00 |
Utility Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just(1)+subscribe_on(immediate)+subscribe | 1975.66 ns | 0.31 ns | 0.31 ns | 1.00 |
Combining Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 3391.23 ns | 147.75 ns | 235.49 ns | 0.63 |
immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 3619.37 ns | 150.88 ns | 174.91 ns | 0.86 |
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 129.34 ns | 182.39 ns | 0.71 |
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 3575.85 ns | 919.82 ns | 1396.24 ns | 0.66 |
immediate_just(1) + zip(immediate_just(2)) + subscribe | 2157.75 ns | 256.98 ns | 224.29 ns | 1.15 |
Subjects
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
publish_subject with 1 observer - on_next | 34.56 ns | 14.66 ns | 14.68 ns | 1.00 |
subscribe 100 observers to publish_subject | 197323.40 ns | 15209.79 ns | 16221.46 ns | 0.94 |
100 on_next to 100 observers to publish_subject | 27798.49 ns | 17238.52 ns | 17423.90 ns | 0.99 |
Scenarios
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
basic sample | 1405.64 ns | 12.97 ns | 12.96 ns | 1.00 |
basic sample with immediate scheduler | 1408.12 ns | 5.55 ns | 5.24 ns | 1.06 |
Aggregating Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+reduce(10, std::plus)+subscribe | 953.45 ns | 0.31 ns | 0.31 ns | 1.00 |
Error Handling Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2009.17 ns | 993.90 ns | 900.02 ns | 1.10 |
create(on_error())+retry(1)+subscribe | 587.43 ns | 98.56 ns | 120.89 ns | 0.82 |
ci-macos
General
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
Subscribe empty callbacks to empty observable | 1200.73 ns | 0.54 ns | 0.47 ns | 1.15 |
Subscribe empty callbacks to empty observable via pipe operator | 1268.79 ns | 0.57 ns | 0.47 ns | 1.23 |
Sources
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
from array of 1 - create + subscribe + immediate | 2243.56 ns | 0.28 ns | 0.23 ns | 1.19 |
from array of 1 - create + subscribe + current_thread | 2949.42 ns | 38.71 ns | 31.94 ns | 1.21 |
concat_as_source of just(1 immediate) create + subscribe | 7560.51 ns | 417.12 ns | 412.55 ns | 1.01 |
defer from array of 1 - defer + create + subscribe + immediate | 2749.75 ns | 0.31 ns | 0.23 ns | 1.35 |
interval - interval + take(3) + subscribe + immediate | 6616.61 ns | 163.71 ns | 110.60 ns | 1.48 |
interval - interval + take(3) + subscribe + current_thread | 7484.31 ns | 133.13 ns | 94.64 ns | 1.41 |
from array of 1 - create + as_blocking + subscribe + new_thread | 103975.90 ns | 102889.78 ns | 79258.86 ns | 1.30 |
from array of 1000 - create + as_blocking + subscribe + new_thread | 118261.67 ns | 110464.89 ns | 89848.46 ns | 1.23 |
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 11548.92 ns | 522.02 ns | 584.02 ns | 0.89 |
Filtering Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+take(1)+subscribe | 2876.04 ns | 0.23 ns | 0.22 ns | 1.05 |
immediate_just+filter(true)+subscribe | 2254.13 ns | 0.24 ns | 0.22 ns | 1.08 |
immediate_just(1,2)+skip(1)+subscribe | 2746.94 ns | 0.23 ns | 0.22 ns | 1.05 |
immediate_just(1,1,2)+distinct_until_changed()+subscribe | 2103.11 ns | 0.47 ns | 0.45 ns | 1.05 |
immediate_just(1,2)+first()+subscribe | 3797.82 ns | 0.26 ns | 0.22 ns | 1.16 |
immediate_just(1,2)+last()+subscribe | 3085.35 ns | 0.30 ns | 0.22 ns | 1.34 |
immediate_just+take_last(1)+subscribe | 3488.73 ns | 0.26 ns | 0.22 ns | 1.16 |
immediate_just(1,2,3)+element_at(1)+subscribe | 2119.55 ns | 0.23 ns | 0.22 ns | 1.05 |
Schedulers
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate scheduler create worker + schedule | 1167.21 ns | 1.28 ns | 0.46 ns | 2.81 |
current_thread scheduler create worker + schedule | 1288.15 ns | 39.54 ns | 32.90 ns | 1.20 |
current_thread scheduler create worker + schedule + recursive schedule | 2149.28 ns | 214.38 ns | 193.93 ns | 1.11 |
Transforming Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+map(v*2)+subscribe | 2453.57 ns | 4.54 ns | 4.01 ns | 1.13 |
immediate_just+scan(10, std::plus)+subscribe | 2908.33 ns | 0.57 ns | 0.45 ns | 1.29 |
immediate_just+flat_map(immediate_just(v*2))+subscribe | 6827.22 ns | 425.27 ns | 474.56 ns | 0.90 |
immediate_just+buffer(2)+subscribe | 2483.81 ns | 73.37 ns | 61.98 ns | 1.18 |
immediate_just+window(2)+subscribe + subscsribe inner | 5491.50 ns | 2356.71 ns | 2275.83 ns | 1.04 |
Conditional Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+take_while(false)+subscribe | 2332.24 ns | - | - | 0.00 |
immediate_just+take_while(true)+subscribe | 2266.48 ns | 0.23 ns | 0.22 ns | 1.05 |
Utility Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just(1)+subscribe_on(immediate)+subscribe | 4913.54 ns | 5.13 ns | 4.90 ns | 1.05 |
Combining Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 8669.72 ns | 447.38 ns | 560.05 ns | 0.80 |
immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 9231.66 ns | 454.04 ns | 475.33 ns | 0.96 |
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 455.68 ns | 549.60 ns | 0.83 |
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 7887.61 ns | 1876.96 ns | 1862.26 ns | 1.01 |
immediate_just(1) + zip(immediate_just(2)) + subscribe | 5478.57 ns | 868.46 ns | 850.41 ns | 1.02 |
Subjects
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
publish_subject with 1 observer - on_next | 78.99 ns | 51.26 ns | 47.62 ns | 1.08 |
subscribe 100 observers to publish_subject | 516060.00 ns | 44730.95 ns | 38557.89 ns | 1.16 |
100 on_next to 100 observers to publish_subject | 61647.79 ns | 25605.17 ns | 21629.23 ns | 1.18 |
Scenarios
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
basic sample | 3198.20 ns | 79.68 ns | 72.82 ns | 1.09 |
basic sample with immediate scheduler | 3415.79 ns | 20.27 ns | 18.83 ns | 1.08 |
Aggregating Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+reduce(10, std::plus)+subscribe | 2373.53 ns | 0.23 ns | 0.23 ns | 1.02 |
Error Handling Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 6792.05 ns | 4202.30 ns | 3777.43 ns | 1.11 |
create(on_error())+retry(1)+subscribe | 1850.55 ns | 288.55 ns | 366.79 ns | 0.79 |
ci-ubuntu-clang
General
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
Subscribe empty callbacks to empty observable | 269.65 ns | 0.64 ns | 0.63 ns | 1.02 |
Subscribe empty callbacks to empty observable via pipe operator | 269.84 ns | 0.63 ns | 0.63 ns | 1.00 |
Sources
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
from array of 1 - create + subscribe + immediate | 566.41 ns | 0.31 ns | 0.31 ns | 0.99 |
from array of 1 - create + subscribe + current_thread | 799.55 ns | 4.01 ns | 4.63 ns | 0.87 |
concat_as_source of just(1 immediate) create + subscribe | 2379.43 ns | 128.65 ns | 175.41 ns | 0.73 |
defer from array of 1 - defer + create + subscribe + immediate | 865.25 ns | 0.31 ns | 0.31 ns | 1.00 |
interval - interval + take(3) + subscribe + immediate | 2527.79 ns | 59.63 ns | 58.26 ns | 1.02 |
interval - interval + take(3) + subscribe + current_thread | 3195.66 ns | 32.26 ns | 30.88 ns | 1.04 |
from array of 1 - create + as_blocking + subscribe + new_thread | 28923.81 ns | 28605.48 ns | 27763.87 ns | 1.03 |
from array of 1000 - create + as_blocking + subscribe + new_thread | 37753.72 ns | 39900.77 ns | 36403.53 ns | 1.10 |
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 3737.18 ns | 147.77 ns | 296.56 ns | 0.50 |
Filtering Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+take(1)+subscribe | 1220.39 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just+filter(true)+subscribe | 865.66 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just(1,2)+skip(1)+subscribe | 1157.95 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just(1,1,2)+distinct_until_changed()+subscribe | 913.19 ns | 0.62 ns | 0.31 ns | 2.00 |
immediate_just(1,2)+first()+subscribe | 1399.05 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just(1,2)+last()+subscribe | 1039.57 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just+take_last(1)+subscribe | 1229.46 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just(1,2,3)+element_at(1)+subscribe | 881.71 ns | 0.31 ns | 0.31 ns | 1.00 |
Schedulers
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate scheduler create worker + schedule | 282.92 ns | 1.54 ns | 1.54 ns | 1.00 |
current_thread scheduler create worker + schedule | 419.27 ns | 4.01 ns | 4.32 ns | 0.93 |
current_thread scheduler create worker + schedule + recursive schedule | 872.14 ns | 55.89 ns | 55.43 ns | 1.01 |
Transforming Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+map(v*2)+subscribe | 867.71 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just+scan(10, std::plus)+subscribe | 995.98 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just+flat_map(immediate_just(v*2))+subscribe | 2260.12 ns | 140.83 ns | 229.22 ns | 0.61 |
immediate_just+buffer(2)+subscribe | 1574.87 ns | 13.59 ns | 14.20 ns | 0.96 |
immediate_just+window(2)+subscribe + subscsribe inner | 2467.75 ns | 902.83 ns | 909.27 ns | 0.99 |
Conditional Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+take_while(false)+subscribe | 876.24 ns | - | - | 0.00 |
immediate_just+take_while(true)+subscribe | 869.17 ns | 0.31 ns | 0.31 ns | 1.00 |
Utility Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just(1)+subscribe_on(immediate)+subscribe | 2022.87 ns | 0.31 ns | 0.31 ns | 1.00 |
Combining Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 3266.78 ns | 153.81 ns | 281.66 ns | 0.55 |
immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 3718.39 ns | 137.49 ns | 210.18 ns | 0.65 |
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 142.53 ns | 194.38 ns | 0.73 |
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 3382.70 ns | 829.42 ns | 833.25 ns | 1.00 |
immediate_just(1) + zip(immediate_just(2)) + subscribe | 2228.96 ns | 199.82 ns | 192.28 ns | 1.04 |
Subjects
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
publish_subject with 1 observer - on_next | 52.54 ns | 17.73 ns | 17.43 ns | 1.02 |
subscribe 100 observers to publish_subject | 209533.60 ns | 16012.59 ns | 16105.46 ns | 0.99 |
100 on_next to 100 observers to publish_subject | 46933.72 ns | 23590.61 ns | 23437.80 ns | 1.01 |
Scenarios
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
basic sample | 1312.77 ns | 11.42 ns | 12.34 ns | 0.93 |
basic sample with immediate scheduler | 1297.85 ns | 5.86 ns | 5.86 ns | 1.00 |
Aggregating Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+reduce(10, std::plus)+subscribe | 1043.68 ns | 0.31 ns | 0.31 ns | 1.00 |
Error Handling Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2220.91 ns | 1156.87 ns | 995.35 ns | 1.16 |
create(on_error())+retry(1)+subscribe | 679.65 ns | 140.22 ns | 156.25 ns | 0.90 |
ci-windows
General
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
Subscribe empty callbacks to empty observable | 574.19 ns | 2.16 ns | 2.16 ns | 1.00 |
Subscribe empty callbacks to empty observable via pipe operator | 588.63 ns | 2.16 ns | 2.16 ns | 1.00 |
Sources
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
from array of 1 - create + subscribe + immediate | 1187.84 ns | 5.24 ns | 5.24 ns | 1.00 |
from array of 1 - create + subscribe + current_thread | 1447.18 ns | 15.44 ns | 15.43 ns | 1.00 |
concat_as_source of just(1 immediate) create + subscribe | 3676.70 ns | 174.97 ns | 239.25 ns | 0.73 |
defer from array of 1 - defer + create + subscribe + immediate | 1190.39 ns | 5.24 ns | 4.93 ns | 1.06 |
interval - interval + take(3) + subscribe + immediate | 3584.56 ns | 140.63 ns | 138.82 ns | 1.01 |
interval - interval + take(3) + subscribe + current_thread | 3464.19 ns | 59.80 ns | 59.23 ns | 1.01 |
from array of 1 - create + as_blocking + subscribe + new_thread | 120037.50 ns | 115322.22 ns | 115570.00 ns | 1.00 |
from array of 1000 - create + as_blocking + subscribe + new_thread | 130225.00 ns | 131655.56 ns | 132362.50 ns | 0.99 |
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 5298.45 ns | 211.58 ns | 306.04 ns | 0.69 |
Filtering Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+take(1)+subscribe | 1803.19 ns | 19.73 ns | 19.43 ns | 1.02 |
immediate_just+filter(true)+subscribe | 1322.33 ns | 18.81 ns | 18.50 ns | 1.02 |
immediate_just(1,2)+skip(1)+subscribe | 1725.49 ns | 18.52 ns | 17.90 ns | 1.03 |
immediate_just(1,1,2)+distinct_until_changed()+subscribe | 1636.16 ns | 23.44 ns | 20.82 ns | 1.13 |
immediate_just(1,2)+first()+subscribe | 2045.36 ns | 17.29 ns | 18.21 ns | 0.95 |
immediate_just(1,2)+last()+subscribe | 1768.50 ns | 18.52 ns | 19.12 ns | 0.97 |
immediate_just+take_last(1)+subscribe | 2000.88 ns | 65.23 ns | 63.86 ns | 1.02 |
immediate_just(1,2,3)+element_at(1)+subscribe | 1336.92 ns | 21.90 ns | 20.97 ns | 1.04 |
Schedulers
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate scheduler create worker + schedule | 478.86 ns | 4.32 ns | 4.32 ns | 1.00 |
current_thread scheduler create worker + schedule | 661.26 ns | 11.43 ns | 11.40 ns | 1.00 |
current_thread scheduler create worker + schedule + recursive schedule | 1093.51 ns | 99.02 ns | 101.03 ns | 0.98 |
Transforming Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+map(v*2)+subscribe | 1316.92 ns | 18.82 ns | 18.82 ns | 1.00 |
immediate_just+scan(10, std::plus)+subscribe | 1534.33 ns | 21.69 ns | 20.98 ns | 1.03 |
immediate_just+flat_map(immediate_just(v*2))+subscribe | 3467.99 ns | 190.83 ns | 271.40 ns | 0.70 |
immediate_just+buffer(2)+subscribe | 2646.06 ns | 65.87 ns | 63.64 ns | 1.03 |
immediate_just+window(2)+subscribe + subscsribe inner | 3991.35 ns | 1297.83 ns | 1322.34 ns | 0.98 |
Conditional Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+take_while(false)+subscribe | 1605.24 ns | 17.58 ns | 17.58 ns | 1.00 |
immediate_just+take_while(true)+subscribe | 1331.26 ns | 18.82 ns | 18.50 ns | 1.02 |
Utility Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just(1)+subscribe_on(immediate)+subscribe | 3161.61 ns | 11.10 ns | 11.10 ns | 1.00 |
Combining Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 5057.64 ns | 193.07 ns | 296.88 ns | 0.65 |
immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 5359.69 ns | 186.52 ns | 268.90 ns | 0.69 |
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 190.78 ns | 302.35 ns | 0.63 |
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 5354.93 ns | 962.92 ns | 959.20 ns | 1.00 |
immediate_just(1) + zip(immediate_just(2)) + subscribe | 3548.10 ns | 511.38 ns | 561.49 ns | 0.91 |
Subjects
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
publish_subject with 1 observer - on_next | 36.68 ns | 19.58 ns | 20.69 ns | 0.95 |
subscribe 100 observers to publish_subject | 262850.00 ns | 26452.27 ns | 28441.67 ns | 0.93 |
100 on_next to 100 observers to publish_subject | 54800.00 ns | 32606.45 ns | 35741.38 ns | 0.91 |
Scenarios
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
basic sample | 1881.10 ns | 95.68 ns | 96.80 ns | 0.99 |
basic sample with immediate scheduler | 2269.02 ns | 68.56 ns | 68.74 ns | 1.00 |
Aggregating Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+reduce(10, std::plus)+subscribe | 1464.63 ns | 19.42 ns | 19.13 ns | 1.02 |
Error Handling Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2164.99 ns | 358.46 ns | 241.08 ns | 1.49 |
create(on_error())+retry(1)+subscribe | 1702.92 ns | 145.53 ns | 196.97 ns | 0.74 |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## v2 #673 +/- ##
==========================================
- Coverage 98.48% 98.47% -0.01%
==========================================
Files 155 155
Lines 9678 9695 +17
==========================================
+ Hits 9531 9547 +16
- Misses 147 148 +1 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 8
🧹 Outside diff range and nitpick comments (6)
src/rpp/rpp/disposables/refcount_disposable.hpp (1)
Line range hint
94-106
: LGTM! The simplified implementation improves code clarity.The changes maintain thread safety while removing unnecessary complexity:
- Atomic operations ensure thread-safe reference counting
- Proper handling of disposed state prevents use-after-free
- Consistent use of weak references prevents reference cycles
Consider documenting the thread-safety guarantees and memory ordering requirements in the header, as these are critical for users implementing custom disposables.
src/rpp/rpp/operators/take_until.hpp (2)
Line range hint
23-44
: Well-structured disposable implementation with proper synchronization.The implementation demonstrates good practices:
- Thread-safe state management using atomic boolean
- Protected observer access using mutex
- Clear separation of concerns with composite disposable inheritance
Consider documenting the thread-safety guarantees this class provides, especially regarding the interaction between the composite disposable operations and the observer access.
107-107
: LGTM: Optimized disposables strategy.The change to
fixed_disposables_strategy<1>
provides better resource management by:
- Making the capacity requirement explicit
- Preventing potential memory leaks from unbounded growth
- Optimizing for the specific use case
This optimization assumes the operator never needs more than one disposable. Consider documenting this assumption in the code comments to prevent future misunderstandings.
src/rpp/rpp/operators/details/combining_strategy.hpp (1)
Line range hint
1-123
: Consider adding unit tests.Given the significant changes to the disposable management and the introduction of the
combining_disposable
class, it would be beneficial to add unit tests to ensure the correctness of the new implementation and prevent regressions.Do you want me to generate the unit testing code or open a GitHub issue to track this task?
src/rpp/rpp/sources/concat.hpp (2)
72-72
: Avoid redundant disposal checks inis_disposed()
.The
is_disposed()
method checks bothlocally_disposed
andstate->is_disposed()
. Given thatstate
represents the shared state and already tracks disposal, consider simplifying this method to reduce complexity.Apply this diff to streamline
is_disposed()
:bool is_disposed() const { - return locally_disposed || state->is_disposed(); + return state->is_disposed(); }
134-134
: Usemake_shared
for exception safety and efficiency.When creating shared pointers,
std::make_shared
is more efficient and provides exception safety. Consider using it when instantiatingconcat_state_t
.Apply this diff to use
make_shared
:-const auto d = disposable_wrapper_impl<concat_state_t<observer<value_type, Strategy>, PackedContainer>>::make(std::move(obs), container); +const auto d = std::make_shared<concat_state_t<observer<value_type, Strategy>, PackedContainer>>(std::move(obs), container);
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (16)
src/rpp/rpp/disposables/refcount_disposable.hpp
(3 hunks)src/rpp/rpp/observables/details/chain_strategy.hpp
(1 hunks)src/rpp/rpp/operators/combine_latest.hpp
(2 hunks)src/rpp/rpp/operators/concat.hpp
(5 hunks)src/rpp/rpp/operators/debounce.hpp
(4 hunks)src/rpp/rpp/operators/delay.hpp
(5 hunks)src/rpp/rpp/operators/details/combining_strategy.hpp
(5 hunks)src/rpp/rpp/operators/merge.hpp
(5 hunks)src/rpp/rpp/operators/on_error_resume_next.hpp
(2 hunks)src/rpp/rpp/operators/retry.hpp
(4 hunks)src/rpp/rpp/operators/retry_when.hpp
(5 hunks)src/rpp/rpp/operators/take_until.hpp
(4 hunks)src/rpp/rpp/operators/with_latest_from.hpp
(5 hunks)src/rpp/rpp/operators/zip.hpp
(3 hunks)src/rpp/rpp/sources/concat.hpp
(4 hunks)src/tests/utils/disposable_observable.hpp
(1 hunks)
🔇 Additional comments (90)
src/rpp/rpp/disposables/refcount_disposable.hpp (1)
57-57
: Verify the impact of removing the Mode parameter.
The removal of the Mode
parameter from add_ref()
is a breaking change that simplifies the API. While this aligns with the goal of stabilizing disposables logic, we should verify that all call sites have been updated.
✅ Verification successful
Based on the search results, I can now generate the final response:
Breaking change verification completed - all call sites are compatible
The removal of the Mode
parameter from add_ref()
has been verified across the codebase. All existing call sites are already using the parameterless version of add_ref()
, indicating this change is safe and doesn't require any updates to the calling code.
Key findings:
- All 16 occurrences of
add_ref()
calls found in the codebase are using the method without parameters - No instances of
add_ref()
withMode
parameter were found - Usage spans across tests, operators, and observables, all compatible with the new signature
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Find all calls to add_ref() to ensure they've been updated
# Test: Search for add_ref calls with Mode parameter. Expect: No results.
rg -A 2 'add_ref\s*\([^)]*Mode'
# Test: Search for all add_ref calls to verify they work with new signature
rg -A 2 'add_ref\s*\(\s*\)'
Length of output: 4837
src/rpp/rpp/observables/details/chain_strategy.hpp (1)
55-58
: LGTM! Good addition of compile-time type safety.
The static assertion ensures that TStrategy
supports the operator_subscribe
constraint when other subscription strategies are not available. This change aligns well with the PR's objective of stabilizing disposables logic by catching potential type mismatches at compile-time.
src/rpp/rpp/operators/take_until.hpp (4)
16-16
: LGTM: Required include for composite_disposable inheritance.
The addition of this include statement is necessary to support the new inheritance hierarchy.
52-52
: LGTM: Consistent with the new disposables architecture.
The type change properly reflects the shift from state-based to disposable-based management.
66-68
: LGTM: Simplified disposable management.
The changes reduce indirection and provide clearer ownership semantics by directly utilizing the disposable functionality.
112-114
: LGTM: Proper disposable initialization with clear ownership semantics.
The implementation correctly manages disposable lifecycle and observer initialization.
Let's verify that this pattern is consistently applied across other operators:
✅ Verification successful
The previous pattern might have been too strict. Let's try these variations to get a better understanding of how disposables are initialized across the codebase.
Consistent disposable initialization pattern confirmed across operators
The search results show that the disposable initialization pattern used in take_until.hpp
is consistently applied across multiple operators in the codebase, including:
with_latest_from.hpp
merge.hpp
delay.hpp
timeout.hpp
concat.hpp
switch_on_next.hpp
retry.hpp
All these operators follow the same pattern:
- Create disposable using
disposable_wrapper_impl<T>::make()
- Lock the disposable to get a pointer
- Set the upstream connection using
set_upstream()
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for similar disposable initialization patterns in other operators
ast-grep --pattern 'disposable_wrapper_impl<$_>::make($_)'
Length of output: 60
Script:
#!/bin/bash
# Let's try a more specific search pattern for disposable initialization
ast-grep --pattern 'const auto $_ = disposable_wrapper_impl<$_>::make($_)'
# Also search for similar patterns with different variable names
ast-grep --pattern 'auto $_ = disposable_wrapper_impl<$_>::make($_)'
# Let's also check for the general usage pattern across files
rg "disposable_wrapper_impl" -A 2
Length of output: 16681
src/tests/utils/disposable_observable.hpp (1)
Line range hint 85-97
: LGTM! Test case improvements enhance disposal timing verification.
The renamed test case and its implementation effectively verify the proper disposal timing, ensuring that:
- Resources aren't disposed prematurely
- Disposal occurs in the correct order
- Cleanup is deterministic
This aligns well with the PR's objective of stabilizing disposables logic.
src/rpp/rpp/operators/details/combining_strategy.hpp (4)
Line range hint 26-46
: LGTM!
The changes to the combining_disposable
class, including the constructor, member variables, and methods, align with the new class structure and are implemented correctly.
Line range hint 78-123
: LGTM!
The changes to combining_operator_t
, including the updated type parameters, lift
method, and subscribe_impl
method, are implemented correctly and align with the new disposable-based structure.
16-16
: Verify the impact of inheriting from composite_disposable
.
The change from inheriting base_disposable
to composite_disposable
suggests that combining_disposable
now manages multiple disposables. Ensure that this change is consistently applied throughout the codebase and aligns with the intended behavior of the class.
To verify the impact, run the following script:
#!/bin/bash
# Description: Verify the usage and behavior of `combining_disposable` after inheriting from `composite_disposable`.
# Test 1: Search for the declaration of `combining_disposable`. Expect: Should inherit from `composite_disposable`.
rg -A 5 $'class combining_disposable'
# Test 2: Search for the usage of `combining_disposable`. Expect: Should be used consistently with its new behavior.
rg -A 10 $'combining_disposable'
# Test 3: Search for the usage of `add` method. Expect: Should be called on `combining_disposable` instances to add disposables.
rg -A 5 $'combining_disposable(.*)->add'
49-76
: Verify the usage of combining_observer_strategy
with TDisposable
.
The changes to combining_observer_strategy
, replacing TState
with TDisposable
, are consistent with the overall shift towards disposable-based management. However, ensure that the usage of this struct is updated accordingly throughout the codebase.
To verify the usage, run the following script:
✅ Verification successful
The previous search revealed the existence of combining_disposable
, but we need to specifically verify the usage of combining_observer_strategy
to ensure it's being used correctly with TDisposable
. These commands will help us find direct usages and implementations.
The usage of combining_observer_strategy
with TDisposable
is correctly implemented
The verification shows that combining_observer_strategy
is properly used with disposable types in both major implementations:
- In
zip.hpp
withzip_disposable<Observer, TSelector, Args...>
- In
combine_latest.hpp
withcombine_latest_disposable<Observer, TSelector, Args...>
Both implementations inherit from combining_disposable
and use the strategy correctly with their respective disposable types, maintaining consistency with the disposable-based management approach.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the usage of `combining_observer_strategy` with `TDisposable`.
# Test: Search for the usage of `combining_observer_strategy`. Expect: Should be instantiated with a disposable type.
rg -A 10 $'combining_observer_strategy'
Length of output: 4054
Script:
#!/bin/bash
# Let's search for the usage of combining_observer_strategy specifically
rg -A 5 'combining_observer_strategy'
# Let's also check where it's being used in template instantiations
ast-grep --pattern 'combining_observer_strategy<$_>'
# Let's look at the operator implementations that might use this strategy
rg -A 10 'class.*_operator.*combining'
Length of output: 2042
src/rpp/rpp/operators/on_error_resume_next.hpp (7)
20-30
: Introduction of on_error_resume_next_disposable
enhances disposable management
The new struct on_error_resume_next_disposable
encapsulates the observer and inherits from rpp::composite_disposable
, aligning with the overall design for managing disposables. The use of RPP_NO_UNIQUE_ADDRESS
before the observer member can optimize storage without affecting the functionality.
72-73
: Proper use of shared_ptr
for state
ensures shared ownership
Storing state
as a std::shared_ptr<on_error_resume_next_disposable<TObserver>>
correctly manages the lifetime and ownership, preventing premature disposal while shared across observers.
78-78
: Correctly forwarding on_next
calls to the encapsulated observer
The on_next
method appropriately forwards incoming values to state->observer.on_next
, ensuring that observers receive the emitted items as expected.
91-91
: Verify disposal timing of state
after error handling
Calling state->dispose()
immediately after subscribing to the selector observable might prematurely dispose of the state
, potentially affecting the newly subscribed observable that may rely on state
. Ensure that disposing of state
at this point does not interfere with the continuation of the error handling logic.
Please verify whether disposing state
here is intentional and does not lead to unintended side effects.
96-96
: Correctly forwarding on_completed
to the encapsulated observer
The on_completed
method properly notifies the encapsulated observer by calling state->observer.on_completed()
, ensuring that completion signals are propagated as expected.
101-101
: Proper addition of upstream disposable to the composite state
By adding the upstream disposable to state
using state->add(d)
, the implementation ensures that all disposables are managed collectively, which aids in resource management and prevents leaks.
106-112
: Ensure correct initialization in init_state
and avoid potential weak pointer issues
The init_state
function locks a weak pointer to obtain a shared_ptr
, then sets the upstream of ptr->observer
to a weak reference. While this manages the disposable relationships, there is a possibility of the weak pointers becoming expired, leading to issues when accessing the observer or disposables.
Please verify that d.lock()
will always succeed in this context and that d.as_weak()
provides a valid weak reference for the lifetime required. Consider if there's a need for additional checks or a different ownership model to prevent potential weak pointer expiration.
src/rpp/rpp/operators/zip.hpp (4)
24-28
: LGTM!
The renaming of zip_state
to zip_disposable
and the corresponding changes to the base class and constructor are consistent with the overall refactoring effort to manage disposables. The logic remains correct.
52-55
: LGTM!
The changes in the on_next
method to use disposable
instead of state
are consistent with the new class structure. The logic for retrieving the observer and managing pending values remains correct.
71-71
: Ensure the base class change does not introduce runtime issues.
The change in the base class of zip_t
from zip_state
to combining_operator_t<zip_disposable, ...>
aligns with the refactoring of the zipping mechanism. However, thoroughly test this change to ensure that it does not introduce any runtime issues or unexpected behavior.
Verify the correctness of the zip
operator by generating comprehensive unit tests:
// Test case: Zipping observables with custom selector
TEST(ZipOperatorTest, ZipWithCustomSelector) {
// Arrange
auto observable1 = rpp::source::just(1, 2, 3);
auto observable2 = rpp::source::just(4, 5, 6);
auto selector = [](int x, int y) { return std::make_pair(x, y); };
// Act
auto zipped = rpp::operators::zip(selector, observable1, observable2);
auto result = rpp::collect(zipped);
// Assert
EXPECT_EQ(result.size(), 3);
EXPECT_EQ(result[0], std::make_pair(1, 4));
EXPECT_EQ(result[1], std::make_pair(2, 5));
EXPECT_EQ(result[2], std::make_pair(3, 6));
}
// Test case: Zipping observables without custom selector
TEST(ZipOperatorTest, ZipWithoutCustomSelector) {
// Arrange
auto observable1 = rpp::source::just(1, 2, 3);
auto observable2 = rpp::source::just(4, 5, 6);
// Act
auto zipped = rpp::operators::zip(observable1, observable2);
auto result = rpp::collect(zipped);
// Assert
EXPECT_EQ(result.size(), 3);
EXPECT_EQ(result[0], std::make_tuple(1, 4));
EXPECT_EQ(result[1], std::make_tuple(2, 5));
EXPECT_EQ(result[2], std::make_tuple(3, 6));
}
These test cases cover the scenarios of zipping observables with and without a custom selector. They ensure that the zip
operator correctly combines emissions from multiple observables and emits the expected results.
45-47
: Verify the impact of renaming state
to disposable
.
The renaming of state
to disposable
aligns with the new class structure. However, ensure that this change is consistently applied throughout the codebase and does not introduce any unintended side effects or compilation errors.
Run the following script to verify the usage of state
and disposable
:
✅ Verification successful
Based on the search results, I can now generate the final response regarding the renaming of state
to disposable
:
The renaming from state
to disposable
is consistently applied and safe.
The codebase search confirms that:
- The
disposable
member is used consistently in the zip operator and other combining operators through thecombining_observer_strategy
base class - The term
state
is used in other contexts (like test states, subject states) but not in the context of the zip operator or its base classes - The renaming aligns with the codebase's pattern where
disposable
is used to represent the resource management member in observer strategies
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the consistent usage of `disposable` instead of `state`.
# Test 1: Search for occurrences of `state`.
# Expect: Only valid occurrences (e.g., in comments, variable names).
rg -w $'state'
# Test 2: Search for occurrences of `disposable`.
# Expect: Usage in relevant contexts, replacing previous `state` references.
rg -w $'disposable' -A 5
Length of output: 167026
src/rpp/rpp/operators/retry.hpp (6)
21-21
: Inheritance from rpp::composite_disposable
is appropriate
The retry_state_t
structure now inherits from rpp::composite_disposable
, which centralizes disposable management and enhances resource handling within the retry mechanism.
64-64
: Utilizing state->clear()
for disposable management
Replacing manual disposable handling with state->clear();
leverages the inherited functionality from composite_disposable
, ensuring consistent and efficient resource cleanup.
80-80
: Adding disposables using state->add(d)
The addition of state->add(d);
correctly incorporates the upstream disposable into the retry_state_t
, aligning with the updated disposable management approach.
83-83
: Updating is_disposed()
to include state disposal status
Including state->is_disposed()
in the is_disposed()
method ensures that both local and state disposal statuses are accounted for, providing an accurate check of the observer's disposal state.
89-89
: Using state->is_disposed()
in the drain loop condition
Updating the loop condition to while (!state->is_disposed())
ensures the drain
function exits appropriately when the retry_state_t
is disposed, preventing unnecessary iterations.
129-129
: Setting upstream disposable with d.as_weak()
Using ptr->observer.set_upstream(d.as_weak());
sets the observer's upstream disposable to a weak reference of the state disposable, preventing strong reference cycles and aiding in proper disposal.
src/rpp/rpp/operators/retry_when.hpp (6)
26-29
: Inheritance from rpp::composite_disposable
enhances resource management
The retry_when_state
struct now inherits from rpp::composite_disposable
, allowing it to effectively manage disposables within the retry mechanism.
59-59
: Properly clearing disposables in on_next
Calling state->clear();
in the on_next
method ensures that any disposables held by the state are correctly cleared when the notifier emits, preventing potential memory leaks.
79-81
: Correctly setting upstream disposable and checking disposal status
The set_upstream
method now adds the disposable to the state's composite disposable via state->add(d)
, and is_disposed
accurately reflects both local and state disposal statuses with locally_disposed || state->is_disposed()
.
116-118
: Consistent disposable management in retry_when_impl_strategy
Updating set_upstream
to add the disposable to the state's composite disposable and using state->is_disposed()
in is_disposed
ensures consistent management of disposables.
124-124
: Using state disposal status in drain
loop condition
The while (!state->is_disposed())
loop condition in drain
correctly checks for the overall disposal status, ensuring that the loop exits appropriately when disposal occurs.
160-163
: Safe initialization and setup in subscribe
method
Creating the disposable wrapper d
for the retry_when_state
and setting the observer's upstream with ptr->observer.set_upstream(d.as_weak())
ensures proper linkage and disposal management within the subscription.
src/rpp/rpp/operators/debounce.hpp (8)
15-17
: LGTM!
The added includes are necessary for the new debounce_disposable
class and the usage of disposables containers.
34-36
: LGTM!
The debounce_disposable
class now inherits from rpp::composite_disposable_impl<Container>
, which is consistent with the transition to disposable-based management seen in other operators like combine_latest
and concat
.
Line range hint 41-46
: Ensure the debounce_disposable
class maintains the core functionality.
The debounce_disposable
class appears to maintain the same core functionality as its predecessor, including methods for managing emissions and scheduling. The constructor, emplace_safe
, extract_value
, and get_observer_under_lock
methods seem to be properly adapted to the new class structure.
Verify that the debounce_disposable
class maintains the expected behavior by:
- Reviewing the usage of the class in the context of the
debounce
operator. - Examining the interaction between the
debounce_disposable
class and the updateddebounce_observer_strategy
. - Confirming that the scheduling logic remains intact and correctly handles the debounce functionality.
If any discrepancies or potential issues are identified, please provide specific feedback.
Also applies to: 48-62, 64-68, 70-72
74-75
: LGTM!
The schedule
method has been updated to work with the new debounce_disposable_wrapper
and the associated changes in the lambda function signature. The usage of get_observer_under_lock
and wrapper_from_this
is consistent with the new class structure.
Also applies to: 80-80, 84-84
Line range hint 87-102
: LGTM!
The extract_value_or_time
method remains largely unchanged, with the exception of the m_time_when_value_should_be_emitted
and m_value_to_be_emitted
member variables being properly managed within the new debounce_disposable
class.
Also applies to: 104-109
111-112
: LGTM!
The debounce_observer_strategy
has been updated to accommodate the new debounce_disposable
class. All references to the state management have been replaced with references to the disposable management, ensuring consistency with the new class structure.
Also applies to: 116-116, 120-120, 125-125, 131-131, 136-136, 141-144
158-158
: LGTM!
The lift_with_disposables_strategy
method in the debounce_t
struct has been modified to include a disposables strategy, which is consistent with the broader refactoring effort. The creation and management of the debounce_disposable
instance within this method align with the new class structure and the usage of disposables containers.
Also applies to: 163-164, 166-167, 170-173
21-22
: Verify the renaming of debounce_state
to debounce_disposable
in the codebase.
The renaming aligns with the broader refactoring effort to transition from state-based to disposable-based management. However, ensure that all references to debounce_state
have been updated to debounce_disposable
throughout the codebase.
Run the following script to verify the renaming:
Also applies to: 24-25, 27-27, 29-29, 31-31
✅ Verification successful
Based on the search results, I can generate the final response as we have sufficient information to verify the renaming.
The renaming from debounce_state
to debounce_disposable
has been completed successfully
The codebase search confirms:
- No occurrences of the old name
debounce_state
were found - All occurrences of
debounce_disposable
are valid and consistent with the new implementation insrc/rpp/rpp/operators/debounce.hpp
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify all references to `debounce_state` have been updated to `debounce_disposable`.
# Test 1: Search for occurrences of `debounce_state`. Expect: No results.
rg $'debounce_state'
# Test 2: Search for occurrences of `debounce_disposable`. Expect: Only valid usages in the context of the new implementation.
rg $'debounce_disposable'
Length of output: 1379
src/rpp/rpp/sources/concat.hpp (1)
77-77
: Verify effects of calling state->clear()
in on_completed()
.
Calling state->clear()
disposes all upstream disposables. Ensure that this does not interfere with the ongoing disposal process or leave any upstream resources undisposed.
Run the following script to check for any disposables added after state->clear()
is called:
✅ Verification successful
From the context we've gathered, I can see that state->clear()
is called within on_completed()
after setting locally_disposed = true
and before attempting to drain any remaining items. The code shows proper synchronization with is_inside_drain
atomic flag to prevent concurrent operations. After clearing, it either returns if inside a drain operation or proceeds to drain remaining items.
Looking at the usage patterns of state
, we can see that:
- No disposables are added after the
clear()
call - The only operations after
clear()
are checking disposal state and draining - The drain operation has proper disposal checks
The use of state->clear()
in on_completed()
appears safe and properly synchronized.
The implementation ensures that:
- All upstream disposables are cleared before proceeding with drain
- No new disposables are added after clearing
- The disposal state is properly checked during drain operations
- Concurrent operations are properly handled with atomic flags
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Ensure no disposables are added to `state` after `state->clear()`.
# Search for any calls to `state->add` after `state->clear()`.
rg -A5 'state->clear\(\)' src/rpp/rpp/sources/concat.hpp | rg 'state->add'
Length of output: 76
Script:
#!/bin/bash
# Let's first see the complete implementation context around state->clear()
rg -B10 -A10 'state->clear\(\)' src/rpp/rpp/sources/concat.hpp
# Also check how state is used throughout the file
rg 'state->' src/rpp/rpp/sources/concat.hpp
# Check the type/class definition of state to understand its clear() method
ast-grep --pattern 'struct composite_subscription {
$$$
void clear() {
$$$
}
$$$
}'
Length of output: 1835
Script:
#!/bin/bash
# Let's check the composite_subscription definition in other files
rg -l "composite_subscription"
# Then examine its implementation
rg -B5 -A5 "struct composite_subscription" --type cpp --type hpp
Length of output: 153
Script:
#!/bin/bash
# Let's find the subscription header file
fd subscription.hpp
# And examine its content
rg -B5 -A10 "struct composite_subscription" src/rpp/rpp/subscription.hpp || true
rg "void clear\(\)" src/rpp/rpp/subscription.hpp || true
Length of output: 299
src/rpp/rpp/operators/concat.hpp (2)
82-82
:
Verify compatibility of wrapper_from_this()
after changing inheritance.
At line 82, wrapper_from_this()
is used within concat_disposable
. Since the inheritance was changed from std::enable_shared_from_this
to rpp::refcount_disposable
, ensure that wrapper_from_this()
still functions correctly. If wrapper_from_this()
relies on std::enable_shared_from_this
, this could lead to runtime errors.
Run the following script to verify the implementation of wrapper_from_this()
in concat_disposable
:
✅ Verification successful
wrapper_from_this()
is compatible with the inheritance change
The verification shows that refcount_disposable
inherits from rpp::details::enable_wrapper_from_this<refcount_disposable>
, which provides the wrapper_from_this()
functionality. Since concat_disposable
inherits from refcount_disposable
, it correctly inherits this functionality. The implementation is safe and working as expected.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify that `concat_disposable` provides `wrapper_from_this()` correctly after the inheritance change.
# Test: Search for `wrapper_from_this` implementation in `concat_disposable` or its base classes.
# Expect: Confirm that `wrapper_from_this()` is implemented or appropriately inherited.
# Find the definition of `concat_disposable`
rg -A 10 'class concat_disposable' src/
# Check if `concat_disposable` or `rpp::refcount_disposable` provides `wrapper_from_this()`
rg -A 5 'wrapper_from_this' src/rpp/disposables/refcount_disposable.hpp
Length of output: 960
Script:
#!/bin/bash
# Let's search for refcount_disposable and wrapper_from_this in the entire codebase
rg -A 10 'class refcount_disposable' src/
# Search for wrapper_from_this implementation
rg -A 5 'wrapper_from_this' src/
# Let's also check if there's a different path for refcount_disposable
fd refcount_disposable src/
Length of output: 10612
183-184
:
Incorrect disposal check before notifying observer in on_completed()
.
In the on_completed()
method at lines 183-184, the code checks if (base::disposable->is_disposed())
before calling get_observer()->on_completed();
. This means on_completed()
is called when the disposable is already disposed, which may be unintended. Invert the condition to ensure on_completed()
is called only when not disposed.
Apply this diff to correct the condition:
-if (base::disposable->is_disposed())
+if (!base::disposable->is_disposed())
base::disposable->get_observer()->on_completed();
Likely invalid or redundant comment.
src/rpp/rpp/operators/delay.hpp (9)
16-16
: Addition of composite_disposable.hpp
header is appropriate
Including <rpp/disposables/composite_disposable.hpp>
is necessary for utilizing rpp::composite_disposable_impl
. This ensures that the composite disposable functionality is correctly integrated into the delay operator implementation.
38-39
: Refactoring to delay_disposable
enhances disposable management
Renaming delay_state
to delay_disposable
and inheriting from rpp::composite_disposable_impl<Container>
improves resource management by leveraging composite disposables. This change promotes better control over the lifecycle of disposables and aligns with best practices for managing complex disposable scenarios.
Line range hint 43-50
: Constructor and member initialization are correctly updated
The constructor of delay_disposable
now accepts Observer
, Worker
, and rpp::schedulers::duration
, initializing observer
, worker
, and delay
members appropriately. This update reflects the new structure and ensures that all necessary components are initialized for proper operation.
59-66
: Introduction of delay_disposable_wrapper
improves encapsulation
The new delay_disposable_wrapper
struct provides an abstraction over delay_disposable
, exposing is_disposed
and on_error
methods. This encapsulation enhances code readability and maintainability by separating concerns and providing a clear interface.
69-82
: Updates to delay_observer_strategy
align with disposable refactoring
Modifying delay_observer_strategy
to utilize the new delay_disposable
structure ensures consistency with the refactored disposable management approach. The methods set_upstream
and is_disposed
correctly interact with the disposable
, maintaining proper upstream disposable relationships and disposal checks.
107-110
: Correct scheduling of drain_queue
with worker
In the emplace
function, scheduling drain_queue
via disposable->worker.schedule
is appropriately updated. The lambda function correctly captures the delay_disposable_wrapper
and calls drain_queue
with disposable
. This ensures that emissions are processed at the correct time points.
117-130
: Thread safety ensured in emplace_safe
with mutex locking
Using std::lock_guard
to protect access to disposable->queue
and disposable->is_active
in emplace_safe
ensures thread safety when modifying shared resources. Clearing the queue on error when ClearOnError
is true
and appropriately handling the activation state are correctly implemented.
137-159
: Efficient emission processing in drain_queue
function
The drain_queue
function effectively processes queued emissions using a loop that handles different emission types via std::visit
. The use of std::unique_lock
for thread safety and checking for empty queues ensures proper synchronization. The handling of time-based scheduling with schedulers::optional_delay_to
is correctly applied.
176-190
: Enhancement of lift_with_disposables_strategy
improves flexibility
Renaming lift
to lift_with_disposables_strategy
and introducing additional template parameters for disposables strategy enhances the flexibility and modularity of the delay operator. The creation of the disposable and observer using the updated strategy allows for more customizable disposable management, aligning with best practices in design patterns.
src/rpp/rpp/operators/merge.hpp (12)
16-16
: Appropriate inclusion of composite_disposable
header
Including <rpp/disposables/composite_disposable.hpp>
is necessary due to the inheritance from composite_disposable
.
Line range hint 27-49
: Refactored merge_state
to merge_disposable
with correct inheritance
The merge_state
class has been appropriately renamed to merge_disposable
and now inherits from composite_disposable
, ensuring proper management of disposables within the merge operator.
52-58
: Updated constructors in merge_observer_base_strategy
Constructors have been correctly updated to use merge_disposable
, reflecting the refactoring changes and maintaining consistency in disposable management.
64-65
: Verify the management of disposables in set_upstream()
Adding disposables to both m_disposable
and m_disposables
may lead to duplication. Ensure that this is intentional and does not cause any side effects.
70-70
: Consistent disposal check in is_disposed()
method
Delegating the is_disposed()
check to m_disposable
aligns with the updated disposable structure.
75-75
: Proper error propagation to observer
Forwarding errors to the observer using get_observer_under_lock()->on_error(err)
ensures thread-safe error handling.
80-89
: Review disposal logic in on_completed()
The logic in on_completed()
removes disposables from m_disposable
when not all observables have completed. Verify that this behavior is intended and does not prematurely dispose of active subscriptions.
94-95
: Proper management of shared disposable
Storing m_disposable
as a std::shared_ptr<merge_disposable<TObserver>>
ensures shared ownership and correct lifetime management of the disposable.
106-106
: Correct forwarding of on_next()
to observer
Forwarding the received value to the observer under lock maintains thread safety and ensures that all emitted items are properly handled.
115-118
: Initialization of merge_observer_strategy
with init_state()
Using the init_state()
function to initialize the disposable state is a clean approach that encapsulates the setup logic and promotes code reuse.
122-124
: Incrementing completion counter before subscribing
Incrementing m_on_completed_needed
before subscribing to the inner observable ensures accurate tracking of active subscriptions, which is critical for correct completion logic.
127-132
: Ensure proper setup in init_state()
The init_state()
function initializes the merge_disposable
and sets up the upstream correctly. Using disposable_wrapper_impl
and setting the upstream via set_upstream(d.as_weak())
aligns with the overall disposal strategy.
src/rpp/rpp/operators/with_latest_from.hpp (19)
26-28
: Renaming to 'with_latest_from_disposable' enhances clarity
The renaming of the class to with_latest_from_disposable
and its inheritance from composite_disposable
improves readability and aligns with the disposable-based management approach adopted in this refactoring.
29-35
: Proper initialization in constructor
The constructor correctly initializes m_observer_with_mutex
using std::move(observer)
and initializes m_selector
with the provided selector. This ensures efficient resource management and correct setup of the disposable.
39-39
: Accessor method for selector is appropriate
The get_selector
method provides const access to the selector, which is essential for invoking the selector without modifying it.
50-51
: Use of shared pointer for disposable is correct
Defining disposable
as a std::shared_ptr
to with_latest_from_disposable
ensures proper memory management and shared ownership, which is important in asynchronous and multithreaded environments.
55-57
: Correctly adding disposables to composite
The set_upstream
method correctly adds the provided disposable d
to the composite disposable, ensuring that all disposables are properly managed and can be disposed of collectively.
60-62
: Accurate disposal status check
The is_disposed
method accurately reflects the disposal status by delegating to the underlying composite disposable's is_disposed
method.
66-68
: Thread-safe update of latest values
The on_next
implementation locks the corresponding value mutex before emplacing the new value. This ensures thread-safe updates to the latest values received from the observables.
72-74
: Proper error forwarding to observer
The on_error
method correctly forwards the error to the downstream observer, ensuring that error handling is consistent and observers are notified appropriately.
82-85
: Strategic use of disposables mode
Setting preferred_disposables_mode
to rpp::details::observers::disposables_mode::None
is appropriate here since the composite disposable is handling the disposables management.
86-88
: Initialization of disposable shared pointer
The disposable
shared pointer is properly declared and will manage the lifecycle of the disposable object within the observer strategy.
90-92
: Adding disposables ensures proper resource management
The set_upstream
method adds the disposable to the composite, aiding in effective resource disposal and preventing leaks.
95-97
: Consistent disposal status delegation
Delegating the is_disposed
check to the composite disposable maintains consistent disposal state management across the observables.
Line range hint 101-109
: Efficient combination of latest values with selector
The on_next
method efficiently combines the latest values from the observables using the selector function when all required values are present. The use of std::scoped_lock
ensures thread safety by locking all mutexes simultaneously.
110-112
: Safe emission of combined result
When a combined result is available, it is safely emitted to the downstream observer, ensuring that subscribers receive the correctly processed data.
115-117
: Error propagation is correctly handled
The on_error
method ensures that any errors encountered are properly propagated to the downstream observer, maintaining the reactive stream's integrity.
120-122
: Completion signal is appropriately forwarded
Upon completion, the on_completed
method correctly notifies the downstream observer, signaling the end of the data stream.
141-143
: Updated disposables strategy enhances management
Changing the updated_optimal_disposables_strategy
to fixed_disposables_strategy<1>
optimizes disposables management by specifying the expected number of disposables, improving resource handling.
153-157
: Correct setup of observer and disposable relationships
The subscription implementation correctly sets up the observer with the disposable, locks the pointer safely, and establishes upstream relationships, ensuring proper flow and disposal management.
164-166
: Proper subscription to additional observables
The fold expression in the subscribe
method correctly subscribes to each observable, passing the appropriate observer strategy with the disposable, ensuring that all observables are integrated into the operator's logic.
src/rpp/rpp/operators/combine_latest.hpp (5)
22-26
: Constructor Initialization is Correct
The combine_latest_disposable
class constructor properly initializes the base class and member variables. The use of std::move
for the observer
parameter and initialization of m_selector
ensures efficient resource management.
43-45
: Appropriate Inheritance and Member Access
The combine_latest_observer_strategy
class correctly inherits from combining_observer_strategy
and appropriately brings disposable
into scope using the using
declaration. This enhances code clarity and maintains access to the necessary members.
51-54
: Thread-Safe Update in on_next
Method
The on_next
method ensures thread safety by acquiring the observer under lock before updating shared values. This prevents data races and ensures that the combined values are correctly applied when all sources have emitted.
58-59
: Effective Use of Variadic Templates in apply_impl
The apply_impl
function template effectively uses variadic templates to handle an arbitrary number of arguments. This design provides flexibility and maintains type safety when combining values from multiple observables.
67-67
: Correct Operator Struct Definition
The combine_latest_t
struct correctly inherits from combining_operator_t
, supplying the appropriate template parameters. This aligns with the architectural refactoring towards improved disposable management within the library.
Old: BENCHMARK RESULTS (AUTOGENERATED)
|
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
Subscribe empty callbacks to empty observable | 303.52 ns | 1.54 ns | 1.88 ns | 0.82 |
Subscribe empty callbacks to empty observable via pipe operator | 302.85 ns | 1.54 ns | 1.86 ns | 0.83 |
Sources
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
from array of 1 - create + subscribe + immediate | 683.08 ns | 0.31 ns | 0.31 ns | 1.00 |
from array of 1 - create + subscribe + current_thread | 1025.94 ns | 3.71 ns | 3.70 ns | 1.00 |
concat_as_source of just(1 immediate) create + subscribe | 2259.51 ns | 99.82 ns | 138.50 ns | 0.72 |
defer from array of 1 - defer + create + subscribe + immediate | 723.84 ns | 0.31 ns | 0.31 ns | 1.00 |
interval - interval + take(3) + subscribe + immediate | 2124.17 ns | 59.24 ns | 59.23 ns | 1.00 |
interval - interval + take(3) + subscribe + current_thread | 2962.57 ns | 32.40 ns | 32.40 ns | 1.00 |
from array of 1 - create + as_blocking + subscribe + new_thread | 30750.79 ns | 27850.71 ns | 28596.20 ns | 0.97 |
from array of 1000 - create + as_blocking + subscribe + new_thread | 40687.11 ns | 49576.10 ns | 51823.36 ns | 0.96 |
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 3480.72 ns | 120.12 ns | 214.73 ns | 0.56 |
Filtering Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+take(1)+subscribe | 1081.79 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just+filter(true)+subscribe | 841.13 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just(1,2)+skip(1)+subscribe | 976.70 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just(1,1,2)+distinct_until_changed()+subscribe | 868.78 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just(1,2)+first()+subscribe | 1247.09 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just(1,2)+last()+subscribe | 902.54 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just+take_last(1)+subscribe | 1097.30 ns | 18.22 ns | 18.36 ns | 0.99 |
immediate_just(1,2,3)+element_at(1)+subscribe | 889.99 ns | 0.31 ns | 0.31 ns | 1.00 |
Schedulers
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate scheduler create worker + schedule | 282.52 ns | 0.62 ns | 1.54 ns | 0.40 |
current_thread scheduler create worker + schedule | 366.98 ns | 4.94 ns | 4.32 ns | 1.14 |
current_thread scheduler create worker + schedule + recursive schedule | 846.32 ns | 63.37 ns | 61.12 ns | 1.04 |
Transforming Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+map(v*2)+subscribe | 862.55 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just+scan(10, std::plus)+subscribe | 887.50 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just+flat_map(immediate_just(v*2))+subscribe | 2343.57 ns | 160.40 ns | 185.16 ns | 0.87 |
immediate_just+buffer(2)+subscribe | 1535.31 ns | 13.90 ns | 13.90 ns | 1.00 |
immediate_just+window(2)+subscribe + subscsribe inner | 2338.17 ns | 1070.01 ns | 1328.14 ns | 0.81 |
Conditional Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+take_while(false)+subscribe | 843.36 ns | - | - | 0.00 |
immediate_just+take_while(true)+subscribe | 835.30 ns | 0.31 ns | 0.31 ns | 1.00 |
Utility Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just(1)+subscribe_on(immediate)+subscribe | 1992.53 ns | 0.31 ns | 0.31 ns | 1.00 |
Combining Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 3578.84 ns | 167.91 ns | 231.96 ns | 0.72 |
immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 3681.27 ns | 157.75 ns | 177.79 ns | 0.89 |
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 145.12 ns | 177.69 ns | 0.82 |
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 3590.01 ns | 974.87 ns | 1270.43 ns | 0.77 |
immediate_just(1) + zip(immediate_just(2)) + subscribe | 2081.04 ns | 202.10 ns | 226.46 ns | 0.89 |
Subjects
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
publish_subject with 1 observer - on_next | 34.47 ns | 35.44 ns | 14.67 ns | 2.42 |
subscribe 100 observers to publish_subject | 200621.67 ns | 15141.80 ns | 16076.72 ns | 0.94 |
100 on_next to 100 observers to publish_subject | 26925.80 ns | 18961.18 ns | 17139.95 ns | 1.11 |
Scenarios
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
basic sample | 1386.95 ns | 13.28 ns | 12.97 ns | 1.02 |
basic sample with immediate scheduler | 1445.90 ns | 5.55 ns | 5.24 ns | 1.06 |
Aggregating Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+reduce(10, std::plus)+subscribe | 901.86 ns | 0.31 ns | 0.31 ns | 1.00 |
Error Handling Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2037.02 ns | 1001.08 ns | 888.98 ns | 1.13 |
create(on_error())+retry(1)+subscribe | 584.91 ns | 102.91 ns | 121.57 ns | 0.85 |
ci-macos
General
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
Subscribe empty callbacks to empty observable | 1314.40 ns | 0.64 ns | 0.54 ns | 1.18 |
Subscribe empty callbacks to empty observable via pipe operator | 1341.85 ns | 0.63 ns | 0.64 ns | 0.99 |
Sources
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
from array of 1 - create + subscribe + immediate | 2658.21 ns | 0.32 ns | 0.35 ns | 0.91 |
from array of 1 - create + subscribe + current_thread | 3352.37 ns | 54.79 ns | 53.44 ns | 1.03 |
concat_as_source of just(1 immediate) create + subscribe | 7489.20 ns | 628.86 ns | 656.75 ns | 0.96 |
defer from array of 1 - defer + create + subscribe + immediate | 3794.29 ns | 0.60 ns | 1.58 ns | 0.38 |
interval - interval + take(3) + subscribe + immediate | 9019.52 ns | 154.25 ns | 226.13 ns | 0.68 |
interval - interval + take(3) + subscribe + current_thread | 10482.49 ns | 157.74 ns | 169.50 ns | 0.93 |
from array of 1 - create + as_blocking + subscribe + new_thread | 132098.12 ns | 116990.10 ns | 118889.20 ns | 0.98 |
from array of 1000 - create + as_blocking + subscribe + new_thread | 141182.67 ns | 131117.50 ns | 136567.12 ns | 0.96 |
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 11368.61 ns | 487.69 ns | 1482.27 ns | 0.33 |
Filtering Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+take(1)+subscribe | 3913.31 ns | 0.32 ns | 0.23 ns | 1.35 |
immediate_just+filter(true)+subscribe | 2828.68 ns | 0.31 ns | 0.24 ns | 1.34 |
immediate_just(1,2)+skip(1)+subscribe | 4256.00 ns | 0.34 ns | 0.29 ns | 1.16 |
immediate_just(1,1,2)+distinct_until_changed()+subscribe | 2792.47 ns | 0.63 ns | 0.70 ns | 0.91 |
immediate_just(1,2)+first()+subscribe | 4320.61 ns | 0.32 ns | 0.25 ns | 1.25 |
immediate_just(1,2)+last()+subscribe | 4524.14 ns | 0.32 ns | 0.28 ns | 1.15 |
immediate_just+take_last(1)+subscribe | 4034.72 ns | 0.33 ns | 0.23 ns | 1.40 |
immediate_just(1,2,3)+element_at(1)+subscribe | 2874.88 ns | 0.32 ns | 0.35 ns | 0.91 |
Schedulers
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate scheduler create worker + schedule | 1673.53 ns | 1.85 ns | 0.77 ns | 2.39 |
current_thread scheduler create worker + schedule | 2233.54 ns | 55.63 ns | 51.31 ns | 1.08 |
current_thread scheduler create worker + schedule + recursive schedule | 3762.57 ns | 314.10 ns | 298.16 ns | 1.05 |
Transforming Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+map(v*2)+subscribe | 4210.86 ns | 8.56 ns | 4.23 ns | 2.02 |
immediate_just+scan(10, std::plus)+subscribe | 4298.68 ns | 0.77 ns | 0.47 ns | 1.66 |
immediate_just+flat_map(immediate_just(v*2))+subscribe | 7814.70 ns | 714.69 ns | 491.69 ns | 1.45 |
immediate_just+buffer(2)+subscribe | 3372.76 ns | 86.25 ns | 66.25 ns | 1.30 |
immediate_just+window(2)+subscribe + subscsribe inner | 7319.29 ns | 3216.23 ns | 2579.62 ns | 1.25 |
Conditional Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+take_while(false)+subscribe | 2864.03 ns | - | - | 0.00 |
immediate_just+take_while(true)+subscribe | 3864.12 ns | 0.66 ns | 0.35 ns | 1.90 |
Utility Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just(1)+subscribe_on(immediate)+subscribe | 6734.64 ns | 6.91 ns | 6.72 ns | 1.03 |
Combining Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 14643.61 ns | 773.73 ns | 833.66 ns | 0.93 |
immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 13239.05 ns | 561.42 ns | 755.08 ns | 0.74 |
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 869.69 ns | 724.26 ns | 1.20 |
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 16217.38 ns | 3192.77 ns | 2449.56 ns | 1.30 |
immediate_just(1) + zip(immediate_just(2)) + subscribe | 10235.87 ns | 1238.90 ns | 1298.31 ns | 0.95 |
Subjects
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
publish_subject with 1 observer - on_next | 104.14 ns | 66.44 ns | 56.98 ns | 1.17 |
subscribe 100 observers to publish_subject | 478443.50 ns | 55117.74 ns | 46422.57 ns | 1.19 |
100 on_next to 100 observers to publish_subject | 78834.31 ns | 28668.08 ns | 28426.72 ns | 1.01 |
Scenarios
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
basic sample | 3749.12 ns | 90.93 ns | 72.09 ns | 1.26 |
basic sample with immediate scheduler | 3772.96 ns | 25.49 ns | 18.70 ns | 1.36 |
Aggregating Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+reduce(10, std::plus)+subscribe | 3295.66 ns | 0.32 ns | 0.31 ns | 1.01 |
Error Handling Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 9032.49 ns | 5480.66 ns | 5622.37 ns | 0.97 |
create(on_error())+retry(1)+subscribe | 2473.49 ns | 381.06 ns | 430.25 ns | 0.89 |
ci-ubuntu-clang
General
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
Subscribe empty callbacks to empty observable | 271.14 ns | 0.63 ns | 0.63 ns | 1.00 |
Subscribe empty callbacks to empty observable via pipe operator | 271.47 ns | 0.63 ns | 0.63 ns | 1.00 |
Sources
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
from array of 1 - create + subscribe + immediate | 560.29 ns | 0.31 ns | 0.31 ns | 1.00 |
from array of 1 - create + subscribe + current_thread | 799.24 ns | 4.63 ns | 4.63 ns | 1.00 |
concat_as_source of just(1 immediate) create + subscribe | 2334.02 ns | 127.78 ns | 177.50 ns | 0.72 |
defer from array of 1 - defer + create + subscribe + immediate | 769.68 ns | 0.31 ns | 0.31 ns | 1.00 |
interval - interval + take(3) + subscribe + immediate | 2218.18 ns | 58.31 ns | 58.26 ns | 1.00 |
interval - interval + take(3) + subscribe + current_thread | 3254.24 ns | 33.83 ns | 30.88 ns | 1.10 |
from array of 1 - create + as_blocking + subscribe + new_thread | 27508.62 ns | 27291.18 ns | 28128.92 ns | 0.97 |
from array of 1000 - create + as_blocking + subscribe + new_thread | 39750.72 ns | 36888.53 ns | 34340.88 ns | 1.07 |
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 3706.16 ns | 148.99 ns | 302.02 ns | 0.49 |
Filtering Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+take(1)+subscribe | 1135.91 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just+filter(true)+subscribe | 834.25 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just(1,2)+skip(1)+subscribe | 1076.40 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just(1,1,2)+distinct_until_changed()+subscribe | 878.40 ns | 0.62 ns | 0.31 ns | 2.00 |
immediate_just(1,2)+first()+subscribe | 1367.55 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just(1,2)+last()+subscribe | 999.92 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just+take_last(1)+subscribe | 1182.77 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just(1,2,3)+element_at(1)+subscribe | 875.73 ns | 0.31 ns | 0.31 ns | 1.00 |
Schedulers
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate scheduler create worker + schedule | 280.75 ns | 1.54 ns | 1.54 ns | 1.00 |
current_thread scheduler create worker + schedule | 397.38 ns | 4.32 ns | 4.32 ns | 1.00 |
current_thread scheduler create worker + schedule + recursive schedule | 859.40 ns | 54.84 ns | 54.85 ns | 1.00 |
Transforming Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+map(v*2)+subscribe | 862.04 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just+scan(10, std::plus)+subscribe | 960.08 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just+flat_map(immediate_just(v*2))+subscribe | 2232.84 ns | 135.15 ns | 224.25 ns | 0.60 |
immediate_just+buffer(2)+subscribe | 1530.87 ns | 13.58 ns | 14.20 ns | 0.96 |
immediate_just+window(2)+subscribe + subscsribe inner | 2429.41 ns | 905.35 ns | 905.82 ns | 1.00 |
Conditional Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+take_while(false)+subscribe | 829.30 ns | - | - | 0.00 |
immediate_just+take_while(true)+subscribe | 846.09 ns | 0.31 ns | 0.31 ns | 1.00 |
Utility Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just(1)+subscribe_on(immediate)+subscribe | 1986.70 ns | 0.31 ns | 0.31 ns | 1.00 |
Combining Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 3233.81 ns | 154.91 ns | 281.39 ns | 0.55 |
immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 3707.60 ns | 138.74 ns | 209.42 ns | 0.66 |
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 141.98 ns | 196.84 ns | 0.72 |
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 3371.82 ns | 834.14 ns | 833.29 ns | 1.00 |
immediate_just(1) + zip(immediate_just(2)) + subscribe | 2192.81 ns | 195.92 ns | 195.51 ns | 1.00 |
Subjects
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
publish_subject with 1 observer - on_next | 52.48 ns | 17.55 ns | 17.46 ns | 1.01 |
subscribe 100 observers to publish_subject | 202295.40 ns | 15971.67 ns | 17200.45 ns | 0.93 |
100 on_next to 100 observers to publish_subject | 38125.65 ns | 23514.96 ns | 23464.63 ns | 1.00 |
Scenarios
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
basic sample | 1283.23 ns | 12.06 ns | 12.36 ns | 0.98 |
basic sample with immediate scheduler | 1302.11 ns | 5.86 ns | 5.86 ns | 1.00 |
Aggregating Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+reduce(10, std::plus)+subscribe | 1034.27 ns | 0.31 ns | 0.31 ns | 1.00 |
Error Handling Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2160.57 ns | 1162.19 ns | 987.19 ns | 1.18 |
create(on_error())+retry(1)+subscribe | 651.94 ns | 140.15 ns | 156.09 ns | 0.90 |
ci-windows
General
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
Subscribe empty callbacks to empty observable | 613.86 ns | 2.16 ns | 2.16 ns | 1.00 |
Subscribe empty callbacks to empty observable via pipe operator | 622.86 ns | 2.16 ns | 2.16 ns | 1.00 |
Sources
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
from array of 1 - create + subscribe + immediate | 1209.60 ns | 5.55 ns | 5.24 ns | 1.06 |
from array of 1 - create + subscribe + current_thread | 1486.79 ns | 15.45 ns | 15.49 ns | 1.00 |
concat_as_source of just(1 immediate) create + subscribe | 3873.58 ns | 201.44 ns | 239.73 ns | 0.84 |
defer from array of 1 - defer + create + subscribe + immediate | 1252.93 ns | 5.55 ns | 4.94 ns | 1.12 |
interval - interval + take(3) + subscribe + immediate | 3312.50 ns | 139.89 ns | 139.91 ns | 1.00 |
interval - interval + take(3) + subscribe + current_thread | 3554.07 ns | 59.51 ns | 60.60 ns | 0.98 |
from array of 1 - create + as_blocking + subscribe + new_thread | 126466.67 ns | 113022.22 ns | 117860.00 ns | 0.96 |
from array of 1000 - create + as_blocking + subscribe + new_thread | 136785.71 ns | 138275.00 ns | 137350.00 ns | 1.01 |
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 5510.66 ns | 260.20 ns | 312.90 ns | 0.83 |
Filtering Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+take(1)+subscribe | 1896.42 ns | 19.44 ns | 19.43 ns | 1.00 |
immediate_just+filter(true)+subscribe | 1375.37 ns | 18.51 ns | 18.50 ns | 1.00 |
immediate_just(1,2)+skip(1)+subscribe | 1826.68 ns | 17.89 ns | 17.89 ns | 1.00 |
immediate_just(1,1,2)+distinct_until_changed()+subscribe | 1687.62 ns | 20.68 ns | 20.68 ns | 1.00 |
immediate_just(1,2)+first()+subscribe | 2097.79 ns | 18.20 ns | 18.20 ns | 1.00 |
immediate_just(1,2)+last()+subscribe | 1825.99 ns | 19.12 ns | 19.14 ns | 1.00 |
immediate_just+take_last(1)+subscribe | 2085.33 ns | 67.54 ns | 64.51 ns | 1.05 |
immediate_just(1,2,3)+element_at(1)+subscribe | 1383.93 ns | 20.97 ns | 20.99 ns | 1.00 |
Schedulers
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate scheduler create worker + schedule | 508.57 ns | 4.32 ns | 4.32 ns | 1.00 |
current_thread scheduler create worker + schedule | 690.25 ns | 11.71 ns | 11.99 ns | 0.98 |
current_thread scheduler create worker + schedule + recursive schedule | 1402.23 ns | 104.22 ns | 99.76 ns | 1.04 |
Transforming Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+map(v*2)+subscribe | 1362.47 ns | 18.82 ns | 18.81 ns | 1.00 |
immediate_just+scan(10, std::plus)+subscribe | 1497.23 ns | 20.98 ns | 20.96 ns | 1.00 |
immediate_just+flat_map(immediate_just(v*2))+subscribe | 3565.53 ns | 206.30 ns | 270.94 ns | 0.76 |
immediate_just+buffer(2)+subscribe | 2727.25 ns | 66.19 ns | 64.06 ns | 1.03 |
immediate_just+window(2)+subscribe + subscsribe inner | 4150.00 ns | 1365.99 ns | 1354.03 ns | 1.01 |
Conditional Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+take_while(false)+subscribe | 1655.39 ns | 17.57 ns | 17.57 ns | 1.00 |
immediate_just+take_while(true)+subscribe | 1357.45 ns | 18.51 ns | 18.51 ns | 1.00 |
Utility Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just(1)+subscribe_on(immediate)+subscribe | 3301.57 ns | 11.10 ns | 11.11 ns | 1.00 |
Combining Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 5249.30 ns | 218.61 ns | 294.56 ns | 0.74 |
immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 5602.13 ns | 192.49 ns | 270.53 ns | 0.71 |
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 206.54 ns | 300.66 ns | 0.69 |
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 5577.34 ns | 979.87 ns | 961.34 ns | 1.02 |
immediate_just(1) + zip(immediate_just(2)) + subscribe | 3698.53 ns | 538.09 ns | 565.65 ns | 0.95 |
Subjects
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
publish_subject with 1 observer - on_next | 36.65 ns | 19.94 ns | 20.65 ns | 0.97 |
subscribe 100 observers to publish_subject | 265400.00 ns | 30208.82 ns | 27426.83 ns | 1.10 |
100 on_next to 100 observers to publish_subject | 54195.00 ns | 35382.14 ns | 35760.61 ns | 0.99 |
Scenarios
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
basic sample | 1973.62 ns | 96.45 ns | 96.89 ns | 1.00 |
basic sample with immediate scheduler | 2308.80 ns | 68.40 ns | 66.91 ns | 1.02 |
Aggregating Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+reduce(10, std::plus)+subscribe | 1511.80 ns | 19.13 ns | 19.13 ns | 1.00 |
Error Handling Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2221.00 ns | 354.57 ns | 243.84 ns | 1.45 |
create(on_error())+retry(1)+subscribe | 1563.77 ns | 145.18 ns | 199.35 ns | 0.73 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (6)
src/rpp/rpp/operators/details/combining_strategy.hpp (1)
Line range hint
38-41
: Improve documentation of atomic operation semantics.The comment "just need atomicity, not guarding anything" doesn't fully explain the memory ordering choice. Consider documenting why
seq_cst
ordering is necessary here, as it's the strongest (and most expensive) memory ordering.- // just need atomicity, not guarding anything + // Using sequential consistency to ensure all threads have a consistent view + // of both the counter and any prior/subsequent operations on the observersrc/rpp/rpp/operators/delay.hpp (1)
139-164
: Consider documenting drain_queue behavior.The drain_queue method handles complex queue processing logic. Consider adding documentation to explain:
- The queue processing workflow
- The timing control mechanism
- The visit pattern for handling different emission types
src/rpp/rpp/operators/merge.hpp (3)
Line range hint
27-46
: LGTM: Well-structured disposable implementation with proper thread safety.The merge_disposable class effectively combines composite disposal with thread-safe observer access and completion tracking.
Consider adding documentation comments to explain:
- The purpose of m_on_completed_needed counter
- The memory ordering requirements for the atomic operations
51-92
: Consider exception safety in on_completed method.While the implementation is generally solid, the on_completed method could leak resources if an exception occurs during disposal of the elements in m_disposables.
Consider using RAII or a scope guard to ensure proper cleanup:
void on_completed() const { if (m_disposable->decrement_on_completed()) { m_disposable->get_observer_under_lock()->on_completed(); } else { + auto disposables = std::move(m_disposables); // Take ownership for (const auto& v : disposables) { m_disposable->remove(v); v.dispose(); } } }
Line range hint
206-208
: Update performance notes in documentation.The performance notes should be updated to reflect the changes in disposables implementation:
Consider updating the performance notes to mention:
- The composite disposable allocation
- The thread synchronization overhead from the atomic operations
src/benchmarks/benchmarks.cpp (1)
692-695
: Consider consolidating conditional compilation patterns.The conditional compilation blocks protect rxcpp subject usage, but the pattern could be more consistent. Consider extracting the subject creation and subscription into a helper function to reduce duplication.
+#ifdef RPP_BUILD_RXCPP +namespace { +template<typename T, typename F> +auto create_and_subscribe_rxcpp_subject(F&& callback) { + rxcpp::subjects::subject<T> subject{}; + subject.get_observable().subscribe( + rxcpp::make_subscriber<T>(std::forward<F>(callback), [] {})); + return subject; +} +} +#endif // Usage: -#ifdef RPP_BUILD_RXCPP - rxcpp::subjects::subject<int> rxcpp_subj{}; - rxcpp_subj.get_observable().subscribe(rxcpp::make_subscriber<int>([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }, [] {})); -#endif +#ifdef RPP_BUILD_RXCPP + auto rxcpp_subj = create_and_subscribe_rxcpp_subject<int>([](int v) { + ankerl::nanobench::doNotOptimizeAway(v); + }); +#endifAlso applies to: 735-741
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (8)
cmake/dependencies.cmake
(1 hunks)conanfile.py
(1 hunks)src/benchmarks/benchmarks.cpp
(4 hunks)src/rpp/rpp/operators/concat.hpp
(4 hunks)src/rpp/rpp/operators/delay.hpp
(6 hunks)src/rpp/rpp/operators/details/combining_strategy.hpp
(5 hunks)src/rpp/rpp/operators/merge.hpp
(5 hunks)src/rpp/rpp/operators/take_until.hpp
(4 hunks)
🧰 Additional context used
📓 Learnings (1)
src/rpp/rpp/operators/concat.hpp (1)
Learnt from: victimsnino
PR: victimsnino/ReactivePlusPlus#673
File: src/rpp/rpp/operators/concat.hpp:59-59
Timestamp: 2024-11-04T19:42:15.257Z
Learning: In the `concat_disposable` class in `src/rpp/rpp/operators/concat.hpp`, within the `drain` method, it's acceptable to call `get_observer()->on_completed()` even when `is_disposed()` returns true, because `refcounted.dispose()` could cause disposing of `this` object.
🔇 Additional comments (23)
conanfile.py (1)
28-28
: Verify trompeloeil v48 compatibility.
The version update from 47 to 48 looks good as it aligns with the changes in cmake/dependencies.cmake. However, let's verify a few things:
Please ensure:
- All test files using trompeloeil are compatible with v48
- Review the changelog for any breaking changes between v47 and v48
✅ Verification successful
Trompeloeil v48 update is safe to proceed
The verification shows that:
- Trompeloeil v48 is properly referenced in
cmake/dependencies.cmake
using FetchContent from the official repository - The library is used consistently across test files with a clean integration through
src/tests/utils/rpp_trompeloil.hpp
- The usage is limited to test code only, with no production code dependencies
- The version is fetched directly from GitHub (rollbear/trompeloeil) rather than Conan Center, making the Conan package availability non-critical
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify trompeloeil v48 availability and usage
# Test 1: Check if version 48 exists in conan center
conan search trompeloeil/48 --remote=conancenter
# Test 2: Look for trompeloeil usage in test files to assess impact
rg -l "trompeloeil" test/
Length of output: 195
Script:
#!/bin/bash
# Let's try a different approach to verify the codebase structure and usage
# First, let's find all test-related files in the repository
fd -t f "test"
# Check for any mentions of trompeloeil in the entire codebase
rg -l "trompeloeil"
# Look for CMake files that might reference trompeloeil
fd -e cmake
# Check the content of any dependency-related files
fd -g "*depend*.cmake" -g "*depend*.txt" -X cat
Length of output: 3012
Script:
#!/bin/bash
# Let's check the content of relevant files to understand trompeloeil usage and version requirements
# Check the content of dependencies.cmake
cat cmake/dependencies.cmake
# Check the content of test utils file that includes trompeloeil
cat src/tests/utils/rpp_trompeloil.hpp
# Check CMakeLists.txt in tests directory
cat src/tests/CMakeLists.txt
Length of output: 8664
cmake/dependencies.cmake (2)
119-119
: LGTM! Good improvement in build stability.
Switching from the floating 'main' branch to a specific version 'v48' for trompeloeil ensures consistent and reproducible builds. This is particularly important for testing infrastructure.
125-125
: LGTM! Good improvement in build stability.
Switching from the floating 'master' branch to a specific version 'v4.3.11' for nanobench ensures consistent and reproducible builds. This is particularly important for benchmarking infrastructure.
src/rpp/rpp/operators/details/combining_strategy.hpp (1)
110-112
: Review disposable ownership pattern.
The current implementation could lead to memory leaks through cyclic references. While using as_weak()
in line 112 helps, the overall pattern needs review:
Consider these improvements:
- Document the ownership model explicitly
- Consider using
std::weak_ptr
for observer storage inTDisposable
- Add RAII wrapper to ensure proper cleanup of resources
src/rpp/rpp/operators/take_until.hpp (6)
16-16
: LGTM! Good architectural improvement.
The change from a state class to a disposable class that inherits from composite_disposable
is a solid architectural improvement. This provides better encapsulation of disposal logic and aligns with the PR's objective of stabilizing disposables.
Also applies to: 23-23
26-34
: LGTM! Constructors properly handle both move and copy semantics.
The constructors are well-implemented, providing proper initialization of the mutex-protected observer for both rvalue and lvalue cases.
49-51
: Initialize state member to prevent null pointer dereference.
The state
member is still used without initialization in methods like on_error
, on_completed
, set_upstream
, and is_disposed
.
65-65
: LGTM! Good simplification of disposable operations.
The changes simplify the code by directly using the composite_disposable's functionality instead of going through the observer. This reduces indirection and improves code clarity.
Also applies to: 67-67
106-106
: LGTM! More precise disposables strategy.
The change to fixed_disposables_strategy<1>
is more precise than the previous default strategy, as it explicitly states that exactly one disposable is expected. This improves type safety and aligns with the PR's goal of stabilizing disposables logic.
111-113
: LGTM! Proper integration with disposable-based architecture.
The changes properly integrate with the new disposable-based architecture while maintaining thread safety through explicit locking. The observer setup is more direct and cleaner.
src/rpp/rpp/operators/concat.hpp (4)
37-40
: LGTM! Good refactoring of the disposable logic.
The renaming from concat_state_t
to concat_disposable
and inheritance change to rpp::refcount_disposable
better reflects the class's responsibility and provides built-in reference counting capabilities.
107-119
: LGTM! Consistent renaming in the observer strategy base.
The renaming from state
to disposable
in constructor parameters and member variables maintains consistency with the class renaming while preserving the same functionality.
147-155
: LGTM! Improved synchronization in on_completed.
The implementation now properly handles the stage transition with atomic operations and includes an assertion to verify the expected state, making the synchronization more robust.
190-195
: Verify observer type constraints for set_upstream.
The init_state
method encapsulates the initialization logic well. However, we should ensure that all possible observer types support the set_upstream
method.
Let's verify the observer type constraints:
Consider adding a concept or static_assert to enforce this requirement:
static_assert(requires(TObserver obs) {
obs.set_upstream(std::declval<disposable_wrapper>());
}, "Observer type must support set_upstream method");
✅ Verification successful
Based on the extensive search results, I can now provide a definitive response about the observer type constraints for set_upstream
:
Observer type constraints for set_upstream
are properly enforced through the observer interface.
The codebase shows that:
-
The
set_upstream
method is a core part of the observer interface, documented insrc/rpp/rpp/observers.hpp
and implemented consistently across all observer types. -
All observer implementations must satisfy the
observer_strategy
concept which includesset_upstream
as shown insrc/rpp/rpp/observers/fwd.hpp
. -
The base observer implementation in
src/rpp/rpp/observers/observer.hpp
provides a default implementation that handles disposable management. -
The pattern is consistently used across all operators and tested thoroughly as evidenced by the extensive test coverage in
src/tests/rpp/test_observers.cpp
.
Therefore, the current implementation in concat.hpp
follows the established pattern and no additional constraints are needed.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for observer implementations to verify set_upstream support
ast-grep --pattern 'class $CLASS : public $OBSERVER {
$$$
void set_upstream($$$) {
$$$
}
$$$
}'
Length of output: 109
Script:
#!/bin/bash
# Let's try a different approach to find observer implementations and set_upstream usage
# First, let's find all files containing 'set_upstream'
rg -l "set_upstream"
# Then, let's look for the actual implementation with context
rg "set_upstream" -A 5 -B 5
# Also search for observer base classes or interfaces
rg "class.*observer" -i
Length of output: 105407
src/rpp/rpp/operators/delay.hpp (5)
16-16
: LGTM: Include directive aligns with new disposable-based design.
The addition of composite_disposable.hpp header supports the transition from state-based to disposable-based architecture.
38-39
: Well-designed transition to composite disposable architecture.
The refactoring improves resource management by:
- Leveraging composite disposables through inheritance
- Using constrained templates for type safety
- Maintaining clean constructor implementation
Also applies to: 43-48
59-67
: LGTM: Clean wrapper implementation.
The wrapper provides a focused interface with proper lifetime management through shared_ptr.
Line range hint 69-112
: LGTM: Robust observer strategy implementation.
The implementation properly handles:
- Resource cleanup on error/completion
- Thread-safe scheduling
- Configurable error handling behavior
178-178
: LGTM: Clean integration with disposables architecture.
The implementation properly:
- Updates disposables strategy
- Maintains type safety with worker and container handling
- Follows consistent naming conventions
Also applies to: 183-192
src/rpp/rpp/operators/merge.hpp (2)
16-16
: LGTM: Include directive addition is appropriate.
The addition of composite_disposable.hpp is consistent with the inheritance changes in merge_disposable.
115-133
: LGTM: Well-structured initialization and resource management.
The init_state method provides a clean way to initialize the disposable and set up the observer, ensuring proper resource management.
src/benchmarks/benchmarks.cpp (2)
81-87
: Consider enhancing benchmark documentation and coverage analysis.
The benchmark initialization and configuration is well-structured, but could benefit from:
- Documentation of expected performance characteristics for each benchmark category
- Automated verification of benchmark coverage against available operators
- Statistical significance thresholds for performance comparisons
Let's analyze the benchmark coverage:
#!/bin/bash
# Description: Analyze benchmark coverage
# Test: Extract all benchmark categories and operator tests
echo "Benchmark categories:"
rg "BENCHMARK\(\"([^\"]+)" -o --replace '$1'
echo "\nOperators tested:"
rg "SECTION\(\"([^\"]+)" -o --replace '$1'
Line range hint 68-77
: LGTM! The immediate_just implementation enables fair benchmarking.
The implementation correctly mirrors rpp's immediate_just functionality, ensuring consistent comparison between libraries.
Let's verify the consistent usage:
✅ Verification successful
Let me generate another verification script to specifically check the usage of immediate_just:
The immediate_just implementation is correctly used and verified across benchmarks
The verification shows that:
- The implementation is consistently used in both rpp and rxcpp namespaces
- It's used symmetrically in all benchmark test cases
- The function is properly used across various operators (merge, zip, flat_map, etc.)
- Arguments are correctly forwarded in all usage scenarios
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify immediate_just is used consistently across benchmarks
# Test: Search for immediate_just usage in both rpp and rxcpp contexts
rg "immediate_just" -B 2 -A 2
Length of output: 4793
Script:
#!/bin/bash
# Search for immediate_just usage patterns to verify consistency
rg "immediate_just" src/benchmarks/benchmarks.cpp -A 1 -B 1
Length of output: 9564
void set_upstream(const rpp::disposable_wrapper& d) const | ||
{ | ||
state->get_observer_under_lock()->set_upstream(d); | ||
disposable->add(d); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Improve const correctness in member functions.
The methods set_upstream
, is_disposed
, on_error
, and on_completed
are marked const
but modify shared state through the disposable
member. This could lead to thread safety issues.
Consider one of these approaches:
- Remove the
const
qualifier from these methods as they effectively modify state:
- void set_upstream(const rpp::disposable_wrapper& d) const
+ void set_upstream(const rpp::disposable_wrapper& d)
- bool is_disposed() const
+ bool is_disposed()
- void on_error(const std::exception_ptr& err) const
+ void on_error(const std::exception_ptr& err)
- void on_completed() const
+ void on_completed()
- Or make the
disposable
membermutable
if these operations should be allowed in const context:
- std::shared_ptr<TDisposable> disposable{};
+ mutable std::shared_ptr<TDisposable> disposable{};
Also applies to: 63-65, 68-70, 73-76
for more information, see https://pre-commit.ci
Quality Gate passedIssues Measures |
Stabilize disposables logic: #666
Summary by CodeRabbit
Summary by CodeRabbit
New Features
disposable
structures across various operators, enhancing resource management.combine_latest
,concat
,debounce
,delay
,retry
, etc.) to utilize the new disposable management system.immediate_just
function in therxcpp
namespace for immediate emission of values.Bug Fixes
on_error_resume_next
andretry
operators.Refactor
add_ref
method and removed theMode
enumeration in therefcount_disposable
class.chain
class with a new static assertion forTStrategy
.Documentation