-
Notifications
You must be signed in to change notification settings - Fork 31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement flat_map operator #374
Changes from 2 commits
4f58a84
a748a6f
bd37f38
47e25dd
9e99db8
aba97fa
627a88e
45e0ec6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
// ReactivePlusPlus library | ||
// | ||
// Copyright Aleksey Loginov 2023 - present. | ||
// Distributed under the Boost Software License, Version 1.0. | ||
// (See accompanying file LICENSE_1_0.txt or copy at | ||
// https://www.boost.org/LICENSE_1_0.txt) | ||
// | ||
// Project home: https://github.com/victimsnino/ReactivePlusPlus | ||
// | ||
|
||
#pragma once | ||
|
||
#include <rpp/operators/map.hpp> | ||
#include <rpp/operators/merge.hpp> | ||
#include <rpp/defs.hpp> | ||
|
||
namespace rpp::operators::details | ||
{ | ||
|
||
template<rpp::constraint::decayed_type Fn> | ||
struct flat_map_t | ||
{ | ||
RPP_NO_UNIQUE_ADDRESS Fn m_fn; | ||
|
||
template<rpp::constraint::observable TObservable, typename ValueType = rpp::utils::extract_observable_type_t<TObservable>> | ||
requires (std::invocable<Fn, ValueType> && rpp::constraint::observable<std::invoke_result_t<Fn, ValueType>>) | ||
auto operator()(TObservable&& observable) const & | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add test to cover this overloading? (it is enough to apply operator from variable in any test case) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
{ | ||
return std::forward<TObservable>(observable) | ||
| rpp::ops::map(m_fn) | ||
| rpp::ops::merge(); | ||
|
||
} | ||
|
||
template<rpp::constraint::observable TObservable, typename ValueType = rpp::utils::extract_observable_type_t<TObservable>> | ||
requires (std::invocable<Fn, ValueType> && rpp::constraint::observable<std::invoke_result_t<Fn, ValueType>>) | ||
auto operator()(TObservable&& observable) && | ||
{ | ||
return std::forward<TObservable>(observable) | ||
| rpp::ops::map(std::move(m_fn)) | ||
| rpp::ops::merge(); | ||
} | ||
}; | ||
|
||
} // namespace rpp::operators::details | ||
|
||
namespace rpp::operators | ||
{ | ||
|
||
/** | ||
* @brief Transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable | ||
* | ||
* @details Note that flat_map merges the emissions of these Observables, so that they may interleave. | ||
* | ||
* @param a function that returns an observable for each item emitted by the source observable. | ||
* @warning #include <rpp/operators/flat_map.hpp> | ||
* | ||
* @ingroup transforming_operators | ||
* @see https://reactivex.io/documentation/operators/flatmap.html | ||
*/ | ||
template<typename Fn> | ||
requires (!utils::is_not_template_callable<Fn> || rpp::constraint::observable<std::invoke_result_t<Fn, utils::convertible_to_any>>) | ||
auto flat_map(Fn&& callable) | ||
{ | ||
return details::flat_map_t<std::decay_t<Fn>>{std::forward<Fn>(callable)}; | ||
} | ||
|
||
} // namespace rpp::operators |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
// ReactivePlusPlus library | ||
// | ||
// Copyright Aleksey Loginov 2023 - present. | ||
// Distributed under the Boost Software License, Version 1.0. | ||
// (See accompanying file LICENSE_1_0.txt or copy at | ||
// https://www.boost.org/LICENSE_1_0.txt) | ||
// | ||
// Project home: https://github.com/victimsnino/ReactivePlusPlus | ||
// | ||
|
||
#include <snitch/snitch.hpp> | ||
#include <snitch/snitch_macros_check.hpp> | ||
|
||
#include <rpp/operators/flat_map.hpp> | ||
#include <rpp/sources/create.hpp> | ||
#include <rpp/sources/just.hpp> | ||
#include <rpp/sources/empty.hpp> | ||
#include <rpp/sources/error.hpp> | ||
#include <rpp/sources/never.hpp> | ||
#include <rpp/schedulers/immediate.hpp> | ||
|
||
#include "copy_count_tracker.hpp" | ||
#include "mock_observer.hpp" | ||
|
||
#include <stdexcept> | ||
#include <string> | ||
|
||
TEMPLATE_TEST_CASE("flat_map", "", rpp::memory_model::use_stack, rpp::memory_model::use_shared) | ||
{ | ||
auto mock = mock_observer_strategy<int>(); | ||
SECTION("observable of items") | ||
{ | ||
auto obs = rpp::source::just<TestType>(rpp::schedulers::immediate{}, 1, 2, 3); | ||
|
||
SECTION("subscribe using flat_map") | ||
{ | ||
obs | rpp::operators::flat_map([](int v) { return rpp::source::just(v * 2); }) | ||
| rpp::ops::subscribe(mock.get_observer()); | ||
SECTION("observer obtains values from underlying observables") | ||
{ | ||
CHECK(mock.get_received_values() == std::vector{ 2, 4, 6 }); | ||
CHECK(mock.get_on_completed_count() == 1); | ||
CHECK(mock.get_on_error_count() == 0); | ||
} | ||
} | ||
SECTION("subscribe using flat_map with error") | ||
{ | ||
obs | rpp::operators::flat_map([](int) { return rpp::source::error<int>(std::make_exception_ptr(std::runtime_error{""})); }) | ||
| rpp::ops::subscribe(mock.get_observer()); | ||
SECTION("observer obtains values from underlying observables") | ||
{ | ||
CHECK(mock.get_total_on_next_count() == 0); | ||
CHECK(mock.get_on_completed_count() == 0); | ||
CHECK(mock.get_on_error_count() == 1); | ||
} | ||
} | ||
SECTION("subscribe using flat_map with empty") | ||
{ | ||
obs | rpp::operators::flat_map([](int) { return rpp::source::empty<int>(); }) | ||
| rpp::ops::subscribe(mock.get_observer()); | ||
SECTION("observer obtains values from underlying observables") | ||
{ | ||
CHECK(mock.get_total_on_next_count() == 0); | ||
CHECK(mock.get_on_completed_count() == 1); | ||
CHECK(mock.get_on_error_count() == 0); | ||
} | ||
} | ||
SECTION("subscribe using flat_map with never") | ||
{ | ||
obs | rpp::operators::flat_map([](int) { return rpp::source::never<int>(); }) | ||
| rpp::ops::subscribe(mock.get_observer()); | ||
SECTION("observer obtains values from underlying observables") | ||
{ | ||
CHECK(mock.get_total_on_next_count() == 0); | ||
CHECK(mock.get_on_completed_count() == 0); | ||
CHECK(mock.get_on_error_count() == 0); | ||
} | ||
} | ||
SECTION("subscribe using flat_map with empty in middle") | ||
{ | ||
obs | rpp::operators::flat_map([](int v) { | ||
if (v == 2) | ||
return rpp::source::empty<int>().as_dynamic(); | ||
return rpp::source::just(v).as_dynamic(); | ||
}) | ||
| rpp::ops::subscribe(mock.get_observer()); | ||
SECTION("observer obtains values from underlying observables") | ||
{ | ||
CHECK(mock.get_received_values() == std::vector{ 1, 3 }); | ||
CHECK(mock.get_on_completed_count() == 1); | ||
CHECK(mock.get_on_error_count() == 0); | ||
} | ||
} | ||
SECTION("subscribe using flat_map with error in middle") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you also add test over never in middle? to check that it is actually map + merge, not map + concat =) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
{ | ||
obs | rpp::operators::flat_map([](int v) { | ||
if (v == 2) | ||
return rpp::source::error<int>(std::make_exception_ptr(std::runtime_error{""})).as_dynamic(); | ||
return rpp::source::just(v).as_dynamic(); | ||
}) | ||
| rpp::ops::subscribe(mock.get_observer()); | ||
SECTION("observer obtains values from underlying observables") | ||
{ | ||
CHECK(mock.get_total_on_next_count() == 0); | ||
CHECK(mock.get_on_completed_count() == 0); | ||
CHECK(mock.get_on_error_count() == 1); | ||
} | ||
} | ||
SECTION("subscribe using flat_map with templated lambda") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think, you can just place it in a748a6f#diff-4ad61bca345fc2ef117d5617e164dca71340d801c636340f2c051a5bd4e6e9d2R37 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
{ | ||
obs | rpp::operators::flat_map([](auto v) { return rpp::source::just(v * 2); }) | ||
| rpp::ops::subscribe(mock.get_observer()); | ||
SECTION("observer obtains values from underlying observables") | ||
{ | ||
CHECK(mock.get_on_completed_count() == 1); | ||
} | ||
} | ||
} | ||
} | ||
|
||
TEST_CASE("flat_map copies/moves") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think, you can remove this test_case due to you are trying to test map + test merge at the same time, it is pretty hard. It is better to have corresponding tests to map and merge. I'm going to add them later. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah agree, done |
||
{ | ||
SECTION("flat_map doesn't produce extra copies") | ||
{ | ||
copy_count_tracker verifier{}; | ||
auto obs = rpp::source::create<copy_count_tracker>([verifier = std::move(verifier)](const auto& obs) { obs.on_next(verifier); }) | ||
| rpp::ops::map([](copy_count_tracker verifier) { return std::move(verifier); }) // copy from source to map | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. try instead auto obs = rpp::source::create<copy_count_tracker>([verifier = std::move(verifier)](const auto& obs) { obs.on_next(std::move(verifier)); }) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Though I subscribe two times here, so verifier will be in an invalid state in second subscription as it was already moved no? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I changed to capture by ref and pass by value There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
No, verifier is special class that is valid after move =) |
||
| rpp::ops::flat_map([](copy_count_tracker&& verifier) { // no copy | ||
return rpp::source::create<copy_count_tracker>([verifier = std::move(verifier)](const auto& obs) { obs.on_next(std::move(verifier)); }); | ||
}); | ||
SECTION("first subscribe") | ||
{ | ||
obs.subscribe([](const auto&){}); // subscribe by const lvalue reference so no copy | ||
SECTION("no extra copies") | ||
{ | ||
REQUIRE(verifier.get_copy_count() == 1); // only one copy from source to first operator | ||
} | ||
} | ||
SECTION("second subscribe") | ||
{ | ||
obs.subscribe([](auto){}); // subscribe by value so one additional copy | ||
SECTION("no extra copies") | ||
{ | ||
REQUIRE(verifier.get_copy_count() == 1 + 1); | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see your attempt with ValueType to avoid duplicates, but it would affect final binaries due to it would be compiled-in as template function with 2 params instead of one =C it is better to create special concept instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done