Skip to content

Commit

Permalink
feat concurrent: optimize MpscQueue
Browse files Browse the repository at this point in the history
Before:
```
Benchmark                                                                     Time             CPU   Iterations
---------------------------------------------------------------------------------------------------------------
producer_consumer<concurrent::MpscQueue<std::size_t>>/1/1/1000000000        419 ns          419 ns      1722668
producer_consumer<concurrent::MpscQueue<std::size_t>>/2/1/1000000000        627 ns          621 ns      1099251
producer_consumer<concurrent::MpscQueue<std::size_t>>/4/1/1000000000        959 ns          956 ns       553335
```

After:
```
Benchmark                                                                     Time             CPU   Iterations
---------------------------------------------------------------------------------------------------------------
producer_consumer<concurrent::MpscQueue<std::size_t>>/1/1/1000000000        209 ns          209 ns      3428423
producer_consumer<concurrent::MpscQueue<std::size_t>>/2/1/1000000000        534 ns          534 ns      1000000
producer_consumer<concurrent::MpscQueue<std::size_t>>/4/1/1000000000        624 ns          624 ns       984017
```

Now the overhead mostly comes from `engine::Semaphore` and `engine::SingleConsumerEvent` that are used in `MpscQueue` for awaiting non-emptiness and non-fullness.
commit_hash:cc8529376d0f83d5344648599b107a6ef268f2b5
  • Loading branch information
Anton3 committed Oct 29, 2024
1 parent a1c5182 commit 0cdf042
Show file tree
Hide file tree
Showing 20 changed files with 153 additions and 106 deletions.
4 changes: 2 additions & 2 deletions .mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,9 @@
"core/include/userver/concurrent/background_task_storage_fwd.hpp":"taxi/uservices/userver/core/include/userver/concurrent/background_task_storage_fwd.hpp",
"core/include/userver/concurrent/conflated_event_channel.hpp":"taxi/uservices/userver/core/include/userver/concurrent/conflated_event_channel.hpp",
"core/include/userver/concurrent/impl/asymmetric_fence.hpp":"taxi/uservices/userver/core/include/userver/concurrent/impl/asymmetric_fence.hpp",
"core/include/userver/concurrent/impl/interference_shield.hpp":"taxi/uservices/userver/core/include/userver/concurrent/impl/interference_shield.hpp",
"core/include/userver/concurrent/impl/intrusive_hooks.hpp":"taxi/uservices/userver/core/include/userver/concurrent/impl/intrusive_hooks.hpp",
"core/include/userver/concurrent/impl/intrusive_mpsc_queue.hpp":"taxi/uservices/userver/core/include/userver/concurrent/impl/intrusive_mpsc_queue.hpp",
"core/include/userver/concurrent/impl/intrusive_stack.hpp":"taxi/uservices/userver/core/include/userver/concurrent/impl/intrusive_stack.hpp",
"core/include/userver/concurrent/impl/semaphore_capacity_control.hpp":"taxi/uservices/userver/core/include/userver/concurrent/impl/semaphore_capacity_control.hpp",
"core/include/userver/concurrent/impl/striped_read_indicator.hpp":"taxi/uservices/userver/core/include/userver/concurrent/impl/striped_read_indicator.hpp",
Expand Down Expand Up @@ -1030,10 +1032,8 @@
"core/src/concurrent/conflated_event_channel_test.cpp":"taxi/uservices/userver/core/src/concurrent/conflated_event_channel_test.cpp",
"core/src/concurrent/impl/asymmetric_fence.cpp":"taxi/uservices/userver/core/src/concurrent/impl/asymmetric_fence.cpp",
"core/src/concurrent/impl/fast_atomic.hpp":"taxi/uservices/userver/core/src/concurrent/impl/fast_atomic.hpp",
"core/src/concurrent/impl/interference_shield.hpp":"taxi/uservices/userver/core/src/concurrent/impl/interference_shield.hpp",
"core/src/concurrent/impl/interference_shield_test.cpp":"taxi/uservices/userver/core/src/concurrent/impl/interference_shield_test.cpp",
"core/src/concurrent/impl/intrusive_mpsc_queue.cpp":"taxi/uservices/userver/core/src/concurrent/impl/intrusive_mpsc_queue.cpp",
"core/src/concurrent/impl/intrusive_mpsc_queue.hpp":"taxi/uservices/userver/core/src/concurrent/impl/intrusive_mpsc_queue.hpp",
"core/src/concurrent/impl/intrusive_mpsc_queue_benchmark.cpp":"taxi/uservices/userver/core/src/concurrent/impl/intrusive_mpsc_queue_benchmark.cpp",
"core/src/concurrent/impl/intrusive_mpsc_queue_test.cpp":"taxi/uservices/userver/core/src/concurrent/impl/intrusive_mpsc_queue_test.cpp",
"core/src/concurrent/impl/latch.hpp":"taxi/uservices/userver/core/src/concurrent/impl/latch.hpp",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include <atomic>

#include <concurrent/impl/interference_shield.hpp>
#include <userver/concurrent/impl/interference_shield.hpp>
#include <userver/concurrent/impl/intrusive_hooks.hpp>
#include <userver/utils/not_null.hpp>

Expand Down Expand Up @@ -51,11 +51,39 @@ class IntrusiveMpscQueueImpl final {
// Returns the oldest pushed node, or `nullptr` if the queue is logically
// empty. Momentarily spins if necessary for a concurrent Push to complete.
// Can only be called from one thread at a time.
NodePtr TryPop() noexcept;
NodePtr TryPopBlocking() noexcept;

// Returns the oldest pushed not, or `nullptr` if the queue "seems to be"
// empty. Can only be called from one thread at a time.
// Unlike TryPopBlocking, never blocks, but this comes at a cost: it might
// not return an item that has been completely pushed (happens-before).
//
// The exact semantics of the ordering are as follows.
// Items are pushed in a flat-combining manner: if two items are being
// pushed concurrently, then one producer is randomly chosen to be
// responsible for pushing both items, and the other producer walks away,
// and for them the push operation is essentially pushed asynchronously.
//
// If the producers always notify the consumer after pushing,
// then TryPopWeak is enough: the consumer will be notified of all
// the pushed items by some of the producers.
//
// If for some items the consumer is not notified, and for some "urgent"
// items it is notified, then it's not a good idea to use TryPopWeak,
// because an "urgent" item may not yet be pushed completely
// upon the notification.
NodePtr TryPopWeak() noexcept;

private:
enum class PopMode {
kRarelyBlocking,
kWeak,
};

static std::atomic<NodePtr>& GetNext(NodeRef node) noexcept;

NodePtr DoTryPop(PopMode) noexcept;

// This node is put into the queue when it would otherwise be empty.
SinglyLinkedBaseHook stub_;
// Points to the oldest node not yet popped by the consumer,
Expand All @@ -77,7 +105,9 @@ class IntrusiveMpscQueue final {

void Push(T& node) noexcept { impl_.Push(IntrusiveMpscQueueImpl::NodeRef{&node}); }

T* TryPop() noexcept { return static_cast<T*>(impl_.TryPop()); }
T* TryPopBlocking() noexcept { return static_cast<T*>(impl_.TryPopBlocking()); }

T* TryPopWeak() noexcept { return static_cast<T*>(impl_.TryPopWeak()); }

private:
IntrusiveMpscQueueImpl impl_;
Expand Down
64 changes: 18 additions & 46 deletions core/include/userver/concurrent/mpsc_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
#include <limits>
#include <memory>

#include <boost/lockfree/queue.hpp>

#include <userver/concurrent/impl/intrusive_mpsc_queue.hpp>
#include <userver/concurrent/impl/semaphore_capacity_control.hpp>
#include <userver/concurrent/queue_helpers.hpp>
#include <userver/engine/deadline.hpp>
Expand All @@ -23,40 +22,11 @@ namespace concurrent {

namespace impl {

/// Helper template. Default implementation is straightforward.
template <typename T>
struct QueueHelper {
using LockFreeQueue = boost::lockfree::queue<T>;

static void Push(LockFreeQueue& queue, T&& value) {
[[maybe_unused]] bool push_result = queue.push(std::move(value));
UASSERT(push_result);
}
struct MpscQueueNode final : public SinglyLinkedBaseHook {
explicit MpscQueueNode(T&& value) : value(std::move(value)) {}

[[nodiscard]] static bool Pop(LockFreeQueue& queue, T& value) { return queue.pop(value); }

static_assert(
std::is_trivially_destructible_v<T>,
"T has non-trivial destructor. Use "
"MpscQueue<std::unique_ptr<T>> instead of MpscQueue<T>"
);
};

/// This partial specialization allows to use std::unique_ptr with Queue.
template <typename T>
struct QueueHelper<std::unique_ptr<T>> {
using LockFreeQueue = boost::lockfree::queue<T*>;

static void Push(LockFreeQueue& queue, std::unique_ptr<T>&& value) {
QueueHelper<T*>::Push(queue, value.release());
}

[[nodiscard]] static bool Pop(LockFreeQueue& queue, std::unique_ptr<T>& value) {
T* ptr{nullptr};
if (!QueueHelper<T*>::Pop(queue, ptr)) return false;
value.reset(ptr);
return true;
}
T value;
};

} // namespace impl
Expand All @@ -77,7 +47,7 @@ class MpscQueue final : public std::enable_shared_from_this<MpscQueue<T>> {
explicit EmplaceEnabler() = default;
};

using QueueHelper = impl::QueueHelper<T>;
using Node = impl::MpscQueueNode<T>;

using ProducerToken = impl::NoToken;
using ConsumerToken = impl::NoToken;
Expand Down Expand Up @@ -156,12 +126,10 @@ class MpscQueue final : public std::enable_shared_from_this<MpscQueue<T>> {
void MarkConsumerIsDead();
void MarkProducerIsDead();

// Resolves to boost::lockfree::queue<T> except for std::unique_ptr<T>
// specialization. In that case, resolves to boost::lockfree::queue<T*>
typename QueueHelper::LockFreeQueue queue_{1};
engine::SingleConsumerEvent nonempty_event_;
impl::IntrusiveMpscQueue<Node> queue_{};
engine::SingleConsumerEvent nonempty_event_{};
engine::CancellableSemaphore remaining_capacity_;
concurrent::impl::SemaphoreCapacityControl remaining_capacity_control_;
impl::SemaphoreCapacityControl remaining_capacity_control_;
std::atomic<bool> consumer_is_created_{false};
std::atomic<bool> consumer_is_created_and_dead_{false};
std::atomic<bool> producer_is_created_and_dead_{false};
Expand All @@ -173,10 +141,9 @@ template <typename T>
MpscQueue<T>::~MpscQueue() {
UASSERT(consumer_is_created_and_dead_ || !consumer_is_created_);
UASSERT(!producers_count_);
// Clear remaining items in queue. This will work for unique_ptr as well.
T value;
ConsumerToken temp_token{queue_};
while (PopNoblock(temp_token, value)) {
// Clear remaining items in queue.
while (const auto node = std::unique_ptr<Node>{queue_.TryPopBlocking()}) {
remaining_capacity_.unlock_shared();
}
}

Expand Down Expand Up @@ -234,7 +201,10 @@ bool MpscQueue<T>::DoPush(ProducerToken& /*unused*/, T&& value) {
return false;
}

QueueHelper::Push(queue_, std::move(value));
auto node = std::make_unique<Node>(std::move(value));
queue_.Push(*node);
(void)node.release();

++size_;
nonempty_event_.Send();

Expand All @@ -261,7 +231,9 @@ bool MpscQueue<T>::PopNoblock(ConsumerToken& token, T& value) {

template <typename T>
bool MpscQueue<T>::DoPop(ConsumerToken& /*unused*/, T& value) {
if (QueueHelper::Pop(queue_, value)) {
if (const auto node = std::unique_ptr<Node>{queue_.TryPopWeak()}) {
value = std::move(node->value);

--size_;
remaining_capacity_.unlock_shared();
nonempty_event_.Reset();
Expand Down
2 changes: 1 addition & 1 deletion core/src/concurrent/impl/interference_shield_test.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include <concurrent/impl/interference_shield.hpp>
#include <userver/concurrent/impl/interference_shield.hpp>

#include <cstddef>

Expand Down
18 changes: 16 additions & 2 deletions core/src/concurrent/impl/intrusive_mpsc_queue.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include <concurrent/impl/intrusive_mpsc_queue.hpp>
#include <userver/concurrent/impl/intrusive_mpsc_queue.hpp>

#include <compiler/relax_cpu.hpp>
#include <userver/utils/assert.hpp>
Expand Down Expand Up @@ -53,7 +53,13 @@ bool IntrusiveMpscQueueImpl::PushIfEmpty(NodeRef node) noexcept {
return false;
}

IntrusiveMpscQueueImpl::NodePtr IntrusiveMpscQueueImpl::TryPop() noexcept {
IntrusiveMpscQueueImpl::NodePtr IntrusiveMpscQueueImpl::TryPopBlocking() noexcept {
return DoTryPop(PopMode::kRarelyBlocking);
}

IntrusiveMpscQueueImpl::NodePtr IntrusiveMpscQueueImpl::TryPopWeak() noexcept { return DoTryPop(PopMode::kWeak); }

IntrusiveMpscQueueImpl::NodePtr IntrusiveMpscQueueImpl::DoTryPop(PopMode mode) noexcept {
UASSERT_MSG(!is_consuming_.exchange(true), "Multiple concurrent consumers detected");
const utils::FastScopeGuard guard([this]() noexcept {
UASSERT_MSG(is_consuming_.exchange(false), "Multiple concurrent consumers detected");
Expand All @@ -64,6 +70,10 @@ IntrusiveMpscQueueImpl::NodePtr IntrusiveMpscQueueImpl::TryPop() noexcept {

if (tail == &stub_) {
if (next == nullptr) {
if (mode == PopMode::kWeak) {
return nullptr;
}

// An addition to the base algorithm. We check if the queue is really
// empty, or if a Push is in process. We do this because other nodes may
// have already been pushed after the blocking node.
Expand Down Expand Up @@ -103,6 +113,10 @@ IntrusiveMpscQueueImpl::NodePtr IntrusiveMpscQueueImpl::TryPop() noexcept {

// A node is actually being pushed after 'tail', 'tail.next' has just not been
// corrected yet.
if (mode == PopMode::kWeak) {
return nullptr;
}

next = BlockThreadUntilNotNull(GetNext(tail));

tail_ = *next;
Expand Down
12 changes: 6 additions & 6 deletions core/src/concurrent/impl/intrusive_mpsc_queue_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include <boost/lockfree/queue.hpp>

#include <compiler/relax_cpu.hpp>
#include <concurrent/impl/intrusive_mpsc_queue.hpp>
#include <userver/concurrent/impl/intrusive_mpsc_queue.hpp>
#include <userver/utils/fixed_array.hpp>

USERVER_NAMESPACE_BEGIN
Expand Down Expand Up @@ -83,15 +83,15 @@ class IntrusiveMpscQueue final {
explicit IntrusiveMpscQueue(std::size_t /*producer_count*/) {}

~IntrusiveMpscQueue() {
while (auto* const node = queue_.TryPop()) {
while (auto* const node = queue_.TryPopBlocking()) {
delete node;
}
}

void Produce(std::size_t /*producer_id*/) { queue_.Push(*new Node()); }

bool TryConsume() {
auto* const node = queue_.TryPop();
auto* const node = queue_.TryPopBlocking();
if (node) {
benchmark::DoNotOptimize(node->foo);
delete node;
Expand Down Expand Up @@ -229,7 +229,7 @@ void IntrusiveMpscQueueProduceConsumeNoAlloc(benchmark::State& state) {
while (keep_running) {
Node* node = nullptr;
while (!node) {
node = queue1.TryPop();
node = queue1.TryPopBlocking();
}
queue2.Push(*node);
}
Expand All @@ -238,7 +238,7 @@ void IntrusiveMpscQueueProduceConsumeNoAlloc(benchmark::State& state) {
for ([[maybe_unused]] auto _ : state) {
Node* node = nullptr;
while (!node) {
node = queue2.TryPop();
node = queue2.TryPopBlocking();
}
queue1.Push(*node);
}
Expand Down Expand Up @@ -274,7 +274,7 @@ void IntrusiveMpscQueueProduceConsumeNoContentionNoAlloc(benchmark::State& state

for ([[maybe_unused]] auto _ : state) {
queue.Push(node);
benchmark::DoNotOptimize(queue.TryPop());
benchmark::DoNotOptimize(queue.TryPopBlocking());
}
}

Expand Down
44 changes: 22 additions & 22 deletions core/src/concurrent/impl/intrusive_mpsc_queue_test.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include <concurrent/impl/intrusive_mpsc_queue.hpp>
#include <userver/concurrent/impl/intrusive_mpsc_queue.hpp>

#include <atomic>
#include <chrono>
Expand Down Expand Up @@ -31,20 +31,20 @@ constexpr std::chrono::milliseconds kStressTestDuration{200};

TEST(IntrusiveMpscQueue, Empty) {
MpscQueue queue;
EXPECT_EQ(queue.TryPop(), nullptr);
EXPECT_EQ(queue.TryPop(), nullptr);
EXPECT_EQ(queue.TryPop(), nullptr);
EXPECT_EQ(queue.TryPopWeak(), nullptr);
EXPECT_EQ(queue.TryPopWeak(), nullptr);
EXPECT_EQ(queue.TryPopWeak(), nullptr);
}

TEST(IntrusiveMpscQueue, PushPopOnce) {
MpscQueue queue;
Node node1;

queue.Push(node1);
EXPECT_EQ(queue.TryPop(), &node1);
EXPECT_EQ(queue.TryPop(), nullptr);
EXPECT_EQ(queue.TryPop(), nullptr);
EXPECT_EQ(queue.TryPop(), nullptr);
EXPECT_EQ(queue.TryPopWeak(), &node1);
EXPECT_EQ(queue.TryPopWeak(), nullptr);
EXPECT_EQ(queue.TryPopWeak(), nullptr);
EXPECT_EQ(queue.TryPopWeak(), nullptr);
}

TEST(IntrusiveMpscQueue, PushPopTwice) {
Expand All @@ -54,11 +54,11 @@ TEST(IntrusiveMpscQueue, PushPopTwice) {

queue.Push(node1);
queue.Push(node2);
EXPECT_EQ(queue.TryPop(), &node1);
EXPECT_EQ(queue.TryPop(), &node2);
EXPECT_EQ(queue.TryPop(), nullptr);
EXPECT_EQ(queue.TryPop(), nullptr);
EXPECT_EQ(queue.TryPop(), nullptr);
EXPECT_EQ(queue.TryPopWeak(), &node1);
EXPECT_EQ(queue.TryPopWeak(), &node2);
EXPECT_EQ(queue.TryPopWeak(), nullptr);
EXPECT_EQ(queue.TryPopWeak(), nullptr);
EXPECT_EQ(queue.TryPopWeak(), nullptr);
}

TEST(IntrusiveMpscQueue, PushPopInterleaved) {
Expand All @@ -69,14 +69,14 @@ TEST(IntrusiveMpscQueue, PushPopInterleaved) {

queue.Push(node1);
queue.Push(node2);
EXPECT_EQ(queue.TryPop(), &node1);
EXPECT_EQ(queue.TryPopWeak(), &node1);

queue.Push(node3);
EXPECT_EQ(queue.TryPop(), &node2);
EXPECT_EQ(queue.TryPop(), &node3);
EXPECT_EQ(queue.TryPop(), nullptr);
EXPECT_EQ(queue.TryPop(), nullptr);
EXPECT_EQ(queue.TryPop(), nullptr);
EXPECT_EQ(queue.TryPopWeak(), &node2);
EXPECT_EQ(queue.TryPopWeak(), &node3);
EXPECT_EQ(queue.TryPopWeak(), nullptr);
EXPECT_EQ(queue.TryPopWeak(), nullptr);
EXPECT_EQ(queue.TryPopWeak(), nullptr);
}

TEST(IntrusiveMpscQueue, StressTest) {
Expand Down Expand Up @@ -106,7 +106,7 @@ TEST(IntrusiveMpscQueue, StressTest) {
std::size_t stop_signals_received = 0;

while (true) {
std::unique_ptr<Node> node(queue.TryPop());
std::unique_ptr<Node> node(queue.TryPopWeak());
if (!node) continue;

if (node->x == kStopSignal) {
Expand Down Expand Up @@ -143,7 +143,7 @@ TEST(IntrusiveMpscQueue, StressTestNodeReuse) {

auto worker1 = std::async([&] {
while (keep_running) {
Node* node = queue1.TryPop();
Node* node = queue1.TryPopWeak();
if (!node) continue;

EXPECT_EQ(node->x, 0);
Expand All @@ -153,7 +153,7 @@ TEST(IntrusiveMpscQueue, StressTestNodeReuse) {

auto worker2 = std::async([&] {
while (keep_running) {
Node* node = queue2.TryPop();
Node* node = queue2.TryPopWeak();
if (!node) continue;

EXPECT_EQ(node->x, 0);
Expand Down
Loading

0 comments on commit 0cdf042

Please sign in to comment.