Skip to content

Commit

Permalink
Merge pull request #110 from accelerated/master
Browse files Browse the repository at this point in the history
Added drain functionality to the Sequencer
  • Loading branch information
Alex Damian authored Aug 23, 2019
2 parents 6bb6e73 + 437de94 commit 8675304
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 37 deletions.
21 changes: 1 addition & 20 deletions quantum/quantum_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <quantum/quantum_context.h>
#include <quantum/quantum_configuration.h>
#include <quantum/quantum_macros.h>
#include <quantum/util/quantum_drain_guard.h>
#include <iterator>
#include <chrono>

Expand Down Expand Up @@ -382,26 +383,6 @@ class Dispatcher : public ITerminate
ThreadFuturePtr<RET>
postAsyncIoImpl2(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args);

struct DrainGuard
{
DrainGuard(std::atomic_bool& drain,
bool reactivate = true) :
_drain(drain),
_reactivate(reactivate)
{
_drain = true;
}
~DrainGuard()
{
if (_reactivate)
{
_drain = false;
}
}
std::atomic_bool& _drain;
bool _reactivate;
};

//Members
DispatcherCore _dispatcher;
std::atomic_bool _drain;
Expand Down
67 changes: 59 additions & 8 deletions quantum/util/impl/quantum_sequencer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
//#################################### IMPLEMENTATIONS #########################################
//##############################################################################################

#include <quantum/util/quantum_drain_guard.h>
#include <quantum/quantum_promise.h>
#include <stdexcept>

namespace Bloomberg {
Expand All @@ -28,6 +30,7 @@ template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::Sequencer(Dispatcher& dispatcher,
const typename Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::Configuration& configuration) :
_dispatcher(dispatcher),
_drain(false),
_controllerQueueId(configuration.getControlQueueId()),
_universalContext(),
_contexts(configuration.getBucketCount(),
Expand All @@ -51,6 +54,10 @@ Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::enqueue(
FUNC&& func,
ARGS&&... args)
{
if (_drain)
{
throw std::runtime_error("Sequencer is disabled");
}
_dispatcher.post2(_controllerQueueId,
false,
singleSequenceKeyTaskScheduler<FUNC, ARGS...>,
Expand All @@ -74,11 +81,14 @@ Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::enqueue(
FUNC&& func,
ARGS&&... args)
{
if (_drain)
{
throw std::runtime_error("Sequencer is disabled");
}
if (queueId < (int)IQueue::QueueId::Any)
{
throw std::runtime_error("Invalid IO queue id");
}

_dispatcher.post2(_controllerQueueId,
false,
singleSequenceKeyTaskScheduler<FUNC, ARGS...>,
Expand All @@ -99,6 +109,10 @@ Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::enqueue(
FUNC&& func,
ARGS&&... args)
{
if (_drain)
{
throw std::runtime_error("Sequencer is disabled");
}
_dispatcher.post2(_controllerQueueId,
false,
multiSequenceKeyTaskScheduler<FUNC, ARGS...>,
Expand All @@ -122,6 +136,10 @@ Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::enqueue(
FUNC&& func,
ARGS&&... args)
{
if (_drain)
{
throw std::runtime_error("Sequencer is disabled");
}
if (queueId < (int)IQueue::QueueId::Any)
{
throw std::runtime_error("Invalid IO queue id");
Expand All @@ -143,6 +161,10 @@ template <class FUNC, class ... ARGS>
void
Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::enqueueAll(FUNC&& func, ARGS&&... args)
{
if (_drain)
{
throw std::runtime_error("Sequencer is disabled");
}
_dispatcher.post2(_controllerQueueId,
false,
universalTaskScheduler<FUNC, ARGS...>,
Expand All @@ -164,6 +186,10 @@ Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::enqueueAll(
FUNC&& func,
ARGS&&... args)
{
if (_drain)
{
throw std::runtime_error("Sequencer is disabled");
}
if (queueId < (int)IQueue::QueueId::Any)
{
throw std::runtime_error("Invalid IO queue id");
Expand Down Expand Up @@ -260,11 +286,11 @@ Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::waitForTwoDependents(
{
universalDependent._context->wait(ctx);
}
int rc = callPosted(ctx, opaque, sequencer, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
// update task stats
dependent._stats->decrementPendingTaskCount();
sequencer._taskStats->decrementPendingTaskCount();
callPosted(ctx, opaque, sequencer, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
return 0;
return rc;
}

template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
Expand Down Expand Up @@ -292,14 +318,14 @@ Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::waitForDependents(
{
universalDependent._context->wait(ctx);
}
//update stats
int rc = callPosted(ctx, opaque, sequencer, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
// update task stats
for (const auto& dependent : dependents)
{
dependent._stats->decrementPendingTaskCount();
}
// update task stats
sequencer._taskStats->decrementPendingTaskCount();
return callPosted(ctx, opaque, sequencer, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
return rc;
}

template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
Expand Down Expand Up @@ -327,10 +353,11 @@ Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::waitForUniversalDependent(
{
universalDependent._context->wait(ctx);
}
universalDependent._stats->decrementPendingTaskCount();
int rc = callPosted(ctx, opaque, sequencer, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
// update task stats
universalDependent._stats->decrementPendingTaskCount();
sequencer._taskStats->decrementPendingTaskCount();
return callPosted(ctx, opaque, sequencer, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
return rc;
}

template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
Expand Down Expand Up @@ -520,5 +547,29 @@ Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::isPendingContext(const ICoroC
ctxToValidate->waitFor(ctx, std::chrono::milliseconds(0)) == std::future_status::timeout;
}

template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
void
Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::drain(std::chrono::milliseconds timeout,
bool isFinal)
{
std::shared_ptr<Promise<int>> promise = std::make_shared<Promise<int>>();
ThreadFuturePtr<int> future = promise->getIThreadFuture();

//enqueue a universal task and wait
enqueueAll([promise](VoidContextPtr)->int{
return promise->set(0);
});

DrainGuard guard(_drain, !isFinal);
if (timeout == std::chrono::milliseconds::zero())
{
future->wait();
}
else
{
future->waitFor(timeout);
}
}


}}
46 changes: 46 additions & 0 deletions quantum/util/quantum_drain_guard.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
** Copyright 2018 Bloomberg Finance L.P.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
*/
#ifndef BLOOMBERG_QUANTUM_DRAIN_GUARD_H
#define BLOOMBERG_QUANTUM_DRAIN_GUARD_H

namespace Bloomberg {
namespace quantum {

struct DrainGuard
{
DrainGuard(std::atomic_bool& drain,
bool reactivate = true) :
_drain(drain),
_reactivate(reactivate)
{
_drain = true;
}

~DrainGuard()
{
if (_reactivate)
{
_drain = false;
}
}
std::atomic_bool& _drain;
bool _reactivate;
};

}
}

#endif //BLOOMBERG_QUANTUM_DRAIN_GUARD_H
11 changes: 11 additions & 0 deletions quantum/util/quantum_sequencer.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,16 @@ class Sequencer
/// @note The difference with the previous two statistics methods is that it aggregates stats on a per-task basis,
/// not on per-key basis.
SequenceKeyStatistics getTaskStatistics();

/// @brief Drains all sequenced tasks.
/// @param[in] timeout Maximum time for this function to wait. Set to 0 to wait indefinitely until all sequences drain.
/// @param[in] isFinal If set to true, the sequencer will not allow any more processing after the drain completes.
/// @note This function blocks until all sequences have completed. During this time, posting
/// of new tasks is disabled unless they are posted from within an already executing coroutine.
/// Since this function posts a task which will wait on all others, getStatistics().getPostedTaskCount()
/// will contain one extra count.
void drain(std::chrono::milliseconds timeout = std::chrono::milliseconds::zero(),
bool isFinal = false);

private:
using ContextMap = std::unordered_map<SequenceKey, SequenceKeyData, Hash, KeyEqual, Allocator>;
Expand Down Expand Up @@ -297,6 +307,7 @@ class Sequencer
const ICoroContextBasePtr& ctxToValidate);

Dispatcher& _dispatcher;
std::atomic_bool _drain;
int _controllerQueueId;
SequenceKeyData _universalContext;
ContextMap _contexts;
Expand Down
18 changes: 9 additions & 9 deletions tests/quantum_sequencer_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ TEST_P(SequencerTest, BasicTaskOrder)
sequenceKeys[sequenceKey].push_back(id);
sequencer.enqueue(sequenceKey, testData.makeTask(id));
}
getDispatcher().drain();
sequencer.drain();

EXPECT_EQ(testData.results().size(), (size_t)taskCount);

Expand Down Expand Up @@ -178,7 +178,7 @@ TEST_P(SequencerTest, TrimKeys)
SequencerTestData::SequenceKey sequenceKey = id % sequenceKeyCount;
sequencer.enqueue(sequenceKey, testData.makeTask(id));
}
getDispatcher().drain();
sequencer.drain();

EXPECT_EQ(sequencer.getSequenceKeyCount(), (size_t)sequenceKeyCount);
EXPECT_EQ(sequencer.trimSequenceKeys(), 0u);
Expand Down Expand Up @@ -250,7 +250,7 @@ TEST_P(SequencerTest, ExceptionHandler)
sequencer.enqueue(&sequenceKeys[id], (int)IQueue::QueueId::Any, false, sequenceKey, testData.makeTask(id));
}
}
getDispatcher().drain();
sequencer.drain();

EXPECT_EQ(generatedExceptionCount, exceptionCallbackCallCount);
}
Expand Down Expand Up @@ -328,7 +328,7 @@ TEST_P(SequencerTest, SequenceKeyStats)
}
}

getDispatcher().drain();
sequencer.drain();

// check the final stats
postedCount = 0;
Expand All @@ -344,9 +344,9 @@ TEST_P(SequencerTest, SequenceKeyStats)
pendingCount += universalStatsAfter.getPendingTaskCount();

EXPECT_EQ(sequenceKeyCount, (int)sequencer.getSequenceKeyCount());
EXPECT_EQ((unsigned int)taskCount, postedCount);
EXPECT_EQ((unsigned int)taskCount, postedCount-1); //-1 for drain()
EXPECT_EQ(0u, pendingCount);
EXPECT_EQ(taskCount, (int)sequencer.getTaskStatistics().getPostedTaskCount());
EXPECT_EQ(taskCount, (int)sequencer.getTaskStatistics().getPostedTaskCount()-1); //-1 for drain()
EXPECT_EQ(0u, sequencer.getTaskStatistics().getPendingTaskCount());
}

Expand Down Expand Up @@ -379,7 +379,7 @@ TEST_P(SequencerTest, TaskOrderWithUniversal)
sequencer.enqueue(sequenceKey, testData.makeTask(id));
}
}
getDispatcher().drain();
sequencer.drain();

EXPECT_EQ((int)testData.results().size(), taskCount);
EXPECT_EQ((int)sequencer.getSequenceKeyCount(), sequenceKeyCount);
Expand Down Expand Up @@ -441,7 +441,7 @@ TEST_P(SequencerTest, MultiSequenceKeyTasks)
// save the task id for this sequenceKey
sequencer.enqueue(sequenceKeys, testData.makeTask(id));
}
getDispatcher().drain();
sequencer.drain();

EXPECT_EQ((int)testData.results().size(), taskCount);
EXPECT_EQ((int)sequencer.getSequenceKeyCount(), sequenceKeyCount);
Expand Down Expand Up @@ -514,7 +514,7 @@ TEST_P(SequencerTest, CustomHashFunction)
// enqueue the task with the real sequenceKey id
sequencer.enqueue(std::move(sequenceKey), testData.makeTask(id));
}
getDispatcher().drain();
sequencer.drain();

EXPECT_EQ((int)testData.results().size(), taskCount);
EXPECT_EQ((int)sequencer.getSequenceKeyCount(), restrictedSequenceKeyCount);
Expand Down
9 changes: 9 additions & 0 deletions tests/quantum_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <unordered_map>
#include <list>
#include <memory>
#include <functional>

using namespace quantum;
using ms = std::chrono::milliseconds;
Expand Down Expand Up @@ -151,6 +152,14 @@ int DummyIoTask(ThreadPromise<int>::Ptr promise)
return 0;
}

struct Dummy
{
int MemberCoro(CoroContext<std::string>::Ptr ctx)
{
return ctx->set("test");
}
};

#ifdef BOOST_USE_VALGRIND
int fibInput = 10;
#else
Expand Down

0 comments on commit 8675304

Please sign in to comment.