diff --git a/quantum/impl/quantum_contiguous_pool_manager_impl.h b/quantum/impl/quantum_contiguous_pool_manager_impl.h index 37e6fcb..3029868 100644 --- a/quantum/impl/quantum_contiguous_pool_manager_impl.h +++ b/quantum/impl/quantum_contiguous_pool_manager_impl.h @@ -134,6 +134,7 @@ template typename ContiguousPoolManager::pointer ContiguousPoolManager::allocate(size_type n, const_pointer) { + assert(bufferStart()); { SpinLock::Guard lock(_control->_spinlock); if (findContiguous(static_cast(n))) @@ -153,6 +154,7 @@ void ContiguousPoolManager::deallocate(pointer p, size_type n) if (p == nullptr) { return; } + assert(bufferStart()); if (isManaged(p)) { //find index of the block and return the individual blocks to the free pool SpinLock::Guard lock(_control->_spinlock); @@ -235,7 +237,7 @@ typename ContiguousPoolManager::pointer ContiguousPoolManager::bufferEnd() template bool ContiguousPoolManager::isManaged(pointer p) { - return !bufferStart() || (bufferStart() && (bufferStart() <= p) && (p < bufferEnd())); + return (bufferStart() <= p) && (p < bufferEnd()); } template diff --git a/quantum/impl/quantum_coroutine_pool_allocator_impl.h b/quantum/impl/quantum_coroutine_pool_allocator_impl.h index 7290f58..701f1fa 100644 --- a/quantum/impl/quantum_coroutine_pool_allocator_impl.h +++ b/quantum/impl/quantum_coroutine_pool_allocator_impl.h @@ -127,10 +127,12 @@ void CoroutinePoolAllocator::deallocate(const boost::context::stac #if defined(BOOST_USE_VALGRIND) VALGRIND_STACK_DEREGISTER(ctx.valgrind_stack_id); #endif + int bi = blockIndex(ctx); + assert(bi >= -1 && bi < _size); //guard against coroutine stack overflow or corruption if (isManaged(ctx)) { //find index of the block SpinLock::Guard lock(_spinlock); - _freeBlocks[++_freeBlockIndex] = blockIndex(ctx); + _freeBlocks[++_freeBlockIndex] = bi; } else { delete[] (char*)getHeader(ctx); diff --git a/quantum/impl/quantum_task_queue_impl.h b/quantum/impl/quantum_task_queue_impl.h index 95f8293..c1e62b9 100644 --- a/quantum/impl/quantum_task_queue_impl.h +++ b/quantum/impl/quantum_task_queue_impl.h @@ -263,6 +263,8 @@ inline void TaskQueue::doEnqueue(ITask::Ptr task) { //NOTE: _queueIt remains unchanged following this operation + _stats.incPostedCount(); + _stats.incNumElements(); bool isEmpty = _waitQueue.empty(); if (task->isHighPriority()) { @@ -280,8 +282,6 @@ void TaskQueue::doEnqueue(ITask::Ptr task) { _stats.incHighPriorityCount(); } - _stats.incPostedCount(); - _stats.incNumElements(); if (isEmpty) { //signal on transition from 0 to 1 element only @@ -302,12 +302,11 @@ ITask::Ptr TaskQueue::tryDequeue(std::atomic_bool& hint) } inline -ITask::Ptr TaskQueue::doDequeue(std::atomic_bool& hint, TaskListIter iter) +ITask::Ptr TaskQueue::doDequeue(std::atomic_bool&, TaskListIter iter) { //========================= LOCKED SCOPE ========================= SpinLock::Guard lock(_runQueueLock); - hint = (iter == _runQueue.end()); - if (hint) + if (iter == _runQueue.end()) { return nullptr; } @@ -335,13 +334,13 @@ ITask::Ptr TaskQueue::doDequeue(std::atomic_bool& hint, TaskListIter iter) inline size_t TaskQueue::size() const { - return _stats.numElements(); + return _isIdle ? _stats.numElements() : _stats.numElements() + 1; } inline bool TaskQueue::empty() const { - return _stats.numElements() == 0; + return size() == 0; } inline @@ -464,9 +463,6 @@ inline bool TaskQueue::handleSuccess(const WorkItem& workItem) { ITaskContinuation::Ptr nextTask; - //Coroutine ended normally with "return 0" statement - _stats.incCompletedCount(); - //check if there's another task scheduled to run after this one nextTask = workItem._task->getNextTask(); if (nextTask && (nextTask->getType() == ITask::Type::ErrorHandler)) @@ -478,6 +474,8 @@ bool TaskQueue::handleSuccess(const WorkItem& workItem) //queue next task and de-queue current one enqueue(nextTask); doDequeue(_isIdle, workItem._iter); + //Coroutine ended normally with "return 0" statement + _stats.incCompletedCount(); return true; } @@ -485,13 +483,13 @@ inline bool TaskQueue::handleError(const WorkItem& workItem) { ITaskContinuation::Ptr nextTask; - //Coroutine ended with explicit user error - _stats.incErrorCount(); //Check if we have a final task to run nextTask = workItem._task->getErrorHandlerOrFinalTask(); //queue next task and de-queue current one enqueue(nextTask); doDequeue(_isIdle, workItem._iter); + //Coroutine ended with explicit user error + _stats.incErrorCount(); #ifdef __QUANTUM_PRINT_DEBUG std::lock_guard guard(Util::LogMutex()); if (rc == (int)ITask::RetCode::Exception) @@ -547,11 +545,11 @@ TaskQueue::grabWorkItem() acquireWaiting(); } _isAdvanced = false; //reset flag + _isIdle = _runQueue.empty(); if (_runQueue.empty()) { return WorkItem(nullptr, _runQueue.end(), _isBlocked, _queueRound); } - return WorkItem((*_queueIt), _queueIt, false, 0); } diff --git a/quantum/quantum_contiguous_pool_manager.h b/quantum/quantum_contiguous_pool_manager.h index 8ffbd09..fe98c74 100644 --- a/quantum/quantum_contiguous_pool_manager.h +++ b/quantum/quantum_contiguous_pool_manager.h @@ -80,7 +80,7 @@ struct ContiguousPoolManager static ContiguousPoolManager select_on_container_copy_construction(const ContiguousPoolManager& other) { - return ContiguousPoolManager(other.size()); + return ContiguousPoolManager(other); } bool operator==(const this_type& other) const { return _control && other._control && (_control->_buffer == other._control->_buffer); diff --git a/tests/quantum_fixture.h b/tests/quantum_fixture.h index 26cd93e..a9fd0ab 100644 --- a/tests/quantum_fixture.h +++ b/tests/quantum_fixture.h @@ -106,6 +106,7 @@ class DispatcherFixture : public ::testing::TestWithParam void SetUp() { _dispatcher = &DispatcherSingleton::instance(GetParam()); + //Don't drain in the TearDown() because of the final CleanupTest::DeleteDispatcherInstance() _dispatcher->drain(); _dispatcher->resetStats(); } diff --git a/tests/quantum_tests.cpp b/tests/quantum_tests.cpp index b444812..02850f7 100644 --- a/tests/quantum_tests.cpp +++ b/tests/quantum_tests.cpp @@ -1488,7 +1488,7 @@ TEST(SharedQueueTest, PerformanceTest1) { const TestConfiguration noCoroSharingConfig(false, false); quantum::Dispatcher& dispatcher = DispatcherSingleton::instance(noCoroSharingConfig); - + dispatcher.drain(); auto start = std::chrono::steady_clock::now(); enqueue_sleep_tasks(dispatcher, sleepTimes); dispatcher.drain(); @@ -1499,7 +1499,7 @@ TEST(SharedQueueTest, PerformanceTest1) { const TestConfiguration coroSharingConfig(false, true); quantum::Dispatcher& dispatcher = DispatcherSingleton::instance(coroSharingConfig); - + dispatcher.drain(); auto start = std::chrono::steady_clock::now(); enqueue_sleep_tasks(dispatcher, sleepTimes); dispatcher.drain();