Skip to content

Commit

Permalink
Changed default infinite timeout to be -1 instead of 0. Updated Condi…
Browse files Browse the repository at this point in the history
…tionVariable::waitFor() to match more closely the STL behavior.

Signed-off-by: Alexander Damian <adamian@bloomberg.net>
  • Loading branch information
Alexander Damian committed Mar 6, 2020
1 parent 7fa1f25 commit 116f953
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 36 deletions.
65 changes: 40 additions & 25 deletions quantum/impl/quantum_condition_variable_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,25 @@ template <class REP, class PERIOD>
bool ConditionVariable::waitFor(Mutex& mutex,
const std::chrono::duration<REP, PERIOD>& time)
{
auto duration = time;
return waitForImpl(nullptr, mutex, duration);
if (time == std::chrono::milliseconds(-1))
{
waitImpl(nullptr, mutex);
return true;
}
return waitForImpl(nullptr, mutex, time);
}

template <class REP, class PERIOD>
bool ConditionVariable::waitFor(ICoroSync::Ptr sync,
Mutex& mutex,
const std::chrono::duration<REP, PERIOD>& time)
{
auto duration = time;
return waitForImpl(sync, mutex, duration);
if (time == std::chrono::milliseconds(-1))
{
waitImpl(sync, mutex);
return true;
}
return waitForImpl(sync, mutex, time);
}

template <class REP, class PERIOD, class PREDICATE>
Expand Down Expand Up @@ -156,7 +164,8 @@ void ConditionVariable::waitImpl(ICoroSync::Ptr sync,
Mutex::Guard lock(sync, _thisLock);
if (_destroyed)
{
return; //don't release the mutex
//don't release the mutex
return;
}
signal = 0; //clear signal flag
_waiters.push_back(&signal);
Expand All @@ -165,6 +174,7 @@ void ConditionVariable::waitImpl(ICoroSync::Ptr sync,
Mutex::ReverseGuard unlock(sync, mutex);
while ((signal == 0) && !_destroyed)
{
//wait for signal
yield(sync);
}
signal = -1; //reset
Expand All @@ -175,6 +185,7 @@ void ConditionVariable::waitImpl(ICoroSync::Ptr sync,
Mutex& mutex,
PREDICATE predicate)
{
//see: https://en.cppreference.com/w/cpp/thread/condition_variable/wait
while (!predicate() && !_destroyed)
{
waitImpl(sync, mutex);
Expand All @@ -184,18 +195,28 @@ void ConditionVariable::waitImpl(ICoroSync::Ptr sync,
template <class REP, class PERIOD>
bool ConditionVariable::waitForImpl(ICoroSync::Ptr sync,
Mutex& mutex,
std::chrono::duration<REP, PERIOD>& time)
const std::chrono::duration<REP, PERIOD>& time)
{
if (time < std::chrono::milliseconds::zero())
{
//invalid time setting
throw std::invalid_argument("Timeout cannot be negative");
}
std::atomic_int& signal = sync ? sync->signal() : s_threadSignal;
{//========= LOCKED SCOPE =========
Mutex::Guard lock(sync, _thisLock);
if (_destroyed)
{
return true; //don't release the mutex
//don't release the mutex
return true;
}
if (time == std::chrono::duration<REP, PERIOD>::zero())
{
return false; //timeout
//immediate timeout.
//reset flag if necessary and return w/o releasing mutex.
int expected = 1;
signal.compare_exchange_strong(expected, -1);
return (expected==1);
}
signal = 0; //clear signal flag
_waiters.push_back(&signal);
Expand All @@ -204,26 +225,22 @@ bool ConditionVariable::waitForImpl(ICoroSync::Ptr sync,
Mutex::ReverseGuard unlock(sync, mutex);
auto start = std::chrono::steady_clock::now();
auto elapsed = std::chrono::duration<REP, PERIOD>::zero();
bool timeout = false;

//wait until signalled or times out
while ((signal == 0) && !_destroyed)
{
//wait for signal
yield(sync);
elapsed = std::chrono::duration_cast<std::chrono::duration<REP, PERIOD>>
(std::chrono::steady_clock::now() - start);
if (elapsed >= time)
{
timeout = true;
break; //expired time
break; //expired
}
}

bool rc = (signal == 1);
signal = -1; //reset signal flag

//adjust duration or set to zero if nothing remains
time = timeout ? std::chrono::duration<REP, PERIOD>::zero() : time - elapsed;
return !timeout;
return rc;
}

template <class REP, class PERIOD, class PREDICATE>
Expand All @@ -232,18 +249,16 @@ bool ConditionVariable::waitForImpl(ICoroSync::Ptr sync,
const std::chrono::duration<REP, PERIOD>& time,
PREDICATE predicate)
{
if (time > std::chrono::duration<REP, PERIOD>(0)) {
auto duration = time;
while (!predicate() && !_destroyed)
//see: https://en.cppreference.com/w/cpp/thread/condition_variable/wait_until
while (!predicate() && !_destroyed)
{
if (!waitForImpl(sync, mutex, time))
{
if (!waitForImpl(sync, mutex, duration))
{
//timeout
return predicate();
}
//timeout
return predicate();
}
}
return true; //duration has not yet expired
return true;
}

}}
14 changes: 12 additions & 2 deletions quantum/impl/quantum_dispatcher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Dispatcher::Dispatcher(const Configuration& config) :
inline
Dispatcher::~Dispatcher()
{
drain(std::chrono::milliseconds::zero(), true);
drain(std::chrono::milliseconds(-1), true);
terminate();
}

Expand Down Expand Up @@ -356,6 +356,16 @@ void Dispatcher::drain(std::chrono::milliseconds timeout,
bool isFinal)
{
DrainGuard guard(_drain, !isFinal);
if ((timeout < std::chrono::milliseconds::zero()) &&
(timeout != std::chrono::milliseconds(-1)))
{
throw std::invalid_argument("Timeout cannot be negative");
}

if (timeout == std::chrono::milliseconds::zero())
{
return; //skip draining
}

auto start = std::chrono::steady_clock::now();

Expand All @@ -366,7 +376,7 @@ void Dispatcher::drain(std::chrono::milliseconds timeout,
yield();

//check remaining time
if (timeout != std::chrono::milliseconds::zero())
if (timeout != std::chrono::milliseconds(-1))
{
auto present = std::chrono::steady_clock::now();
if (std::chrono::duration_cast<std::chrono::milliseconds>(present-start) > timeout)
Expand Down
18 changes: 15 additions & 3 deletions quantum/quantum_condition_variable.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class ConditionVariable
/// @tparam PERIOD A std::ratio representing the tick period such as ticks per second.
/// @param[in] mutex Mutex object which is locked by the current coroutine.
/// @param[in] time Maximum duration for which to wait on this condition.
/// Using -1 is equivalent to calling wait(). Other negative values will throw.
/// @return True if the mutex was acquired before 'time' expired or false otherwise.
/// @note This function should be called from a regular thread not from a coroutine.
template <class REP, class PERIOD>
Expand All @@ -145,6 +146,7 @@ class ConditionVariable
/// @param[in] sync Pointer to a coroutine synchronization object.
/// @param[in] mutex Mutex object which is locked by the current coroutine.
/// @param[in] time Maximum duration for which to wait on this condition.
/// Using -1 is equivalent to calling wait(). Other negative values will throw.
/// @return True if the mutex was acquired before 'time' expired or false otherwise.
/// @note This function should be called from a coroutine.
template <class REP, class PERIOD>
Expand All @@ -160,14 +162,19 @@ class ConditionVariable
/// @code
/// while(!predicate())
/// {
/// waitFor(mutex, time);
/// if (!waitFor(mutex, time))
/// {
/// return predicate();
/// }
/// }
/// return true;
/// @endcode
/// @tparam REP An arithmetic type such as int or double representing the number of ticks.
/// @tparam PERIOD A std::ratio representing the tick period such as ticks per second.
/// @tparam PREDICATE Callable function type having the following signature 'bool f()'.
/// @param[in] mutex Mutex object which is locked by the current coroutine.
/// @param[in] time Maximum duration for which to wait on this condition.
/// Using -1 is equivalent to calling wait(). Other negative values will throw.
/// @param[in] predicate Function or functor to be tested as exit condition of the endless while loop.
/// @return True if the mutex was acquired before 'time' expired, otherwise the predicate result after timeout.
/// @note This function should be called from a regular thread not from a coroutine.
Expand All @@ -184,15 +191,20 @@ class ConditionVariable
/// @code
/// while(!predicate())
/// {
/// waitFor(mutex, time);
/// if (!waitFor(mutex, time))
/// {
/// return predicate();
/// }
/// }
/// return true;
/// @endcode
/// @tparam REP An arithmetic type such as int or double representing the number of ticks.
/// @tparam PERIOD A std::ratio representing the tick period such as ticks per second.
/// @tparam PREDICATE Callable function type having the following signature 'bool f()'.
/// @param[in] sync Pointer to a coroutine synchronization object.
/// @param[in] mutex Mutex object which is locked by the current coroutine.
/// @param[in] time Maximum duration for which to wait on this condition.
/// Using -1 is equivalent to calling wait(). Other negative values will throw.
/// @param[in] predicate Function or functor to be tested as exit condition of the endless while loop.
/// @return True if the mutex was acquired before 'time' expired, otherwise the predicate result after timeout.
/// @note This function should be called from a coroutine.
Expand All @@ -218,7 +230,7 @@ class ConditionVariable
template <class REP, class PERIOD>
bool waitForImpl(ICoroSync::Ptr sync,
Mutex& mutex,
std::chrono::duration<REP, PERIOD>& time);
const std::chrono::duration<REP, PERIOD>& time);

template <class REP, class PERIOD, class PREDICATE = bool()>
bool waitForImpl(ICoroSync::Ptr sync,
Expand Down
4 changes: 2 additions & 2 deletions quantum/quantum_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,11 +328,11 @@ class Dispatcher : public ITerminate
int queueId = (int)IQueue::QueueId::All) const;

/// @brief Drains all queues on this dispatcher object.
/// @param[in] timeout Maximum time for this function to wait. Set to 0 to wait indefinitely until all queues drain.
/// @param[in] timeout Maximum time for this function to wait. Set to -1 to wait indefinitely until all queues drain.
/// @param[in] isFinal If set to true, the dispatcher will not allow any more processing after the drain completes.
/// @note This function blocks until all coroutines and IO tasks have completed. During this time, posting
/// of new tasks is disabled unless they are posted from within an already executing coroutine.
void drain(std::chrono::milliseconds timeout = std::chrono::milliseconds::zero(),
void drain(std::chrono::milliseconds timeout = std::chrono::milliseconds(-1),
bool isFinal = false);

/// @brief Returns the number of underlying coroutine threads as specified in the constructor. If -1 was passed
Expand Down
4 changes: 2 additions & 2 deletions quantum/util/impl/quantum_sequencer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::canTrimContext(const ICoroCon
const ICoroContextBasePtr& ctxToValidate)
{
return !ctxToValidate || !ctxToValidate->valid() ||
ctxToValidate->waitFor(ctx, std::chrono::milliseconds(0)) == std::future_status::ready;
ctxToValidate->waitFor(ctx, std::chrono::milliseconds::zero()) == std::future_status::ready;
}

template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
Expand All @@ -544,7 +544,7 @@ Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::isPendingContext(const ICoroC
const ICoroContextBasePtr& ctxToValidate)
{
return ctxToValidate && ctxToValidate->valid() &&
ctxToValidate->waitFor(ctx, std::chrono::milliseconds(0)) == std::future_status::timeout;
ctxToValidate->waitFor(ctx, std::chrono::milliseconds::zero()) == std::future_status::timeout;
}

template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
Expand Down
4 changes: 2 additions & 2 deletions quantum/util/quantum_sequencer.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,13 @@ class Sequencer
SequenceKeyStatistics getTaskStatistics();

/// @brief Drains all sequenced tasks.
/// @param[in] timeout Maximum time for this function to wait. Set to 0 to wait indefinitely until all sequences drain.
/// @param[in] timeout Maximum time for this function to wait. Set to -1 to wait indefinitely until all sequences drain.
/// @param[in] isFinal If set to true, the sequencer will not allow any more processing after the drain completes.
/// @note This function blocks until all sequences have completed. During this time, posting
/// of new tasks is disabled unless they are posted from within an already executing coroutine.
/// Since this function posts a task which will wait on all others, getStatistics().getPostedTaskCount()
/// will contain one extra count.
void drain(std::chrono::milliseconds timeout = std::chrono::milliseconds::zero(),
void drain(std::chrono::milliseconds timeout = std::chrono::milliseconds(-1),
bool isFinal = false);

private:
Expand Down

0 comments on commit 116f953

Please sign in to comment.