diff --git a/quantum/quantum_dispatcher.h b/quantum/quantum_dispatcher.h index 6a2d1b9..a338435 100644 --- a/quantum/quantum_dispatcher.h +++ b/quantum/quantum_dispatcher.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -382,26 +383,6 @@ class Dispatcher : public ITerminate ThreadFuturePtr postAsyncIoImpl2(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args); - struct DrainGuard - { - DrainGuard(std::atomic_bool& drain, - bool reactivate = true) : - _drain(drain), - _reactivate(reactivate) - { - _drain = true; - } - ~DrainGuard() - { - if (_reactivate) - { - _drain = false; - } - } - std::atomic_bool& _drain; - bool _reactivate; - }; - //Members DispatcherCore _dispatcher; std::atomic_bool _drain; diff --git a/quantum/util/impl/quantum_sequencer_impl.h b/quantum/util/impl/quantum_sequencer_impl.h index b62558a..ee5b3ef 100644 --- a/quantum/util/impl/quantum_sequencer_impl.h +++ b/quantum/util/impl/quantum_sequencer_impl.h @@ -19,6 +19,8 @@ //#################################### IMPLEMENTATIONS ######################################### //############################################################################################## +#include +#include #include namespace Bloomberg { @@ -28,6 +30,7 @@ template Sequencer::Sequencer(Dispatcher& dispatcher, const typename Sequencer::Configuration& configuration) : _dispatcher(dispatcher), + _drain(false), _controllerQueueId(configuration.getControlQueueId()), _universalContext(), _contexts(configuration.getBucketCount(), @@ -51,6 +54,10 @@ Sequencer::enqueue( FUNC&& func, ARGS&&... args) { + if (_drain) + { + throw std::runtime_error("Sequencer is disabled"); + } _dispatcher.post2(_controllerQueueId, false, singleSequenceKeyTaskScheduler, @@ -74,11 +81,14 @@ Sequencer::enqueue( FUNC&& func, ARGS&&... args) { + if (_drain) + { + throw std::runtime_error("Sequencer is disabled"); + } if (queueId < (int)IQueue::QueueId::Any) { throw std::runtime_error("Invalid IO queue id"); } - _dispatcher.post2(_controllerQueueId, false, singleSequenceKeyTaskScheduler, @@ -99,6 +109,10 @@ Sequencer::enqueue( FUNC&& func, ARGS&&... args) { + if (_drain) + { + throw std::runtime_error("Sequencer is disabled"); + } _dispatcher.post2(_controllerQueueId, false, multiSequenceKeyTaskScheduler, @@ -122,6 +136,10 @@ Sequencer::enqueue( FUNC&& func, ARGS&&... args) { + if (_drain) + { + throw std::runtime_error("Sequencer is disabled"); + } if (queueId < (int)IQueue::QueueId::Any) { throw std::runtime_error("Invalid IO queue id"); @@ -143,6 +161,10 @@ template void Sequencer::enqueueAll(FUNC&& func, ARGS&&... args) { + if (_drain) + { + throw std::runtime_error("Sequencer is disabled"); + } _dispatcher.post2(_controllerQueueId, false, universalTaskScheduler, @@ -164,6 +186,10 @@ Sequencer::enqueueAll( FUNC&& func, ARGS&&... args) { + if (_drain) + { + throw std::runtime_error("Sequencer is disabled"); + } if (queueId < (int)IQueue::QueueId::Any) { throw std::runtime_error("Invalid IO queue id"); @@ -260,11 +286,11 @@ Sequencer::waitForTwoDependents( { universalDependent._context->wait(ctx); } + int rc = callPosted(ctx, opaque, sequencer, std::forward(func), std::forward(args)...); // update task stats dependent._stats->decrementPendingTaskCount(); sequencer._taskStats->decrementPendingTaskCount(); - callPosted(ctx, opaque, sequencer, std::forward(func), std::forward(args)...); - return 0; + return rc; } template @@ -292,14 +318,14 @@ Sequencer::waitForDependents( { universalDependent._context->wait(ctx); } - //update stats + int rc = callPosted(ctx, opaque, sequencer, std::forward(func), std::forward(args)...); + // update task stats for (const auto& dependent : dependents) { dependent._stats->decrementPendingTaskCount(); } - // update task stats sequencer._taskStats->decrementPendingTaskCount(); - return callPosted(ctx, opaque, sequencer, std::forward(func), std::forward(args)...); + return rc; } template @@ -327,10 +353,11 @@ Sequencer::waitForUniversalDependent( { universalDependent._context->wait(ctx); } - universalDependent._stats->decrementPendingTaskCount(); + int rc = callPosted(ctx, opaque, sequencer, std::forward(func), std::forward(args)...); // update task stats + universalDependent._stats->decrementPendingTaskCount(); sequencer._taskStats->decrementPendingTaskCount(); - return callPosted(ctx, opaque, sequencer, std::forward(func), std::forward(args)...); + return rc; } template @@ -520,5 +547,29 @@ Sequencer::isPendingContext(const ICoroC ctxToValidate->waitFor(ctx, std::chrono::milliseconds(0)) == std::future_status::timeout; } +template +void +Sequencer::drain(std::chrono::milliseconds timeout, + bool isFinal) +{ + std::shared_ptr> promise = std::make_shared>(); + ThreadFuturePtr future = promise->getIThreadFuture(); + + //enqueue a universal task and wait + enqueueAll([promise](VoidContextPtr)->int{ + return promise->set(0); + }); + + DrainGuard guard(_drain, !isFinal); + if (timeout == std::chrono::milliseconds::zero()) + { + future->wait(); + } + else + { + future->waitFor(timeout); + } +} + }} diff --git a/quantum/util/quantum_drain_guard.h b/quantum/util/quantum_drain_guard.h new file mode 100644 index 0000000..ba20afe --- /dev/null +++ b/quantum/util/quantum_drain_guard.h @@ -0,0 +1,46 @@ +/* +** Copyright 2018 Bloomberg Finance L.P. +** +** Licensed under the Apache License, Version 2.0 (the "License"); +** you may not use this file except in compliance with the License. +** You may obtain a copy of the License at +** +** http://www.apache.org/licenses/LICENSE-2.0 +** +** Unless required by applicable law or agreed to in writing, software +** distributed under the License is distributed on an "AS IS" BASIS, +** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +** See the License for the specific language governing permissions and +** limitations under the License. +*/ +#ifndef BLOOMBERG_QUANTUM_DRAIN_GUARD_H +#define BLOOMBERG_QUANTUM_DRAIN_GUARD_H + +namespace Bloomberg { +namespace quantum { + +struct DrainGuard +{ + DrainGuard(std::atomic_bool& drain, + bool reactivate = true) : + _drain(drain), + _reactivate(reactivate) + { + _drain = true; + } + + ~DrainGuard() + { + if (_reactivate) + { + _drain = false; + } + } + std::atomic_bool& _drain; + bool _reactivate; +}; + +} +} + +#endif //BLOOMBERG_QUANTUM_DRAIN_GUARD_H diff --git a/quantum/util/quantum_sequencer.h b/quantum/util/quantum_sequencer.h index 153648b..71ae038 100644 --- a/quantum/util/quantum_sequencer.h +++ b/quantum/util/quantum_sequencer.h @@ -226,6 +226,16 @@ class Sequencer /// @note The difference with the previous two statistics methods is that it aggregates stats on a per-task basis, /// not on per-key basis. SequenceKeyStatistics getTaskStatistics(); + + /// @brief Drains all sequenced tasks. + /// @param[in] timeout Maximum time for this function to wait. Set to 0 to wait indefinitely until all sequences drain. + /// @param[in] isFinal If set to true, the sequencer will not allow any more processing after the drain completes. + /// @note This function blocks until all sequences have completed. During this time, posting + /// of new tasks is disabled unless they are posted from within an already executing coroutine. + /// Since this function posts a task which will wait on all others, getStatistics().getPostedTaskCount() + /// will contain one extra count. + void drain(std::chrono::milliseconds timeout = std::chrono::milliseconds::zero(), + bool isFinal = false); private: using ContextMap = std::unordered_map; @@ -297,6 +307,7 @@ class Sequencer const ICoroContextBasePtr& ctxToValidate); Dispatcher& _dispatcher; + std::atomic_bool _drain; int _controllerQueueId; SequenceKeyData _universalContext; ContextMap _contexts; diff --git a/tests/quantum_sequencer_tests.cpp b/tests/quantum_sequencer_tests.cpp index ad75ca6..0aac8f0 100644 --- a/tests/quantum_sequencer_tests.cpp +++ b/tests/quantum_sequencer_tests.cpp @@ -148,7 +148,7 @@ TEST_P(SequencerTest, BasicTaskOrder) sequenceKeys[sequenceKey].push_back(id); sequencer.enqueue(sequenceKey, testData.makeTask(id)); } - getDispatcher().drain(); + sequencer.drain(); EXPECT_EQ(testData.results().size(), (size_t)taskCount); @@ -178,7 +178,7 @@ TEST_P(SequencerTest, TrimKeys) SequencerTestData::SequenceKey sequenceKey = id % sequenceKeyCount; sequencer.enqueue(sequenceKey, testData.makeTask(id)); } - getDispatcher().drain(); + sequencer.drain(); EXPECT_EQ(sequencer.getSequenceKeyCount(), (size_t)sequenceKeyCount); EXPECT_EQ(sequencer.trimSequenceKeys(), 0u); @@ -250,7 +250,7 @@ TEST_P(SequencerTest, ExceptionHandler) sequencer.enqueue(&sequenceKeys[id], (int)IQueue::QueueId::Any, false, sequenceKey, testData.makeTask(id)); } } - getDispatcher().drain(); + sequencer.drain(); EXPECT_EQ(generatedExceptionCount, exceptionCallbackCallCount); } @@ -328,7 +328,7 @@ TEST_P(SequencerTest, SequenceKeyStats) } } - getDispatcher().drain(); + sequencer.drain(); // check the final stats postedCount = 0; @@ -344,9 +344,9 @@ TEST_P(SequencerTest, SequenceKeyStats) pendingCount += universalStatsAfter.getPendingTaskCount(); EXPECT_EQ(sequenceKeyCount, (int)sequencer.getSequenceKeyCount()); - EXPECT_EQ((unsigned int)taskCount, postedCount); + EXPECT_EQ((unsigned int)taskCount, postedCount-1); //-1 for drain() EXPECT_EQ(0u, pendingCount); - EXPECT_EQ(taskCount, (int)sequencer.getTaskStatistics().getPostedTaskCount()); + EXPECT_EQ(taskCount, (int)sequencer.getTaskStatistics().getPostedTaskCount()-1); //-1 for drain() EXPECT_EQ(0u, sequencer.getTaskStatistics().getPendingTaskCount()); } @@ -379,7 +379,7 @@ TEST_P(SequencerTest, TaskOrderWithUniversal) sequencer.enqueue(sequenceKey, testData.makeTask(id)); } } - getDispatcher().drain(); + sequencer.drain(); EXPECT_EQ((int)testData.results().size(), taskCount); EXPECT_EQ((int)sequencer.getSequenceKeyCount(), sequenceKeyCount); @@ -441,7 +441,7 @@ TEST_P(SequencerTest, MultiSequenceKeyTasks) // save the task id for this sequenceKey sequencer.enqueue(sequenceKeys, testData.makeTask(id)); } - getDispatcher().drain(); + sequencer.drain(); EXPECT_EQ((int)testData.results().size(), taskCount); EXPECT_EQ((int)sequencer.getSequenceKeyCount(), sequenceKeyCount); @@ -514,7 +514,7 @@ TEST_P(SequencerTest, CustomHashFunction) // enqueue the task with the real sequenceKey id sequencer.enqueue(std::move(sequenceKey), testData.makeTask(id)); } - getDispatcher().drain(); + sequencer.drain(); EXPECT_EQ((int)testData.results().size(), taskCount); EXPECT_EQ((int)sequencer.getSequenceKeyCount(), restrictedSequenceKeyCount); diff --git a/tests/quantum_tests.cpp b/tests/quantum_tests.cpp index 0a54444..c060405 100644 --- a/tests/quantum_tests.cpp +++ b/tests/quantum_tests.cpp @@ -22,6 +22,7 @@ #include #include #include +#include using namespace quantum; using ms = std::chrono::milliseconds; @@ -151,6 +152,14 @@ int DummyIoTask(ThreadPromise::Ptr promise) return 0; } +struct Dummy +{ + int MemberCoro(CoroContext::Ptr ctx) + { + return ctx->set("test"); + } +}; + #ifdef BOOST_USE_VALGRIND int fibInput = 10; #else