From 116f953ada3b0925fff38c7262ffc8b34c2fd7a7 Mon Sep 17 00:00:00 2001 From: Alexander Damian Date: Fri, 6 Mar 2020 16:04:44 -0500 Subject: [PATCH] Changed default infinite timeout to be -1 instead of 0. Updated ConditionVariable::waitFor() to match more closely the STL behavior. Signed-off-by: Alexander Damian --- .../impl/quantum_condition_variable_impl.h | 65 ++++++++++++------- quantum/impl/quantum_dispatcher_impl.h | 14 +++- quantum/quantum_condition_variable.h | 18 ++++- quantum/quantum_dispatcher.h | 4 +- quantum/util/impl/quantum_sequencer_impl.h | 4 +- quantum/util/quantum_sequencer.h | 4 +- 6 files changed, 73 insertions(+), 36 deletions(-) diff --git a/quantum/impl/quantum_condition_variable_impl.h b/quantum/impl/quantum_condition_variable_impl.h index 5bcb735..10eb03d 100644 --- a/quantum/impl/quantum_condition_variable_impl.h +++ b/quantum/impl/quantum_condition_variable_impl.h @@ -117,8 +117,12 @@ template bool ConditionVariable::waitFor(Mutex& mutex, const std::chrono::duration& time) { - auto duration = time; - return waitForImpl(nullptr, mutex, duration); + if (time == std::chrono::milliseconds(-1)) + { + waitImpl(nullptr, mutex); + return true; + } + return waitForImpl(nullptr, mutex, time); } template @@ -126,8 +130,12 @@ bool ConditionVariable::waitFor(ICoroSync::Ptr sync, Mutex& mutex, const std::chrono::duration& time) { - auto duration = time; - return waitForImpl(sync, mutex, duration); + if (time == std::chrono::milliseconds(-1)) + { + waitImpl(sync, mutex); + return true; + } + return waitForImpl(sync, mutex, time); } template @@ -156,7 +164,8 @@ void ConditionVariable::waitImpl(ICoroSync::Ptr sync, Mutex::Guard lock(sync, _thisLock); if (_destroyed) { - return; //don't release the mutex + //don't release the mutex + return; } signal = 0; //clear signal flag _waiters.push_back(&signal); @@ -165,6 +174,7 @@ void ConditionVariable::waitImpl(ICoroSync::Ptr sync, Mutex::ReverseGuard unlock(sync, mutex); while ((signal == 0) && !_destroyed) { + //wait for signal yield(sync); } signal = -1; //reset @@ -175,6 +185,7 @@ void ConditionVariable::waitImpl(ICoroSync::Ptr sync, Mutex& mutex, PREDICATE predicate) { + //see: https://en.cppreference.com/w/cpp/thread/condition_variable/wait while (!predicate() && !_destroyed) { waitImpl(sync, mutex); @@ -184,18 +195,28 @@ void ConditionVariable::waitImpl(ICoroSync::Ptr sync, template bool ConditionVariable::waitForImpl(ICoroSync::Ptr sync, Mutex& mutex, - std::chrono::duration& time) + const std::chrono::duration& time) { + if (time < std::chrono::milliseconds::zero()) + { + //invalid time setting + throw std::invalid_argument("Timeout cannot be negative"); + } std::atomic_int& signal = sync ? sync->signal() : s_threadSignal; {//========= LOCKED SCOPE ========= Mutex::Guard lock(sync, _thisLock); if (_destroyed) { - return true; //don't release the mutex + //don't release the mutex + return true; } if (time == std::chrono::duration::zero()) { - return false; //timeout + //immediate timeout. + //reset flag if necessary and return w/o releasing mutex. + int expected = 1; + signal.compare_exchange_strong(expected, -1); + return (expected==1); } signal = 0; //clear signal flag _waiters.push_back(&signal); @@ -204,26 +225,22 @@ bool ConditionVariable::waitForImpl(ICoroSync::Ptr sync, Mutex::ReverseGuard unlock(sync, mutex); auto start = std::chrono::steady_clock::now(); auto elapsed = std::chrono::duration::zero(); - bool timeout = false; //wait until signalled or times out while ((signal == 0) && !_destroyed) { + //wait for signal yield(sync); elapsed = std::chrono::duration_cast> (std::chrono::steady_clock::now() - start); if (elapsed >= time) { - timeout = true; - break; //expired time + break; //expired } } - + bool rc = (signal == 1); signal = -1; //reset signal flag - - //adjust duration or set to zero if nothing remains - time = timeout ? std::chrono::duration::zero() : time - elapsed; - return !timeout; + return rc; } template @@ -232,18 +249,16 @@ bool ConditionVariable::waitForImpl(ICoroSync::Ptr sync, const std::chrono::duration& time, PREDICATE predicate) { - if (time > std::chrono::duration(0)) { - auto duration = time; - while (!predicate() && !_destroyed) + //see: https://en.cppreference.com/w/cpp/thread/condition_variable/wait_until + while (!predicate() && !_destroyed) + { + if (!waitForImpl(sync, mutex, time)) { - if (!waitForImpl(sync, mutex, duration)) - { - //timeout - return predicate(); - } + //timeout + return predicate(); } } - return true; //duration has not yet expired + return true; } }} diff --git a/quantum/impl/quantum_dispatcher_impl.h b/quantum/impl/quantum_dispatcher_impl.h index d75fc9e..c8d393f 100644 --- a/quantum/impl/quantum_dispatcher_impl.h +++ b/quantum/impl/quantum_dispatcher_impl.h @@ -34,7 +34,7 @@ Dispatcher::Dispatcher(const Configuration& config) : inline Dispatcher::~Dispatcher() { - drain(std::chrono::milliseconds::zero(), true); + drain(std::chrono::milliseconds(-1), true); terminate(); } @@ -356,6 +356,16 @@ void Dispatcher::drain(std::chrono::milliseconds timeout, bool isFinal) { DrainGuard guard(_drain, !isFinal); + if ((timeout < std::chrono::milliseconds::zero()) && + (timeout != std::chrono::milliseconds(-1))) + { + throw std::invalid_argument("Timeout cannot be negative"); + } + + if (timeout == std::chrono::milliseconds::zero()) + { + return; //skip draining + } auto start = std::chrono::steady_clock::now(); @@ -366,7 +376,7 @@ void Dispatcher::drain(std::chrono::milliseconds timeout, yield(); //check remaining time - if (timeout != std::chrono::milliseconds::zero()) + if (timeout != std::chrono::milliseconds(-1)) { auto present = std::chrono::steady_clock::now(); if (std::chrono::duration_cast(present-start) > timeout) diff --git a/quantum/quantum_condition_variable.h b/quantum/quantum_condition_variable.h index c19a83c..a715629 100644 --- a/quantum/quantum_condition_variable.h +++ b/quantum/quantum_condition_variable.h @@ -131,6 +131,7 @@ class ConditionVariable /// @tparam PERIOD A std::ratio representing the tick period such as ticks per second. /// @param[in] mutex Mutex object which is locked by the current coroutine. /// @param[in] time Maximum duration for which to wait on this condition. + /// Using -1 is equivalent to calling wait(). Other negative values will throw. /// @return True if the mutex was acquired before 'time' expired or false otherwise. /// @note This function should be called from a regular thread not from a coroutine. template @@ -145,6 +146,7 @@ class ConditionVariable /// @param[in] sync Pointer to a coroutine synchronization object. /// @param[in] mutex Mutex object which is locked by the current coroutine. /// @param[in] time Maximum duration for which to wait on this condition. + /// Using -1 is equivalent to calling wait(). Other negative values will throw. /// @return True if the mutex was acquired before 'time' expired or false otherwise. /// @note This function should be called from a coroutine. template @@ -160,14 +162,19 @@ class ConditionVariable /// @code /// while(!predicate()) /// { - /// waitFor(mutex, time); + /// if (!waitFor(mutex, time)) + /// { + /// return predicate(); + /// } /// } + /// return true; /// @endcode /// @tparam REP An arithmetic type such as int or double representing the number of ticks. /// @tparam PERIOD A std::ratio representing the tick period such as ticks per second. /// @tparam PREDICATE Callable function type having the following signature 'bool f()'. /// @param[in] mutex Mutex object which is locked by the current coroutine. /// @param[in] time Maximum duration for which to wait on this condition. + /// Using -1 is equivalent to calling wait(). Other negative values will throw. /// @param[in] predicate Function or functor to be tested as exit condition of the endless while loop. /// @return True if the mutex was acquired before 'time' expired, otherwise the predicate result after timeout. /// @note This function should be called from a regular thread not from a coroutine. @@ -184,8 +191,12 @@ class ConditionVariable /// @code /// while(!predicate()) /// { - /// waitFor(mutex, time); + /// if (!waitFor(mutex, time)) + /// { + /// return predicate(); + /// } /// } + /// return true; /// @endcode /// @tparam REP An arithmetic type such as int or double representing the number of ticks. /// @tparam PERIOD A std::ratio representing the tick period such as ticks per second. @@ -193,6 +204,7 @@ class ConditionVariable /// @param[in] sync Pointer to a coroutine synchronization object. /// @param[in] mutex Mutex object which is locked by the current coroutine. /// @param[in] time Maximum duration for which to wait on this condition. + /// Using -1 is equivalent to calling wait(). Other negative values will throw. /// @param[in] predicate Function or functor to be tested as exit condition of the endless while loop. /// @return True if the mutex was acquired before 'time' expired, otherwise the predicate result after timeout. /// @note This function should be called from a coroutine. @@ -218,7 +230,7 @@ class ConditionVariable template bool waitForImpl(ICoroSync::Ptr sync, Mutex& mutex, - std::chrono::duration& time); + const std::chrono::duration& time); template bool waitForImpl(ICoroSync::Ptr sync, diff --git a/quantum/quantum_dispatcher.h b/quantum/quantum_dispatcher.h index ddf91a1..bef8596 100644 --- a/quantum/quantum_dispatcher.h +++ b/quantum/quantum_dispatcher.h @@ -328,11 +328,11 @@ class Dispatcher : public ITerminate int queueId = (int)IQueue::QueueId::All) const; /// @brief Drains all queues on this dispatcher object. - /// @param[in] timeout Maximum time for this function to wait. Set to 0 to wait indefinitely until all queues drain. + /// @param[in] timeout Maximum time for this function to wait. Set to -1 to wait indefinitely until all queues drain. /// @param[in] isFinal If set to true, the dispatcher will not allow any more processing after the drain completes. /// @note This function blocks until all coroutines and IO tasks have completed. During this time, posting /// of new tasks is disabled unless they are posted from within an already executing coroutine. - void drain(std::chrono::milliseconds timeout = std::chrono::milliseconds::zero(), + void drain(std::chrono::milliseconds timeout = std::chrono::milliseconds(-1), bool isFinal = false); /// @brief Returns the number of underlying coroutine threads as specified in the constructor. If -1 was passed diff --git a/quantum/util/impl/quantum_sequencer_impl.h b/quantum/util/impl/quantum_sequencer_impl.h index a71fc7e..4bad1ef 100644 --- a/quantum/util/impl/quantum_sequencer_impl.h +++ b/quantum/util/impl/quantum_sequencer_impl.h @@ -535,7 +535,7 @@ Sequencer::canTrimContext(const ICoroCon const ICoroContextBasePtr& ctxToValidate) { return !ctxToValidate || !ctxToValidate->valid() || - ctxToValidate->waitFor(ctx, std::chrono::milliseconds(0)) == std::future_status::ready; + ctxToValidate->waitFor(ctx, std::chrono::milliseconds::zero()) == std::future_status::ready; } template @@ -544,7 +544,7 @@ Sequencer::isPendingContext(const ICoroC const ICoroContextBasePtr& ctxToValidate) { return ctxToValidate && ctxToValidate->valid() && - ctxToValidate->waitFor(ctx, std::chrono::milliseconds(0)) == std::future_status::timeout; + ctxToValidate->waitFor(ctx, std::chrono::milliseconds::zero()) == std::future_status::timeout; } template diff --git a/quantum/util/quantum_sequencer.h b/quantum/util/quantum_sequencer.h index 71ae038..912d46c 100644 --- a/quantum/util/quantum_sequencer.h +++ b/quantum/util/quantum_sequencer.h @@ -228,13 +228,13 @@ class Sequencer SequenceKeyStatistics getTaskStatistics(); /// @brief Drains all sequenced tasks. - /// @param[in] timeout Maximum time for this function to wait. Set to 0 to wait indefinitely until all sequences drain. + /// @param[in] timeout Maximum time for this function to wait. Set to -1 to wait indefinitely until all sequences drain. /// @param[in] isFinal If set to true, the sequencer will not allow any more processing after the drain completes. /// @note This function blocks until all sequences have completed. During this time, posting /// of new tasks is disabled unless they are posted from within an already executing coroutine. /// Since this function posts a task which will wait on all others, getStatistics().getPostedTaskCount() /// will contain one extra count. - void drain(std::chrono::milliseconds timeout = std::chrono::milliseconds::zero(), + void drain(std::chrono::milliseconds timeout = std::chrono::milliseconds(-1), bool isFinal = false); private: