From eda63cfc808cd5c39b4f139b68abac49b9a72c85 Mon Sep 17 00:00:00 2001 From: Kirk Shoop Date: Sun, 5 Aug 2018 15:15:24 -0700 Subject: [PATCH] fix blocking_observable::subscribe removes spinning from blocking submit. ran all perf tests on osx without issue. should fix #430 and help with #451 --- Rx/v2/src/rxcpp/rx-observable.hpp | 45 ++++------------------ Rx/v2/test/operators/merge_delay_error.cpp | 6 +-- 2 files changed, 10 insertions(+), 41 deletions(-) diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 496db0b5c..037f375af 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -173,28 +173,9 @@ class blocking_observable -> void { std::mutex lock; std::condition_variable wake; + bool disposed = false; std::exception_ptr error; - struct tracking - { - ~tracking() - { - if (!disposed || !wakened) std::terminate(); - } - tracking() - { - disposed = false; - wakened = false; - false_wakes = 0; - true_wakes = 0; - } - std::atomic_bool disposed; - std::atomic_bool wakened; - std::atomic_int false_wakes; - std::atomic_int true_wakes; - }; - auto track = std::make_shared(); - auto dest = make_subscriber(std::forward(an)...); // keep any error to rethrow at the end. @@ -213,31 +194,19 @@ class blocking_observable auto cs = scbr.get_subscription(); cs.add( - [&, track](){ - // OSX geting invalid x86 op if notify_one is after the disposed = true - // presumably because the condition_variable may already have been awakened - // and is now sitting in a while loop on disposed + [&](){ + std::unique_lock guard(lock); wake.notify_one(); - track->disposed = true; + disposed = true; }); - std::unique_lock guard(lock); source.subscribe(std::move(scbr)); + std::unique_lock guard(lock); wake.wait(guard, - [&, track](){ - // this is really not good. - // false wakeups were never followed by true wakeups so.. - - // anyways this gets triggered before disposed is set now so wait. - while (!track->disposed) { - ++track->false_wakes; - } - ++track->true_wakes; - return true; + [&](){ + return disposed; }); - track->wakened = true; - if (!track->disposed || !track->wakened) std::terminate(); if (error) {std::rethrow_exception(error);} } diff --git a/Rx/v2/test/operators/merge_delay_error.cpp b/Rx/v2/test/operators/merge_delay_error.cpp index d560b453d..b53b88405 100644 --- a/Rx/v2/test/operators/merge_delay_error.cpp +++ b/Rx/v2/test/operators/merge_delay_error.cpp @@ -7,7 +7,7 @@ const int static_onnextcalls = 1000000; //merge_delay_error must work the very same way as `merge()` except the error handling -SCENARIO("merge completes", "[merge][join][operators]"){ +SCENARIO("merge_delay_error completes", "[merge][join][operators]"){ GIVEN("1 hot observable with 3 cold observables of ints."){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); @@ -117,7 +117,7 @@ SCENARIO("merge completes", "[merge][join][operators]"){ } } -SCENARIO("variadic merge completes with error", "[merge][join][operators]"){ +SCENARIO("variadic merge_delay_error completes with error", "[merge][join][operators]"){ GIVEN("1 hot observable with 3 cold observables of ints."){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); @@ -211,7 +211,7 @@ SCENARIO("variadic merge completes with error", "[merge][join][operators]"){ } } -SCENARIO("variadic merge completes with 2 errors", "[merge][join][operators]"){ +SCENARIO("variadic merge_delay_error completes with 2 errors", "[merge][join][operators]"){ GIVEN("1 hot observable with 3 cold observables of ints."){ auto sc = rxsc::make_test(); auto w = sc.create_worker();