Skip to content

Commit

Permalink
simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
victimsnino committed Dec 16, 2022
1 parent 935ceb5 commit 81cb505
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 21 deletions.
18 changes: 17 additions & 1 deletion src/rpp/rpp/operators/filter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ IMPLEMENTATION_FILE(filter_tag);
namespace rpp::details
{
template<constraint::decayed_type Type, std::predicate<const Type&> Predicate>
struct filter_impl
struct filter_impl_on_next
{
RPP_NO_UNIQUE_ADDRESS Predicate predicate;

Expand All @@ -35,4 +35,20 @@ struct filter_impl
}
};

template<constraint::decayed_type Type, std::predicate<const Type&> Predicate>
struct filter_impl
{
RPP_NO_UNIQUE_ADDRESS filter_impl_on_next<Type, Predicate> on_next;

template<constraint::subscriber TSub>
auto operator()(TSub&& subscriber) const
{
auto subscription = subscriber.get_subscription();
return create_subscriber_with_state<Type>(std::move(subscription),
on_next,
utils::forwarding_on_error{},
utils::forwarding_on_completed{},
std::forward<TSub>(subscriber));
}
};
} // namespace rpp::details
24 changes: 6 additions & 18 deletions src/rpp/rpp/operators/lift.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,6 @@ IMPLEMENTATION_FILE(lift_tag);

namespace rpp::details
{
template<constraint::decayed_type Type,
constraint::subscriber TSub,
std::invocable<Type, TSub> OnNext,
std::invocable<std::exception_ptr, TSub> OnError = utils::forwarding_on_error,
std::invocable<TSub> OnCompleted = utils::forwarding_on_completed>
auto create_subscriber_in_lift(TSub&& subscriber,
OnNext&& on_next = {},
OnError&& on_error = {},
OnCompleted&& on_completed = {})
{
auto subscription = subscriber.get_subscription();
return create_subscriber_with_state<Type>(std::move(subscription),
std::forward<OnNext>(on_next),
std::forward<OnError>(on_error),
std::forward<OnCompleted>(on_completed),
std::forward<TSub>(subscriber));
}
template<constraint::decayed_type Type, constraint::decayed_type OnNext, constraint::decayed_type OnError, constraint::decayed_type OnCompleted>
struct lift_action_by_callbacks
{
Expand All @@ -49,7 +32,12 @@ struct lift_action_by_callbacks
template<constraint::subscriber TSub>
auto operator()(TSub&& subscriber) const
{
return create_subscriber_in_lift<Type>(std::forward<TSub>(subscriber), on_next, on_error, on_completed);
auto subscription = subscriber.get_subscription();
return create_subscriber_with_state<Type>(std::move(subscription),
on_next,
on_error,
on_completed,
std::forward<TSub>(subscriber));
}
};

Expand Down
7 changes: 6 additions & 1 deletion src/rpp/rpp/operators/map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@ struct map_impl
template<constraint::subscriber TSub>
auto operator()(TSub&& subscriber) const
{
return create_subscriber_in_lift<Type>(std::forward<TSub>(subscriber), on_next);
auto subscription = subscriber.get_subscription();
return create_subscriber_with_state<Type>(std::move(subscription),
on_next,
utils::forwarding_on_error{},
utils::forwarding_on_completed{},
std::forward<TSub>(subscriber));
}
};
} // namespace rpp::details
19 changes: 18 additions & 1 deletion src/rpp/rpp/operators/take_while.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ IMPLEMENTATION_FILE(take_while_tag);
namespace rpp::details
{
template<constraint::decayed_type Type, std::predicate<const Type&> Predicate>
struct take_while_impl
struct take_while_impl_on_next
{
RPP_NO_UNIQUE_ADDRESS Predicate predicate;

Expand All @@ -35,4 +35,21 @@ struct take_while_impl
subscriber.on_completed();
}
};

template<constraint::decayed_type Type, std::predicate<const Type&> Predicate>
struct take_while_impl
{
RPP_NO_UNIQUE_ADDRESS take_while_impl_on_next<Type, Predicate> on_next;

template<constraint::subscriber TSub>
auto operator()(TSub&& subscriber) const
{
auto subscription = subscriber.get_subscription();
return create_subscriber_with_state<Type>(std::move(subscription),
on_next,
utils::forwarding_on_error{},
utils::forwarding_on_completed{},
std::forward<TSub>(subscriber));
}
};
} // namespace rpp::details

0 comments on commit 81cb505

Please sign in to comment.