Skip to content

Commit

Permalink
Added dlsm::Thread::Mask. Updated tests/perf/delays.py. Fixed defects…
Browse files Browse the repository at this point in the history
… in tests/perf/Timestamps.hpp
  • Loading branch information
pkarneliuk committed Apr 4, 2024
1 parent c4ca968 commit 3774cd3
Show file tree
Hide file tree
Showing 9 changed files with 246 additions and 84 deletions.
24 changes: 21 additions & 3 deletions include/impl/Thread.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include <bitset>
#include <string>
#include <vector>

#if defined(__x86_64__)
#include <emmintrin.h>
Expand All @@ -11,9 +13,25 @@ namespace dlsm::Thread {
void name(const std::string& name, std::size_t handle = 0);
std::string name(std::size_t handle = 0);

static constexpr std::size_t AllCPU = 0;
void affinity(std::size_t cpuid = AllCPU, std::size_t handle = 0);
std::size_t getaffinity(std::size_t handle = 0);
// Width of an affinity mask in bits; valid bit indexes are 0 .. MaskSize-1.
static constexpr std::size_t MaskSize = 64;

// Fixed-width set of indexes built on std::bitset: bit i set means index i belongs to the set.
// All std::bitset operations remain available through public inheritance.
struct Mask : public std::bitset<MaskSize> {
    // Empty mask (no bits set).
    Mask() = default;

    // Build a mask from a list of bit indexes.
    Mask(const std::vector<std::size_t>& affinity);

    // Convert the mask to a list of bit indexes.
    operator std::vector<std::size_t>() const;

    // True when at least one bit is set.
    explicit operator bool() const { return any(); }

    // Textual representation of the mask.
    std::string to_string() const;

    // Mask holding a single bit selected by its ordinal position among the set bits.
    Mask at(std::size_t index) const;

    // Remove the n least significant set bits from this mask and return them as a new mask.
    Mask extract(std::size_t n = 1);
};

// Mask is expected to occupy exactly one unsigned long long (the implementation
// converts it with to_ullong()).
static_assert(sizeof(Mask) == sizeof(unsigned long long));

// Set the CPU affinity of a thread to the CPUs whose bits are set in mask.
// handle == 0 selects the calling thread. Throws std::system_error on failure.
void affinity(Mask mask, std::size_t handle = 0);

// Return the CPU affinity of a thread as a mask. handle == 0 selects the calling thread.
// Throws std::system_error on failure.
[[nodiscard]] Mask affinity(std::size_t handle = 0);

// Affinity of the thread that performs dynamic initialization, captured once at startup.
// Declared `inline` rather than `static`: a `static` variable in a header would give every
// translation unit its own copy and repeat the affinity query once per including file.
inline const Mask AllCPU = affinity();

// Mask with the N least significant bits set, where N is the value reported by
// std::thread::hardware_concurrency() (capped at the mask width).
Mask AllAvailableCPU();

[[gnu::always_inline, gnu::hot]] inline void pause() noexcept {
#if defined(__x86_64__)
Expand Down
77 changes: 66 additions & 11 deletions src/Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

#include <pthread.h>

#include <bit>
#include <cstring>
#include <ctime>
#include <system_error>
#include <thread>

namespace {
pthread_t tid(std::size_t handle = 0) { return (handle == 0) ? pthread_self() : static_cast<pthread_t>(handle); }
Expand All @@ -27,31 +29,84 @@ std::string name(std::size_t handle) {
return name;
}

void affinity(std::size_t cpuid, std::size_t handle) {
// Build a mask with one bit set for every index listed in `affinity`.
// std::bitset::set() range-checks its argument, so an index >= size() raises std::out_of_range.
Mask::Mask(const std::vector<std::size_t>& affinity) {
    for (auto it = affinity.begin(); it != affinity.end(); ++it) {
        set(*it);
    }
}

// Convert the mask to the ascending list of indexes whose bits are set.
Mask::operator std::vector<std::size_t>() const {
    std::vector<std::size_t> indexes;
    indexes.reserve(count());
    // Walk only the set bits: take the lowest one, then clear it with bits &= bits - 1.
    for (auto bits = to_ullong(); bits != 0; bits &= bits - 1) {
        indexes.push_back(static_cast<std::size_t>(std::countr_zero(bits)));
    }
    return indexes;
}

std::string Mask::to_string() const {
const auto msb = static_cast<std::size_t>(std::countl_zero(to_ullong()));
std::string result;
result.reserve(size() - msb);
for (std::size_t i = 0; i < (size() - msb); ++i) {
result += (*this)[i] ? std::to_string(i) : std::string{"."};
}
return result;
}

// Return a mask holding only the index-th set bit, counting set bits from the least
// significant one (index 0 is the lowest set bit).
// Throws std::runtime_error when the mask has index or fewer bits set.
Mask Mask::at(std::size_t index) const {
    if (count() <= index) {
        throw std::runtime_error("No available bits at index:" + std::to_string(index));
    }

    // Drop the `index` lowest set bits; the guard above guarantees one remains.
    auto bits = to_ullong();
    for (std::size_t skipped = 0; skipped < index; ++skipped) {
        bits &= bits - 1;
    }

    Mask selected;
    selected.set(static_cast<std::size_t>(std::countr_zero(bits)));
    return selected;
}

// Move the n least significant set bits out of this mask into a new mask and return it.
// Throws std::runtime_error, leaving this mask untouched, when fewer than n bits are set.
Mask Mask::extract(std::size_t n) {
    if (count() < n) {
        throw std::runtime_error("No bits for extraction:" + std::to_string(n));
    }

    Mask taken;
    while (taken.count() < n) {
        // Lowest bit still present in this mask.
        const auto lowest = static_cast<std::size_t>(std::countr_zero(to_ullong()));
        taken.set(lowest);
        reset(lowest);
    }
    return taken;
}

void affinity(const Mask mask, std::size_t handle) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
if (cpuid == AllCPU) {
for (std::size_t i = 0; i < CPU_SETSIZE; ++i) {
CPU_SET(i, &cpuset);
}
} else {
CPU_SET(cpuid, &cpuset);
for (std::size_t i = 0; i < mask.size() && i < CPU_SETSIZE; ++i) {
if (mask.test(i)) CPU_SET(i, &cpuset);
}

if (const int err = ::pthread_setaffinity_np(tid(handle), sizeof(cpuset), &cpuset)) {
throw std::system_error(err, std::system_category(), "pthread_setaffinity_np()");
}
}

std::size_t getaffinity(std::size_t handle) {
Mask affinity(std::size_t handle) {
cpu_set_t cpuset;
if (const int err = ::pthread_getaffinity_np(tid(handle), sizeof(cpuset), &cpuset)) {
throw std::system_error(err, std::system_category(), "pthread_getaffinity_np()");
}
for (std::size_t i = 0; i < CPU_SETSIZE; ++i) {
if (CPU_ISSET(i, &cpuset)) return i;

Mask mask;
for (std::size_t i = 0; i < mask.size() && i < CPU_SETSIZE; ++i) {
if (CPU_ISSET(i, &cpuset)) mask.set(i);
}
return 0;
return mask;
}

// Return a mask with the N least significant bits set, where N is the value reported by
// std::thread::hardware_concurrency(), capped at the mask width.
// A reported value of 0 produces an empty mask.
Mask AllAvailableCPU() {
    const std::size_t reported = std::thread::hardware_concurrency();

    Mask result;
    for (std::size_t cpu = 0; cpu < reported && cpu < result.size(); ++cpu) {
        result.set(cpu);
    }
    return result;
}

void NanoSleep::pause() noexcept {
Expand Down
2 changes: 1 addition & 1 deletion tests/perf/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
find_package(benchmark REQUIRED benchmark)

add_executable (perf Perf.cpp)
target_precompile_headers (perf PRIVATE Perf.hpp)
target_precompile_headers (perf PRIVATE Perf.hpp Timestamps.hpp)
target_include_directories(perf PRIVATE ../)
target_link_libraries (perf PRIVATE dlsm benchmark::benchmark)
target_sources (perf PRIVATE
Expand Down
45 changes: 32 additions & 13 deletions tests/perf/PerfTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,31 +36,38 @@ void TransportPubSub(benchmark::State& state, Args&&... args) {
// clang-format on

auto runtime = dlsm::Transport<std::remove_pointer_t<decltype(type)>>(ropts);
const auto affinity = dlsm::Thread::getaffinity();
std::mutex state_mutex;
const auto synchronized = [&](auto action) {
std::lock_guard<std::mutex> guard(state_mutex);
action();
};

for (auto _ : state) {
const auto affinity = dlsm::Thread::AllCPU;
dlsm::Thread::affinity(affinity.at(0));
std::atomic_uint64_t last_sent = 0, send_failed = 0;
std::vector<std::jthread> threads{1 + subscribers};
std::barrier sync(std::ssize(threads));

auto timestamps =
Tests::Perf::Timestamps<dlsm::Clock::Monotonic, dlsm::MAdviseAllocator>{std::size(threads), tosend};

auto begin = timestamps.ts(), end = begin;

for (std::size_t i = 0; auto& t : threads) {
t = std::jthread([&, i]() {
if (affinity) dlsm::Thread::affinity(affinity + i);
if (affinity.count() > i + 1) dlsm::Thread::affinity(affinity.at(i + 1));

const auto name = (i == 0) ? "Pub"s : "Sub" + std::to_string(i);
dlsm::Thread::name(name);

auto& ts = timestamps[i];
std::uint64_t count = 0;
if (i == 0) {
dlsm::Thread::name("Pub");
auto pub = runtime.pub(popts);

sync.arrive_and_wait();
begin = timestamps.ts();

Event e{0ns, 0};
while (count < tosend) {
Expand All @@ -86,12 +93,12 @@ void TransportPubSub(benchmark::State& state, Args&&... args) {
}
count += 1;
}
synchronized([&] { state.counters["Pub"] = static_cast<double>(count); });
last_sent = count;
sync.arrive_and_wait();
end = timestamps.ts();

synchronized([&] { state.counters[name] = static_cast<double>(count); });
} else {
const auto name = "Sub" + std::to_string(i);
dlsm::Thread::name(name);
auto sub = runtime.sub(sopts);
std::size_t timeouts = 0;

Expand Down Expand Up @@ -121,28 +128,30 @@ void TransportPubSub(benchmark::State& state, Args&&... args) {
}
}
}
sync.arrive_and_wait();

synchronized([&] {
if (const auto lost = tosend - count) state.counters[name + "Lost"] = static_cast<double>(lost);
if (timeouts) state.counters[name + "TO"] = static_cast<double>(timeouts);
});
sync.arrive_and_wait();
}

// Write timestamps to files
timestamps.write(ts, state.name() + "-" + name);
});
++i;
}
threads.clear();
dlsm::Thread::affinity(dlsm::Thread::AllCPU);

// Calculate delays for all samples and percentiles
for (const auto& p : timestamps.percentiles()) {
state.counters[p.label] = std::chrono::duration<double>(p.value).count();
}

// Write timestamps to files
auto replaceAll = [](auto s, std::string_view o, std::string_view n) {
while (s.find(o) != std::string::npos) s.replace(s.find(o), o.size(), n);
return s;
};
timestamps.write(replaceAll(state.name(), "/", "-"));
const std::chrono::duration<double> seconds = end - begin;
state.SetIterationTime(seconds.count());
state.counters["per_item(avg)"] = (seconds / tosend).count();
}
}
} // namespace
Expand All @@ -166,25 +175,33 @@ const auto sopts = ",delay_ms=500,recv_timeout_ms=100,recv_buf_size="s + std::to
// Use BENCHMARK_TEMPLATE1_CAPTURE after 1.8.4+ release
BENCHMARK_CAPTURE(TransportPubSub, mem, (dlsm::ZMQ*)nullptr, "io_threads=0"s,
"endpoint=inproc://mem-pub-sub-perf"s + popts, "endpoint=inproc://mem-pub-sub-perf"s + sopts)
->MeasureProcessCPUTime()
->UseManualTime()
->Unit(benchmark::kSecond)
->Iterations(1)
->Repetitions(repeats)
->Args({4, num_msgs, 0, 1});
BENCHMARK_CAPTURE(TransportPubSub, ipc, (dlsm::ZMQ*)nullptr, "io_threads=2"s,
"endpoint=ipc://@ipc-pub-sub-perf"s + popts, "endpoint=ipc://@ipc-pub-sub-perf"s + sopts)
->MeasureProcessCPUTime()
->UseManualTime()
->Unit(benchmark::kSecond)
->Iterations(1)
->Repetitions(repeats)
->Args({4, num_msgs, 0, 1});
BENCHMARK_CAPTURE(TransportPubSub, tcp, (dlsm::ZMQ*)nullptr, "io_threads=2"s, "endpoint=tcp://127.0.0.1:5551"s + popts,
"endpoint=tcp://127.0.0.1:5551"s + sopts)
->MeasureProcessCPUTime()
->UseManualTime()
->Unit(benchmark::kSecond)
->Iterations(1)
->Repetitions(repeats)
->Args({4, num_msgs, 0, 1});

// BENCHMARK_CAPTURE(TransportPubSub, iox, (dlsm::IOX*)nullptr, "name=iox,inproc=on,pools=64x10000,log=off"s,
// "service=iox/test/perf,onfull=wait"s, "service=iox/test/perf,onfull=wait"s)
// ->MeasureProcessCPUTime()
// ->UseManualTime()
// ->Unit(benchmark::kSecond)
// ->Iterations(1)
// ->Repetitions(repeats)
Expand All @@ -194,6 +211,8 @@ BENCHMARK_CAPTURE(TransportPubSub, tcp, (dlsm::ZMQ*)nullptr, "io_threads=2"s, "e
// "name=iox,inproc=on,monitor=on,pools=32x10000000,log=verbose"s,
// "service=iox/test/perf,onfull=discard"s,
// "service=iox/test/perf,onfull=discard"s)
// ->MeasureProcessCPUTime()
// ->UseManualTime()
// ->Unit(benchmark::kSecond)
// ->Iterations(1)
// ->Repetitions(repeats)
Expand Down
2 changes: 2 additions & 0 deletions tests/perf/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ This script reads binary files with `int64` samples(nanoseconds timestamps), and

## Threads Affinity and CPU Core Isolation
```sh
lscpu # CPU summary
lscpu --all --extended --output-all # Per-core info
lstopo-no-graphics --no-io --no-legend --of txt # Display layout of available CPUs in physical packages
numactl --hardware # Display NUMA nodes
sudo grubby --update-kernel=ALL --args="isolcpus=6-11" # Isolate CPU #6 - #11 from OS scheduling
Expand Down
Loading

0 comments on commit 3774cd3

Please sign in to comment.