Skip to content

Commit

Permalink
Correctly share current_thread with new_thread (#542)
Browse files Browse the repository at this point in the history
* add test

* fix new_thread vs current_thread

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
victimsnino and pre-commit-ci[bot] authored Mar 12, 2024
1 parent b2a8748 commit a15df30
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 23 deletions.
30 changes: 7 additions & 23 deletions src/rpp/rpp/schedulers/new_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,9 @@ namespace rpp::schedulers
};
}

~disposable() override
{
if (!m_thread.joinable())
return;

{
std::lock_guard lock{m_state->mutex};
m_state->is_destroying.store(true, std::memory_order::relaxed);
}
m_state->cv.notify_all();
m_thread.detach();
}

template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
void defer_to(time_point time_point, Fn&& fn, Handler&& handler, Args&&... args)
{
if (is_disposed())
return;

std::lock_guard lock{m_state->mutex};
// guarded by lock
if (const auto queue = m_state->queue_ptr.load(std::memory_order::seq_cst))
Expand Down Expand Up @@ -87,24 +71,23 @@ namespace rpp::schedulers
{
std::atomic<details::schedulables_queue<current_thread::worker_strategy>*> queue_ptr{};
std::atomic_bool is_disposed{};
std::atomic_bool is_destroying{};
};

static void data_thread(std::shared_ptr<state_t> state)
{
auto& queue = current_thread::s_queue;
state->queue_ptr.store(&queue.emplace(state), std::memory_order::seq_cst);

while (!state->is_disposed.load(std::memory_order::seq_cst))
while (true)
{
std::unique_lock lock{state->mutex};

if (state->is_destroying.load(std::memory_order::seq_cst) && queue->is_empty())
if (queue->is_empty() && state->is_disposed.load(std::memory_order::seq_cst))
break;

state->cv.wait(lock, [&] { return state->is_disposed.load(std::memory_order::seq_cst) || !queue->is_empty() || state->is_destroying.load(std::memory_order::seq_cst); });
state->cv.wait(lock, [&] { return !queue->is_empty() || state->is_disposed.load(std::memory_order::seq_cst); });

if (state->is_disposed.load(std::memory_order::seq_cst) || state->is_destroying.load(std::memory_order::seq_cst))
if (queue->is_empty())
break;

if (queue->top()->is_disposed())
Expand All @@ -117,7 +100,7 @@ namespace rpp::schedulers
{
if (const auto now = worker_strategy::now(); now < queue->top()->get_timepoint())
{
state->cv.wait_for(lock, queue->top()->get_timepoint() - now, [&] { return state->is_disposed.load(std::memory_order::seq_cst) || state->is_destroying.load(std::memory_order::seq_cst) || worker_strategy::now() >= queue->top()->get_timepoint(); });
state->cv.wait_for(lock, queue->top()->get_timepoint() - now, [&] { return queue->top()->is_disposed() || worker_strategy::now() >= queue->top()->get_timepoint(); });
continue;
}
}
Expand All @@ -126,7 +109,8 @@ namespace rpp::schedulers
lock.unlock();

if (const auto timepoint = (*top)())
queue->emplace(timepoint.value(), std::move(top));
if (!top->is_disposed())
queue->emplace(timepoint.value(), std::move(top));
}

std::unique_lock lock{state->mutex};
Expand Down
50 changes: 50 additions & 0 deletions src/tests/rpp/test_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -895,4 +895,54 @@ TEST_CASE("different delaying strategies")
CHECK(scheduler.get_schedulings() == std::vector{now, now + delay});
CHECK(scheduler.get_executions() == std::vector{now});
}
}

TEST_CASE("current_thread inside new_thread")
{
auto worker = std::optional{rpp::schedulers::new_thread{}.create_worker()};
auto d = rpp::composite_disposable_wrapper::make();
auto obs = std::optional{mock_observer_strategy<int>{}.get_observer(d).as_dynamic()};
auto started = std::make_shared<std::atomic_bool>();
auto done = std::make_shared<std::atomic_bool>();

worker->schedule([&](const auto&) {
thread_local rpp::utils::finally_action th{[done] {
done->store(true);
}};
return rpp::schedulers::optional_delay_from_now{};
},
obs.value());

auto current_thread_invoked = std::make_shared<std::atomic_bool>();

worker->schedule([&](const auto& obs) {
worker->get_disposable().dispose();

rpp::schedulers::current_thread{}.create_worker().schedule([current_thread_invoked](const auto&) {
current_thread_invoked->store(true);
return rpp::schedulers::optional_delay_from_now{};
},
obs);

if (current_thread_invoked->load())
throw std::runtime_error{"current_thread was invoked"};

started->store(true);

return rpp::schedulers::optional_delay_from_now{};
},
obs.value());

while (!started->load())
{
}

worker.reset();
obs.reset();
d = rpp::composite_disposable_wrapper::empty();

std::this_thread::sleep_for(std::chrono::seconds{1});

REQUIRE(done->load());
CHECK(current_thread_invoked->load());
}

1 comment on commit a15df30

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BENCHMARK RESULTS (AUTOGENERATED)

ci-ubuntu-gcc

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 302.83 ns 2.16 ns 2.16 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 307.65 ns 2.16 ns 2.16 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 702.74 ns 0.31 ns 0.62 ns 0.50
from array of 1 - create + subscribe + current_thread 1044.14 ns 5.25 ns 4.64 ns 1.13
concat_as_source of just(1 immediate) create + subscribe 2225.26 ns 114.39 ns 112.55 ns 1.02
defer from array of 1 - defer + create + subscribe + immediate 747.05 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 2160.47 ns 59.23 ns 59.19 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3021.48 ns 32.74 ns 32.74 ns 1.00

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1075.64 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 862.69 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 985.05 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 862.91 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+first()+subscribe 1244.09 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+last()+subscribe 909.75 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1114.44 ns 18.22 ns 17.40 ns 1.05

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 263.73 ns 2.16 ns 2.16 ns 1.00
current_thread scheduler create worker + schedule 365.91 ns 7.11 ns 6.49 ns 1.10
current_thread scheduler create worker + schedule + recursive schedule 831.07 ns 64.95 ns 65.18 ns 1.00

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 845.52 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 903.23 ns 0.31 ns 0.31 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 2387.07 ns 159.07 ns 173.31 ns 0.92
immediate_just+buffer(2)+subscribe 1541.05 ns 13.90 ns 13.60 ns 1.02
immediate_just+window(2)+subscribe + subscsribe inner 2327.65 ns 1047.67 ns 1121.12 ns 0.93

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 822.19 ns - - 0.00
immediate_just+take_while(true)+subscribe 837.14 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 1982.81 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3463.48 ns 177.02 ns 180.56 ns 0.98
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3692.25 ns 170.72 ns 172.20 ns 0.99
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 139.21 ns 136.52 ns 1.02
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3608.95 ns 930.28 ns 1103.02 ns 0.84
immediate_just(1) + zip(immediate_just(2)) + subscribe 2123.79 ns 216.41 ns 224.79 ns 0.96

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 34.49 ns 12.03 ns 11.71 ns 1.03

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1464.13 ns 14.51 ns 13.59 ns 1.07
basic sample with immediate scheduler 1366.19 ns 5.55 ns 5.55 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 921.69 ns 0.31 ns 0.31 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 1046.55 ns 122.87 ns 126.31 ns 0.97

ci-macos

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 978.75 ns 4.05 ns 3.85 ns 1.05
Subscribe empty callbacks to empty observable via pipe operator 972.95 ns 4.05 ns 3.87 ns 1.05

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1928.73 ns 0.23 ns 0.23 ns 1.00
from array of 1 - create + subscribe + current_thread 2442.72 ns 25.47 ns 25.35 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 5518.91 ns 327.94 ns 327.75 ns 1.00
defer from array of 1 - defer + create + subscribe + immediate 2072.62 ns 0.23 ns 0.23 ns 1.01
interval - interval + take(3) + subscribe + immediate 5469.24 ns 122.78 ns 114.03 ns 1.08
interval - interval + take(3) + subscribe + current_thread 6855.89 ns 117.12 ns 105.26 ns 1.11

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 2998.10 ns 0.25 ns 0.23 ns 1.08
immediate_just+filter(true)+subscribe 2214.92 ns 0.25 ns 0.23 ns 1.08
immediate_just(1,2)+skip(1)+subscribe 2936.95 ns 0.25 ns 0.23 ns 1.07
immediate_just(1,1,2)+distinct_until_changed()+subscribe 2352.95 ns 0.52 ns 0.48 ns 1.08
immediate_just(1,2)+first()+subscribe 3389.31 ns 0.25 ns 0.23 ns 1.07
immediate_just(1,2)+last()+subscribe 2527.87 ns 0.25 ns 0.24 ns 1.07
immediate_just+take_last(1)+subscribe 3292.45 ns 79.26 ns 68.56 ns 1.16

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 861.37 ns 4.85 ns 3.84 ns 1.26
current_thread scheduler create worker + schedule 1192.92 ns 38.48 ns 36.79 ns 1.05
current_thread scheduler create worker + schedule + recursive schedule 1999.02 ns 218.91 ns 210.36 ns 1.04

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 2454.17 ns 0.27 ns 0.23 ns 1.14
immediate_just+scan(10, std::plus)+subscribe 2475.27 ns 0.48 ns 0.47 ns 1.03
immediate_just+flat_map(immediate_just(v*2))+subscribe 5761.31 ns 458.29 ns 416.38 ns 1.10
immediate_just+buffer(2)+subscribe 2717.69 ns 78.15 ns 67.71 ns 1.15
immediate_just+window(2)+subscribe + subscsribe inner 5776.13 ns 2534.48 ns 2303.10 ns 1.10

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 2371.41 ns - - 0.00
immediate_just+take_while(true)+subscribe 2388.30 ns 0.27 ns 0.23 ns 1.16

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 5556.23 ns 0.27 ns 0.23 ns 1.14

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 7489.08 ns 452.85 ns 450.84 ns 1.00
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 8678.46 ns 447.38 ns 447.37 ns 1.00
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 477.81 ns 474.04 ns 1.01
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 8628.39 ns 2001.28 ns 1895.16 ns 1.06
immediate_just(1) + zip(immediate_just(2)) + subscribe 5595.18 ns 901.45 ns 860.67 ns 1.05

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 77.77 ns 52.03 ns 49.55 ns 1.05

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 2916.14 ns 116.81 ns 109.76 ns 1.06
basic sample with immediate scheduler 2843.63 ns 16.11 ns 15.21 ns 1.06

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 2381.45 ns 0.24 ns 0.23 ns 1.05

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 6783.26 ns 4169.46 ns 4108.04 ns 1.01

ci-ubuntu-clang

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 277.12 ns 0.88 ns 1.56 ns 0.56
Subscribe empty callbacks to empty observable via pipe operator 286.50 ns 0.88 ns 1.56 ns 0.56

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 573.35 ns 0.31 ns 0.31 ns 1.00
from array of 1 - create + subscribe + current_thread 802.16 ns 5.55 ns 5.55 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 1915.85 ns 114.17 ns 113.06 ns 1.01
defer from array of 1 - defer + create + subscribe + immediate 594.32 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 1548.82 ns 58.26 ns 58.39 ns 1.00
interval - interval + take(3) + subscribe + current_thread 2108.18 ns 31.47 ns 31.48 ns 1.00

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 908.41 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 664.62 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 855.72 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 701.24 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+first()+subscribe 1111.32 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+last()+subscribe 761.28 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 957.11 ns 0.31 ns 0.31 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 204.83 ns 0.88 ns 1.56 ns 0.56
current_thread scheduler create worker + schedule 330.31 ns 5.89 ns 5.57 ns 1.06
current_thread scheduler create worker + schedule + recursive schedule 630.15 ns 59.57 ns 58.83 ns 1.01

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 652.42 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 708.45 ns 0.31 ns 0.31 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 1836.87 ns 119.13 ns 120.26 ns 0.99
immediate_just+buffer(2)+subscribe 1363.36 ns 14.50 ns 13.89 ns 1.04
immediate_just+window(2)+subscribe + subscsribe inner 2120.28 ns 811.84 ns 827.37 ns 0.98

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 702.09 ns - - 0.00
immediate_just+take_while(true)+subscribe 626.19 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 1615.23 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 2571.95 ns 126.48 ns 125.81 ns 1.01
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3005.20 ns 121.78 ns 122.10 ns 1.00
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 110.24 ns 110.23 ns 1.00
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 2642.32 ns 726.85 ns 732.25 ns 0.99
immediate_just(1) + zip(immediate_just(2)) + subscribe 1810.06 ns 165.34 ns 162.99 ns 1.01

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 26.19 ns 13.87 ns 13.88 ns 1.00

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1060.57 ns 13.28 ns 13.27 ns 1.00
basic sample with immediate scheduler 1019.52 ns 6.17 ns 6.17 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 748.87 ns 0.31 ns 0.31 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 868.40 ns 125.86 ns 124.08 ns 1.01

ci-windows

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 582.66 ns 4.94 ns 4.93 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 597.36 ns 4.93 ns 4.94 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1397.73 ns 5.55 ns 5.55 ns 1.00
from array of 1 - create + subscribe + current_thread 1693.05 ns 18.52 ns 18.51 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 4873.22 ns 169.19 ns 169.60 ns 1.00
defer from array of 1 - defer + create + subscribe + immediate 1446.48 ns 5.55 ns 5.55 ns 1.00
interval - interval + take(3) + subscribe + immediate 3290.22 ns 133.72 ns 133.65 ns 1.00
interval - interval + take(3) + subscribe + current_thread 4008.56 ns 58.71 ns 58.72 ns 1.00

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1853.11 ns 12.88 ns 12.85 ns 1.00
immediate_just+filter(true)+subscribe 1327.06 ns 11.71 ns 12.01 ns 0.98
immediate_just(1,2)+skip(1)+subscribe 1741.69 ns 13.13 ns 13.11 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 1367.00 ns 15.81 ns 15.77 ns 1.00
immediate_just(1,2)+first()+subscribe 2079.27 ns 12.95 ns 12.97 ns 1.00
immediate_just(1,2)+last()+subscribe 1465.13 ns 14.10 ns 14.11 ns 1.00
immediate_just+take_last(1)+subscribe 2037.38 ns 59.18 ns 61.35 ns 0.96

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 494.89 ns 6.17 ns 6.18 ns 1.00
current_thread scheduler create worker + schedule 875.46 ns 18.21 ns 18.20 ns 1.00
current_thread scheduler create worker + schedule + recursive schedule 1320.99 ns 116.49 ns 115.51 ns 1.01

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 1340.35 ns 11.19 ns 11.22 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 1431.22 ns 21.60 ns 21.59 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 3536.27 ns 233.82 ns 227.30 ns 1.03
immediate_just+buffer(2)+subscribe 2327.43 ns 59.46 ns 59.92 ns 0.99
immediate_just+window(2)+subscribe + subscsribe inner 4404.45 ns 1501.79 ns 1534.08 ns 0.98

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 1322.05 ns 11.45 ns 11.43 ns 1.00
immediate_just+take_while(true)+subscribe 1665.14 ns 11.72 ns 11.68 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 3525.51 ns 7.71 ns 7.71 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 6035.88 ns 257.03 ns 252.13 ns 1.02
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 6415.73 ns 240.31 ns 234.82 ns 1.02
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 233.58 ns 240.74 ns 0.97
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 5506.28 ns 974.08 ns 959.33 ns 1.02
immediate_just(1) + zip(immediate_just(2)) + subscribe 3626.06 ns 533.96 ns 534.22 ns 1.00

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 36.57 ns 26.51 ns 26.51 ns 1.00

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 2149.16 ns 59.67 ns 59.58 ns 1.00
basic sample with immediate scheduler 1871.61 ns 35.50 ns 35.48 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 1481.88 ns 19.99 ns 19.98 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 1938.21 ns 338.19 ns 332.73 ns 1.02

Please sign in to comment.