Skip to content

Commit

Permalink
Library functions for terminating the thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
kittobi1992 committed Oct 2, 2024
1 parent 39e06c7 commit e8bac18
Show file tree
Hide file tree
Showing 15 changed files with 70 additions and 30 deletions.
2 changes: 1 addition & 1 deletion external_tools/kahypar-shared-resources
13 changes: 12 additions & 1 deletion include/libmtkahypar.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,22 @@ MT_KAHYPAR_API void mt_kahypar_set_individual_target_block_weights(mt_kahypar_co
const mt_kahypar_hypernode_weight_t* block_weights);


// ####################### Thread Pool Initialization #######################
// ####################### Thread Pool Initialization/Termination #######################

/**
* Initializes the TBB thread pool with the given number of threads, and activates the
* interleaved memory allocation policy if set.
*/
MT_KAHYPAR_API void mt_kahypar_initialize_thread_pool(const size_t num_threads,
const bool interleaved_allocations);

/**
* Terminates the TBB thread pool.
*
* \return `true` if the thread pool was successfully terminated, `false` otherwise.
*/
MT_KAHYPAR_API bool mt_kahypar_terminate_thread_pool();

// ####################### Load/Construct Hypergraph #######################

/**
Expand Down
6 changes: 5 additions & 1 deletion lib/libmtkahypar.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ void mt_kahypar_initialize_thread_pool(const size_t num_threads,
}

// Initialize TBB task arenas on numa nodes
TBBInitializer::instance(P);
TBBInitializer::instance().initialize(P);

if ( interleaved_allocations ) {
// We set the membind policy to interleaved allocations in order to
Expand All @@ -214,6 +214,10 @@ void mt_kahypar_initialize_thread_pool(const size_t num_threads,
}
}

bool mt_kahypar_terminate_thread_pool() {
return TBBInitializer::instance().terminate();
}

mt_kahypar_hypergraph_t mt_kahypar_read_hypergraph_from_file(const char* file_name,
const mt_kahypar_preset_type_t preset,
const mt_kahypar_file_format_type_t file_format) {
Expand Down
2 changes: 1 addition & 1 deletion mt-kahypar/application/mt_kahypar.cc.in
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ int main(int argc, char* argv[]) {
}

// Initialize TBB task arenas on numa nodes
TBBInitializer::instance(context.shared_memory.num_threads);
TBBInitializer::instance().initialize(context.shared_memory.num_threads);

// We set the membind policy to interleaved allocations in order to
// distribute allocations evenly across NUMA nodes
Expand Down
45 changes: 28 additions & 17 deletions mt-kahypar/parallel/tbb_initializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
#include <hwloc.h>
#include <mutex>
#include <memory>
#include <new>
#include <optional>
#include <shared_mutex>
#include <functional>

Expand Down Expand Up @@ -62,8 +64,8 @@ class TBBInitializer {
TBBInitializer(TBBInitializer&&) = delete;
TBBInitializer & operator= (TBBInitializer &&) = delete;

static TBBInitializer& instance(const size_t num_threads = std::thread::hardware_concurrency()) {
static TBBInitializer instance(num_threads);
static TBBInitializer& instance() {
static TBBInitializer instance;
return instance;
}

Expand All @@ -90,19 +92,12 @@ class TBBInitializer {
return cpuset;
}

void terminate() {
if ( _global_observer ) {
_global_observer->observe(false);
}
}
void initialize(std::size_t num_threads) {
// Initialize new `global_control` object with the desired number of threads.
_num_threads = num_threads;
_gc.reset();
_gc.emplace(tbb::global_control::max_allowed_parallelism, num_threads);

private:
explicit TBBInitializer(const int num_threads) :
_num_threads(num_threads),
_gc(tbb::global_control::max_allowed_parallelism, num_threads),
_global_observer(nullptr),
_cpus(),
_numa_node_to_cpu_id() {
HwTopology& topology = HwTopology::instance();
int num_numa_nodes = topology.num_numa_nodes();
DBG << "Initialize TBB with" << num_threads << "threads";
Expand All @@ -128,20 +123,36 @@ class TBBInitializer {
}
_global_observer = std::make_unique<ThreadPinningObserver>(_cpus);

_numa_node_to_cpu_id.clear();
_numa_node_to_cpu_id.resize(num_numa_nodes);
for ( const int cpu_id : _cpus ) {
int node = topology.numa_node_of_cpu(cpu_id);
ASSERT(node < static_cast<int>(_numa_node_to_cpu_id.size()));
_numa_node_to_cpu_id[node].push_back(cpu_id);
}

while( !_numa_node_to_cpu_id.empty() && _numa_node_to_cpu_id.back().empty() ) {
_numa_node_to_cpu_id.pop_back();
}
}

int _num_threads;
tbb::global_control _gc;
std::unique_ptr<ThreadPinningObserver> _global_observer;
bool terminate() {
if ( _global_observer ) {
_global_observer->observe(false);
}

// Waits until the last worker threads have finished, and
// then terminates the task scheduler.
oneapi::tbb::task_scheduler_handle handle(oneapi::tbb::attach{});
return oneapi::tbb::finalize(handle, std::nothrow_t{});
}

private:
explicit TBBInitializer() = default;

int _num_threads = 0;
std::optional<tbb::global_control> _gc;
std::unique_ptr<ThreadPinningObserver> _global_observer = nullptr;
std::vector<int> _cpus;
std::vector<std::vector<int>> _numa_node_to_cpu_id;
};
Expand Down
11 changes: 9 additions & 2 deletions python/module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,18 @@ namespace {
}

// Initialize TBB task arenas on numa nodes
mt_kahypar::TBBInitializer::instance(P);
mt_kahypar::TBBInitializer::instance().initialize(P);
// We set the membind policy to interleaved allocations in order to
// distribute allocations evenly across NUMA nodes
hwloc_cpuset_t cpuset = mt_kahypar::TBBInitializer::instance().used_cpuset();
mt_kahypar::parallel::HardwareTopology<>::instance().activate_interleaved_membind_policy(cpuset);
hwloc_bitmap_free(cpuset);
}

bool terminate_thread_pool() {
mt_kahypar::TBBInitializer::instance().terminate();
}

template<typename PartitionedHypergraph>
double imbalance(const PartitionedHypergraph& partitioned_graph) {
const mt_kahypar::HypernodeWeight perfectly_balanced_weight =
Expand Down Expand Up @@ -227,12 +231,15 @@ PYBIND11_MODULE(mtkahypar, m) {
.value("KM1", Objective::km1)
.value("SOED", Objective::soed);

// ####################### Initialize Thread Pool #######################
// ####################### Initialize/Terminate Thread Pool #######################

m.def("initializeThreadPool", &initialize_thread_pool,
"Initializes the thread pool with the given number of threads",
py::arg("number of threads"));

m.def("terminateThreadPool", &terminate_thread_pool,
"Terminates the thread pool.");

// ####################### Initialize Random Number Generator #######################

m.def("setSeed", [&](const int seed) {
Expand Down
3 changes: 3 additions & 0 deletions python/tests/test_mtkahypar.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ class MainTest(unittest.TestCase):
def setUp(self):
mtkahypar.initializeThreadPool(multiprocessing.cpu_count())

def tearDown(self):
mtkahypar.terminateThreadPool()

def test_set_partitioning_parameters_in_context(self):
context = mtkahypar.Context()
context.setPartitioningParameters(2, 0.03, mtkahypar.Objective.KM1)
Expand Down
4 changes: 4 additions & 0 deletions tests/interface/interface_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@

#include "gmock/gmock.h"

#include <chrono>
#include <new>
#include <thread>

#include <tbb/parallel_invoke.h>
#include <tbb/parallel_for.h>

#include "libmtkahypar.h"
#include "mt-kahypar/macros.h"
Expand Down Expand Up @@ -510,6 +513,7 @@ namespace mt_kahypar {
mt_kahypar_free_hypergraph(hypergraph);
mt_kahypar_free_partitioned_hypergraph(partitioned_hg);
mt_kahypar_free_target_graph(target_graph);
mt_kahypar_terminate_thread_pool();
}

mt_kahypar_context_t* context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class APoolInitialPartitionerTest : public Test {
}

static void SetUpTestSuite() {
TBBInitializer::instance(HardwareTopology::instance().num_cpus());
TBBInitializer::instance().initialize(HardwareTopology::instance().num_cpus());
}

Hypergraph hypergraph;
Expand Down
2 changes: 1 addition & 1 deletion tests/partition/refinement/advanced_rebalancer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class RebalancerTest : public Test {
context(),
gain_cache(),
rebalancer(nullptr) {
TBBInitializer::instance(std::thread::hardware_concurrency());
TBBInitializer::instance().initialize(std::thread::hardware_concurrency());
context.partition.mode = Mode::direct;
context.partition.epsilon = 0.05;
context.partition.k = Config::K;
Expand Down
2 changes: 1 addition & 1 deletion tests/partition/refinement/multitry_fm_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class MultiTryFMTest : public Test {
gain_cache(),
refiner(nullptr),
metrics() {
TBBInitializer::instance(std::thread::hardware_concurrency());
TBBInitializer::instance().initialize(std::thread::hardware_concurrency());
context.partition.graph_filename = "../tests/instances/contracted_ibm01.hgr";
context.partition.graph_community_filename = "../tests/instances/contracted_ibm01.hgr.community";
context.partition.mode = Mode::direct;
Expand Down
2 changes: 1 addition & 1 deletion tests/run_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);

mt_kahypar::TBBInitializer::instance(std::thread::hardware_concurrency());
mt_kahypar::TBBInitializer::instance().initialize(std::thread::hardware_concurrency());
const int result = RUN_ALL_TESTS();
mt_kahypar::TBBInitializer::instance().terminate();

Expand Down
2 changes: 1 addition & 1 deletion tools/one_to_one_mapping.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ int main(int argc, char* argv[]) {
context.mapping.large_he_threshold = 0.0;

utils::Randomize::instance().setSeed(context.partition.seed);
TBBInitializer::instance(context.shared_memory.num_threads);
TBBInitializer::instance().initialize(context.shared_memory.num_threads);

// Read Hypergraph
Hypergraph hg = io::readInputFile<Hypergraph>(
Expand Down
2 changes: 1 addition & 1 deletion tools/verify_process_mapping_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ int main(int argc, char* argv[]) {
context.partition.epsilon = 0.03;
context.shared_memory.num_threads = std::thread::hardware_concurrency();
context.mapping.max_steiner_tree_size = 4;
TBBInitializer::instance(context.shared_memory.num_threads);
TBBInitializer::instance().initialize(context.shared_memory.num_threads);

// Read Hypergraph
Hypergraph hg = io::readInputFile<Hypergraph>(
Expand Down

0 comments on commit e8bac18

Please sign in to comment.