-
Notifications
You must be signed in to change notification settings - Fork 31
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
2ae50e7
commit 97144f7
Showing
13 changed files
with
387 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
#include <rpp/rpp.hpp> | ||
|
||
#include <iostream> | ||
|
||
/** | ||
* \example timeout.cpp | ||
**/ | ||
int main() | ||
{ | ||
//! [timeout] | ||
rpp::subjects::publish_subject<int> subj{}; | ||
subj.get_observable() | ||
.timeout(std::chrono::milliseconds{450}, rpp::schedulers::new_thread{}) | ||
.subscribe([](int v) { std::cout << "new value " << v << std::endl; }, | ||
[](std::exception_ptr err) | ||
{ | ||
try | ||
{ | ||
std::rethrow_exception(err); | ||
} | ||
catch (const std::exception& exc) | ||
{ | ||
std::cout << "ERR: " << exc.what() << std::endl; | ||
} | ||
}, | ||
[]() { std::cout << "completed" << std::endl; }); | ||
for (int i = 0; i < 10; ++i) | ||
{ | ||
std::this_thread::sleep_for(std::chrono::milliseconds{i * 100}); | ||
subj.get_subscriber().on_next(i); | ||
} | ||
|
||
// Output: | ||
// new value 0 | ||
// new value 1 | ||
// new value 2 | ||
// new value 3 | ||
// new value 4 | ||
// ERR : Timeout reached | ||
//! [timeout] | ||
return 0; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
// ReactivePlusPlus library | ||
// | ||
// Copyright Aleksey Loginov 2022 - present. | ||
// Distributed under the Boost Software License, Version 1.0. | ||
// (See accompanying file LICENSE_1_0.txt or copy at | ||
// https://www.boost.org/LICENSE_1_0.txt) | ||
// | ||
// Project home: https://github.com/victimsnino/ReactivePlusPlus | ||
// | ||
|
||
#pragma once | ||
|
||
#include <rpp/schedulers/constraints.hpp> | ||
|
||
#include <rpp/observables/details/member_overload.hpp> | ||
|
||
namespace rpp::details | ||
{ | ||
struct timeout_tag; | ||
} | ||
|
||
namespace rpp::details | ||
{ | ||
template<constraint::decayed_type Type, schedulers::constraint::scheduler TScheduler> | ||
struct timeout_impl; | ||
|
||
template<constraint::decayed_type Type, typename SpecificObservable> | ||
struct member_overload<Type, SpecificObservable, timeout_tag> | ||
{ | ||
/** | ||
* \brief Forwards emissions from original observable, but emit error if no any events during specified period of time (since last emission) | ||
* | ||
* \marble timeout | ||
{ | ||
source observable : +--1-2-3-4------5-| | ||
operator "timeout(4)" : +--1-2-3-4----# | ||
} | ||
* \param period is maximum duration between emitted items before a timeout occurs | ||
* \param scheduler is scheduler used to run timer for timeout | ||
* \return new specific_observable with the timeout operator as most recent operator. | ||
* \warning #include <rpp/operators/timeout.hpp> | ||
* | ||
* \par Example | ||
* \snippet timeout.cpp timeout | ||
* | ||
* \ingroup utility_operators | ||
* \see https://reactivex.io/documentation/operators/timeout.html | ||
*/ | ||
template<schedulers::constraint::scheduler TScheduler> | ||
auto timeout(schedulers::duration period, const TScheduler& scheduler = TScheduler{}) const & requires is_header_included<timeout_tag, TScheduler> | ||
{ | ||
return static_cast<const SpecificObservable*>(this)->template lift<Type>(timeout_impl<Type, TScheduler>{period, scheduler}); | ||
} | ||
|
||
template<schedulers::constraint::scheduler TScheduler> | ||
auto timeout(schedulers::duration period, const TScheduler& scheduler = TScheduler{}) && requires is_header_included<timeout_tag, TScheduler> | ||
{ | ||
return std::move(*static_cast<SpecificObservable*>(this)).template lift<Type>(timeout_impl<Type, TScheduler>{period, scheduler}); | ||
} | ||
}; | ||
} // namespace rpp::details |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
// ReactivePlusPlus library | ||
// | ||
// Copyright Aleksey Loginov 2022 - present. | ||
// Distributed under the Boost Software License, Version 1.0. | ||
// (See accompanying file LICENSE_1_0.txt or copy at | ||
// https://www.boost.org/LICENSE_1_0.txt) | ||
// | ||
// Project home: https://github.com/victimsnino/ReactivePlusPlus | ||
// | ||
|
||
#pragma once | ||
|
||
#include <rpp/operators/lift.hpp> // required due to operator uses lift | ||
#include <rpp/operators/details/early_unsubscribe.hpp> | ||
#include <rpp/operators/details/serialized_subscriber.hpp> | ||
#include <rpp/operators/details/subscriber_with_state.hpp> | ||
#include <rpp/operators/fwd/timeout.hpp> | ||
#include <rpp/subscribers/constraints.hpp> | ||
#include <rpp/utils/exceptions.hpp> | ||
|
||
#include <rpp/utils/spinlock.hpp> | ||
|
||
#include <atomic> | ||
|
||
IMPLEMENTATION_FILE(timeout_tag); | ||
|
||
namespace rpp::details | ||
{ | ||
struct timeout_state : early_unsubscribe_state | ||
{ | ||
using early_unsubscribe_state::early_unsubscribe_state; | ||
|
||
std::atomic<schedulers::time_point> last_emission_time{}; | ||
|
||
static constexpr schedulers::time_point s_timeout_reached = schedulers::time_point::min(); | ||
}; | ||
|
||
template<typename Worker> | ||
struct timeout_on_next | ||
{ | ||
template<typename Value> | ||
void operator()(Value&& v, const auto& subscriber, const std::shared_ptr<timeout_state>& state) const | ||
{ | ||
if (state->last_emission_time.exchange(Worker::now(), std::memory_order_acq_rel) != timeout_state::s_timeout_reached) | ||
subscriber.on_next(std::forward<Value>(v)); | ||
} | ||
}; | ||
|
||
using timeout_on_error = early_unsubscribe_on_error; | ||
using timeout_on_completed = early_unsubscribe_on_completed; | ||
|
||
struct timeout_state_with_serialized_spinlock : timeout_state | ||
{ | ||
using timeout_state::timeout_state; | ||
|
||
// spinlock because most part of time there is only one thread would be active | ||
utils::spinlock spinlock{}; | ||
}; | ||
|
||
template<constraint::decayed_type Type, schedulers::constraint::scheduler TScheduler> | ||
struct timeout_impl | ||
{ | ||
schedulers::duration period; | ||
TScheduler scheduler; | ||
|
||
template<constraint::subscriber_of_type<Type> TSub> | ||
auto operator()(TSub&& in_subscriber) const | ||
{ | ||
auto state = std::make_shared<timeout_state_with_serialized_spinlock>(in_subscriber.get_subscription()); | ||
// change subscriber to serialized to avoid manual using of mutex | ||
auto subscriber = make_serialized_subscriber(std::forward<TSub>(in_subscriber), | ||
std::shared_ptr<utils::spinlock>{state, &state->spinlock}); | ||
|
||
const auto worker = scheduler.create_worker(state->children_subscriptions); | ||
state->last_emission_time.store(worker.now(), std::memory_order_relaxed); | ||
|
||
const auto last_emission_time = state->last_emission_time.load(std::memory_order_relaxed); | ||
worker.schedule(last_emission_time + period, | ||
[period = period, prev_emission_time = last_emission_time, subscriber, state]() mutable -> schedulers::optional_duration | ||
{ | ||
while (true) | ||
{ | ||
// last emission time still same value -> timeout reached, else -> prev_emission_time | ||
// would be update to actual emission time | ||
if (state->last_emission_time.compare_exchange_strong(prev_emission_time, | ||
timeout_state::s_timeout_reached, | ||
std::memory_order_acq_rel)) | ||
return time_is_out(state, subscriber); | ||
|
||
// if we still need to wait a bit more -> let's wait | ||
if (const auto diff_to_schedule = (prev_emission_time + period) - decltype(worker)::now(); | ||
diff_to_schedule > rpp::schedulers::duration{0}) | ||
return diff_to_schedule; | ||
|
||
// okay, we here because: | ||
// 1) last_emission_time was not equal to prev_emission_time | ||
// 2) last_emission_time + period before now -> we are still in timeout state | ||
// 3) prev_emission_time updated to last_emission_time | ||
// So we can return to begin | ||
} | ||
}); | ||
|
||
return create_subscriber_with_state<Type>(state->children_subscriptions, | ||
timeout_on_next<decltype(worker)>{}, | ||
timeout_on_error{}, | ||
timeout_on_completed{}, | ||
std::move(subscriber), | ||
std::move(state)); | ||
} | ||
|
||
private: | ||
static schedulers::optional_duration time_is_out(const auto& state, const auto& subscriber) | ||
{ | ||
state->children_subscriptions.unsubscribe(); | ||
subscriber.on_error(std::make_exception_ptr(utils::timeout{"Timeout reached"})); | ||
return std::nullopt; | ||
} | ||
}; | ||
} // namespace rpp::details |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
97144f7
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BENCHMARK RESULTS (AUTOGENERATED)
ci-ubuntu-clang
Observable construction
Table
Observable lift
Table
Observable subscribe
Table
Observable subscribe #2
Table
Observer construction
Table
OnNext
Table
Subscriber construction
Table
Subscription
Table
buffer
Table
chains creation test
Table
combine_latest
Table
concat
Table
distinct_until_changed
Table
first
Table
foundamental sources
Table
from
Table
immediate scheduler
Table
just
Table
last
Table
map
Table
merge
Table
observe_on
Table
on_error_resume_next
Table
publish_subject callbacks
Table
publish_subject routines
Table
repeat
Table
scan
Table
single-threaded locks
Table
skip
Table
switch_on_next
Table
take
Table
take_last
Table
take_until
Table
timeout
Table
trampoline scheduler
Table
window
Table
with_latest_from
Table
ci-ubuntu-gcc
Observable construction
Table
Observable lift
Table
Observable subscribe
Table
Observable subscribe #2
Table
Observer construction
Table
OnNext
Table
Subscriber construction
Table
Subscription
Table
buffer
Table
chains creation test
Table
combine_latest
Table
concat
Table
distinct_until_changed
Table
first
Table
foundamental sources
Table
from
Table
immediate scheduler
Table
just
Table
last
Table
map
Table
merge
Table
observe_on
Table
on_error_resume_next
Table
publish_subject callbacks
Table
publish_subject routines
Table
repeat
Table
scan
Table
single-threaded locks
Table
skip
Table
switch_on_next
Table
take
Table
take_last
Table
take_until
Table
timeout
Table
trampoline scheduler
Table
window
Table
with_latest_from
Table
ci-windows
Observable construction
Table
Observable lift
Table
Observable subscribe
Table
Observable subscribe #2
Table
Observer construction
Table
OnNext
Table
Subscriber construction
Table
Subscription
Table
buffer
Table
chains creation test
Table
combine_latest
Table
concat
Table
distinct_until_changed
Table
first
Table
foundamental sources
Table
from
Table
immediate scheduler
Table
just
Table
last
Table
map
Table
merge
Table
observe_on
Table
on_error_resume_next
Table
publish_subject callbacks
Table
publish_subject routines
Table
repeat
Table
scan
Table
single-threaded locks
Table
skip
Table
switch_on_next
Table
take
Table
take_last
Table
take_until
Table
timeout
Table
trampoline scheduler
Table
window
Table
with_latest_from
Table