diff --git a/src/rpp/rpp/operators/filter.hpp b/src/rpp/rpp/operators/filter.hpp index 09355df28..57f7c0ae1 100644 --- a/src/rpp/rpp/operators/filter.hpp +++ b/src/rpp/rpp/operators/filter.hpp @@ -23,7 +23,7 @@ IMPLEMENTATION_FILE(filter_tag); namespace rpp::details { template Predicate> -struct filter_impl +struct filter_impl_on_next { RPP_NO_UNIQUE_ADDRESS Predicate predicate; @@ -35,4 +35,20 @@ struct filter_impl } }; +template Predicate> +struct filter_impl +{ + RPP_NO_UNIQUE_ADDRESS filter_impl_on_next on_next; + + template + auto operator()(TSub&& subscriber) const + { + auto subscription = subscriber.get_subscription(); + return create_subscriber_with_state(std::move(subscription), + on_next, + utils::forwarding_on_error{}, + utils::forwarding_on_completed{}, + std::forward(subscriber)); + } +}; } // namespace rpp::details diff --git a/src/rpp/rpp/operators/lift.hpp b/src/rpp/rpp/operators/lift.hpp index 022eb15fa..c04d6a380 100644 --- a/src/rpp/rpp/operators/lift.hpp +++ b/src/rpp/rpp/operators/lift.hpp @@ -22,23 +22,6 @@ IMPLEMENTATION_FILE(lift_tag); namespace rpp::details { -template OnNext, - std::invocable OnError = utils::forwarding_on_error, - std::invocable OnCompleted = utils::forwarding_on_completed> -auto create_subscriber_in_lift(TSub&& subscriber, - OnNext&& on_next = {}, - OnError&& on_error = {}, - OnCompleted&& on_completed = {}) -{ - auto subscription = subscriber.get_subscription(); - return create_subscriber_with_state(std::move(subscription), - std::forward(on_next), - std::forward(on_error), - std::forward(on_completed), - std::forward(subscriber)); -} template struct lift_action_by_callbacks { @@ -49,7 +32,12 @@ struct lift_action_by_callbacks template auto operator()(TSub&& subscriber) const { - return create_subscriber_in_lift(std::forward(subscriber), on_next, on_error, on_completed); + auto subscription = subscriber.get_subscription(); + return create_subscriber_with_state(std::move(subscription), + on_next, + on_error, + on_completed, + std::forward(subscriber)); } }; diff --git a/src/rpp/rpp/operators/map.hpp b/src/rpp/rpp/operators/map.hpp index bd19dbd70..e36f2ab0f 100644 --- a/src/rpp/rpp/operators/map.hpp +++ b/src/rpp/rpp/operators/map.hpp @@ -43,7 +43,12 @@ struct map_impl template auto operator()(TSub&& subscriber) const { - return create_subscriber_in_lift(std::forward(subscriber), on_next); + auto subscription = subscriber.get_subscription(); + return create_subscriber_with_state(std::move(subscription), + on_next, + utils::forwarding_on_error{}, + utils::forwarding_on_completed{}, + std::forward(subscriber)); } }; } // namespace rpp::details diff --git a/src/rpp/rpp/operators/take_while.hpp b/src/rpp/rpp/operators/take_while.hpp index 815789f9e..3ec29d5b8 100644 --- a/src/rpp/rpp/operators/take_while.hpp +++ b/src/rpp/rpp/operators/take_while.hpp @@ -22,7 +22,7 @@ IMPLEMENTATION_FILE(take_while_tag); namespace rpp::details { template Predicate> -struct take_while_impl +struct take_while_impl_on_next { RPP_NO_UNIQUE_ADDRESS Predicate predicate; @@ -35,4 +35,21 @@ struct take_while_impl subscriber.on_completed(); } }; + +template Predicate> +struct take_while_impl +{ + RPP_NO_UNIQUE_ADDRESS take_while_impl_on_next on_next; + + template + auto operator()(TSub&& subscriber) const + { + auto subscription = subscriber.get_subscription(); + return create_subscriber_with_state(std::move(subscription), + on_next, + utils::forwarding_on_error{}, + utils::forwarding_on_completed{}, + std::forward(subscriber)); + } +}; } // namespace rpp::details