Skip to content

Commit

Permalink
Fixed a race condition that occurs when thread pinning is disabled
Browse files Browse the repository at this point in the history
  • Loading branch information
kittobi1992 committed Aug 23, 2023
1 parent 3267367 commit 7eb6d26
Show file tree
Hide file tree
Showing 22 changed files with 273 additions and 52 deletions.
2 changes: 1 addition & 1 deletion mt-kahypar/datastructures/contraction_tree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ BatchVector ContractionTree::createBatchUncontractionHierarchyForVersion(BatchIn
parallel::scalable_vector<PQ> local_pqs(num_hardware_threads);
const parallel::scalable_vector<HypernodeID>& roots = roots_of_version(version);
tbb::parallel_for(UL(0), roots.size(), [&](const size_t i) {
const int cpu_id = SCHED_GETCPU;
const int cpu_id = THREAD_ID;
push_into_pq(local_pqs[cpu_id], roots[i]);
});

Expand Down
4 changes: 2 additions & 2 deletions mt-kahypar/datastructures/streaming_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class StreamingVector {

template <class ... Args>
void stream(Args&& ... args) {
int cpu_id = SCHED_GETCPU;
int cpu_id = THREAD_ID;
_cpu_buffer[cpu_id].emplace_back(std::forward<Args>(args)...);
}

Expand Down Expand Up @@ -171,7 +171,7 @@ class StreamingVector {
const int cpu_id,
const size_t position) {
DBG << "Copy buffer of cpu" << cpu_id << "of size" << _cpu_buffer[cpu_id].size()
<< "to position" << position << "in dest ( CPU =" << SCHED_GETCPU << " )";
<< "to position" << position << "in dest ( CPU =" << THREAD_ID << " )";
if (_cpu_buffer[cpu_id].empty()) {
return;
}
Expand Down
13 changes: 11 additions & 2 deletions mt-kahypar/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,21 @@

#include <type_traits>

#if defined(MT_KAHYPAR_LIBRARY_MODE) || !defined(KAHYPAR_ENABLE_THREAD_PINNING)
#include "tbb/task_arena.h"
// If we use the C or Python interface or thread pinning is disabled, the cpu ID to
// which the current thread is assigned is not unique. We therefore use the slot index
// of the current task arena as a unique thread ID. Note that the ID can be negative if
// the task scheduler is not initialized.
#define THREAD_ID std::max(0, tbb::this_task_arena::current_thread_index())
#else
#ifdef __linux__
#include <sched.h>
#define SCHED_GETCPU sched_getcpu()
#define THREAD_ID sched_getcpu()
#elif _WIN32
#include <processthreadsapi.h>
#define SCHED_GETCPU GetCurrentProcessorNumber()
#define THREAD_ID GetCurrentProcessorNumber()
#endif
#endif

#include "kahypar-resources/macros.h"
Expand Down
6 changes: 3 additions & 3 deletions mt-kahypar/parallel/thread_pinning_observer.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class ThreadPinningObserver : public tbb::task_scheduler_observer {
DBG << pin_thread_message(_cpus[slot]);
if(!_is_global_thread_pool) {
std::thread::id thread_id = std::this_thread::get_id();
int current_cpu = SCHED_GETCPU;
int current_cpu = THREAD_ID;
std::lock_guard<std::mutex> lock(_mutex);
_cpu_before[thread_id] = current_cpu;
}
Expand Down Expand Up @@ -189,7 +189,7 @@ class ThreadPinningObserver : public tbb::task_scheduler_observer {
"Failed to set thread affinity to cpu" + std::to_string(cpu_id) + "." + strerror(error));
}

ASSERT(SCHED_GETCPU == cpu_id);
ASSERT(THREAD_ID == cpu_id);
DBG << "Thread with PID" << std::this_thread::get_id()
<< "successfully pinned to CPU" << cpu_id;
}
Expand All @@ -209,7 +209,7 @@ class ThreadPinningObserver : public tbb::task_scheduler_observer {
std::string unpin_thread_message() {
std::stringstream ss;
ss << "Unassign thread with PID " << std::this_thread::get_id()
<< " on CPU " << SCHED_GETCPU;
<< " on CPU " << THREAD_ID;
if ( _numa_node != -1 ) {
ss << " from NUMA node " << _numa_node;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class MultilevelVertexPairRater {
fillRatingMap(hypergraph, u, tmp_ratings, cluster_ids);
}

int cpu_id = SCHED_GETCPU;
int cpu_id = THREAD_ID;
const HypernodeWeight weight_u = cluster_weight[u];
const PartitionID community_u_id = hypergraph.communityID(u);
RatingType max_rating = std::numeric_limits<RatingType>::min();
Expand Down
2 changes: 1 addition & 1 deletion mt-kahypar/partition/coarsening/nlevel_vertex_pair_rater.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ class NLevelVertexPairRater {
fillRatingMap(hypergraph, u, tmp_ratings);
}

int cpu_id = SCHED_GETCPU;
int cpu_id = THREAD_ID;
const HypernodeWeight weight_u = hypergraph.nodeWeight(u);
const PartitionID community_u_id = hypergraph.communityID(u);
RatingType max_rating = std::numeric_limits<RatingType>::min();
Expand Down
4 changes: 2 additions & 2 deletions mt-kahypar/partition/mapping/greedy_mapping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ HypernodeID get_node_with_minimum_weighted_degree(const ds::StaticGraph& graph)
ASSERT(min_nodes.size() > 0);
return min_nodes.size() == 1 ? min_nodes[0] :
min_nodes[utils::Randomize::instance().getRandomInt(
0, static_cast<int>(min_nodes.size() - 1), SCHED_GETCPU)];
0, static_cast<int>(min_nodes.size() - 1), THREAD_ID)];
}

template<typename CommunicationHypergraph>
Expand Down Expand Up @@ -201,7 +201,7 @@ void compute_greedy_mapping(CommunicationHypergraph& communication_hg,
ASSERT(tie_breaking.size() > 0);
const PartitionID best_process = tie_breaking.size() == 1 ? tie_breaking[0] :
tie_breaking[utils::Randomize::instance().getRandomInt(
0, static_cast<int>(tie_breaking.size() - 1), SCHED_GETCPU)];
0, static_cast<int>(tie_breaking.size() - 1), THREAD_ID)];
actual_objective += best_rating;
assign(u, best_process);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class GainComputationBase {
PartitionID from = phg.partID(hn);
Move best_move { from, from, hn, rebalance ? std::numeric_limits<Gain>::max() : 0 };
HypernodeWeight hn_weight = phg.nodeWeight(hn);
int cpu_id = SCHED_GETCPU;
int cpu_id = THREAD_ID;
utils::Randomize& rand = utils::Randomize::instance();
auto test_and_apply = [&](const PartitionID to,
const Gain score,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ namespace mt_kahypar {
bool converged = true;
if ( _context.refinement.label_propagation.execute_sequential ) {
utils::Randomize::instance().shuffleVector(
_active_nodes, UL(0), _active_nodes.size(), SCHED_GETCPU);
_active_nodes, UL(0), _active_nodes.size(), THREAD_ID);

for ( size_t j = 0; j < _active_nodes.size(); ++j ) {
const HypernodeID hn = _active_nodes[j];
Expand Down
12 changes: 6 additions & 6 deletions mt-kahypar/utils/randomize.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,15 @@ class Randomize {
template <typename T>
void shuffleVector(std::vector<T>& vector, int cpu_id = -1) {
if (cpu_id == -1)
cpu_id = SCHED_GETCPU;
cpu_id = THREAD_ID;
ASSERT(cpu_id < (int)std::thread::hardware_concurrency());
std::shuffle(vector.begin(), vector.end(), _rand[cpu_id].getGenerator());
}

template <typename T>
void shuffleVector(parallel::scalable_vector<T>& vector, int cpu_id = -1) {
if (cpu_id == -1)
cpu_id = SCHED_GETCPU;
cpu_id = THREAD_ID;
ASSERT(cpu_id < (int)std::thread::hardware_concurrency());
std::shuffle(vector.begin(), vector.end(), _rand[cpu_id].getGenerator());
}
Expand Down Expand Up @@ -184,14 +184,14 @@ class Randomize {
tbb::parallel_for(UL(0), P, [&](const size_t k) {
const size_t start = i + k * step;
const size_t end = i + (k == P - 1 ? N : (k + 1) * step);
localizedShuffleVector(vector, start, end, SCHED_GETCPU);
localizedShuffleVector(vector, start, end, THREAD_ID);
});
} else {
// Compute blocks that should be swapped before
// random shuffling
parallel::scalable_vector<SwapBlock> swap_blocks;
parallel::scalable_vector<bool> matched_blocks(P, false);
int cpu_id = SCHED_GETCPU;
int cpu_id = THREAD_ID;
for ( size_t a = 0; a < P; ++a ) {
if ( !matched_blocks[a] ) {
matched_blocks[a] = true;
Expand All @@ -212,7 +212,7 @@ class Randomize {
const size_t end_1 = i + (block_1 == P - 1 ? N : (block_1 + 1) * step);
const size_t start_2 = i + block_2 * step;
const size_t end_2 = i + (block_2 == P - 1 ? N : (block_2 + 1) * step);
const int cpu_id = SCHED_GETCPU;
const int cpu_id = THREAD_ID;
swapBlocks(vector, start_1, end_1, start_2, end_2);
std::shuffle(vector.begin() + start_1, vector.begin() + end_1, _rand[cpu_id].getGenerator());
std::shuffle(vector.begin() + start_2, vector.begin() + end_2, _rand[cpu_id].getGenerator());
Expand All @@ -238,7 +238,7 @@ class Randomize {
}

std::mt19937& getGenerator() {
int cpu_id = SCHED_GETCPU;
int cpu_id = THREAD_ID;
return _rand[cpu_id].getGenerator();
}

Expand Down
18 changes: 9 additions & 9 deletions tests/datastructures/nlevel_smoke_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ void verifyGainCache(PartitionedHypergraph& partitioned_hypergraph,
HyperedgeWeight expected_gain = 0;
for ( const HypernodeID& hn : partitioned_hypergraph.nodes() ) {
const PartitionID from = partitioned_hypergraph.partID(hn);
PartitionID to = rand.getRandomInt(0, k - 1, SCHED_GETCPU);
PartitionID to = rand.getRandomInt(0, k - 1, THREAD_ID);
if ( from == to ) to = (to + 1) % k;
expected_gain += gain_cache.gain(hn, from, to);
partitioned_hypergraph.changeNodePart(gain_cache, hn, from, to);
Expand Down Expand Up @@ -176,16 +176,16 @@ Hypergraph generateRandomHypergraph(const HypernodeID num_hypernodes,
parallel::scalable_vector<HypernodeID> net;
if constexpr ( Hypergraph::is_graph ) {
unused(max_edge_size);
std::pair<HypernodeID, HypernodeID> edge{rand.getRandomInt(0, num_hypernodes - 1, SCHED_GETCPU),
rand.getRandomInt(0, num_hypernodes - 1, SCHED_GETCPU)};
std::pair<HypernodeID, HypernodeID> edge{rand.getRandomInt(0, num_hypernodes - 1, THREAD_ID),
rand.getRandomInt(0, num_hypernodes - 1, THREAD_ID)};
graph_edges.insert({edge});
graph_edges.insert({edge.first, edge.second});
net.push_back(edge.first);
net.push_back(edge.second);
} else {
const size_t edge_size = rand.getRandomInt(2, max_edge_size, SCHED_GETCPU);
const size_t edge_size = rand.getRandomInt(2, max_edge_size, THREAD_ID);
for ( size_t i = 0; i < edge_size; ++i ) {
const HypernodeID pin = rand.getRandomInt(0, num_hypernodes - 1, SCHED_GETCPU);
const HypernodeID pin = rand.getRandomInt(0, num_hypernodes - 1, THREAD_ID);
if ( std::find(net.begin(), net.end(), pin) == net.end() ) {
net.push_back(pin);
}
Expand All @@ -205,9 +205,9 @@ void addRandomFixedVertices(Hypergraph& hypergraph,
utils::Randomize& rand = utils::Randomize::instance();
const int threshold = percentage_fixed_vertices * 1000;
for ( const HypernodeID& hn : hypergraph.nodes() ) {
const bool is_fixed = rand.getRandomInt(0, 1000, SCHED_GETCPU) <= threshold;
const bool is_fixed = rand.getRandomInt(0, 1000, THREAD_ID) <= threshold;
if ( is_fixed ) {
fixed_vertices.fixToBlock(hn, rand.getRandomInt(0, k - 1, SCHED_GETCPU));
fixed_vertices.fixToBlock(hn, rand.getRandomInt(0, k - 1, THREAD_ID));
}
}
hypergraph.addFixedVertexSupport(std::move(fixed_vertices));
Expand All @@ -222,7 +222,7 @@ BatchVector generateRandomContractions(const HypernodeID num_hypernodes,
parallel::scalable_vector<HypernodeID> active_hns(num_hypernodes);
std::iota(active_hns.begin(), active_hns.end(), 0);
utils::Randomize& rand = utils::Randomize::instance();
const int cpu_id = SCHED_GETCPU;
const int cpu_id = THREAD_ID;
while ( tmp_num_contractions > 0 ) {
HypernodeID current_num_contractions = tmp_num_contractions;
if ( multi_versioned && current_num_contractions > 25 ) current_num_contractions /= 2;
Expand Down Expand Up @@ -251,7 +251,7 @@ void generateRandomPartition(PartitionedHypergraph& partitioned_hypergraph) {
if ( partitioned_hypergraph.isFixed(hn) ) {
partitioned_hypergraph.setOnlyNodePart(hn, partitioned_hypergraph.fixedVertexBlock(hn));
} else {
partitioned_hypergraph.setOnlyNodePart(hn, rand.getRandomInt(0, k - 1, SCHED_GETCPU));
partitioned_hypergraph.setOnlyNodePart(hn, rand.getRandomInt(0, k - 1, THREAD_ID));
}
});
}
Expand Down
4 changes: 2 additions & 2 deletions tests/datastructures/partitioned_hypergraph_smoke_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class AConcurrentHypergraph : public Test {
underlying_hypergraph(),
hypergraph()
{
int cpu_id = SCHED_GETCPU;
int cpu_id = THREAD_ID;
underlying_hypergraph = io::readInputFile<Hypergraph>(
"../tests/instances/contracted_ibm01.hgr", FileFormat::hMetis, true);
hypergraph = PartitionedHypergraph(k, underlying_hypergraph, parallel_tag_t());
Expand Down Expand Up @@ -151,7 +151,7 @@ void moveAllNodesOfHypergraphRandom(HyperGraph& hypergraph,
HyperedgeWeight metric_before = metrics::quality(hypergraph, objective);
HighResClockTimepoint start = std::chrono::high_resolution_clock::now();
tbb::parallel_for(ID(0), hypergraph.initialNumNodes(), [&](const HypernodeID& hn) {
int cpu_id = SCHED_GETCPU;
int cpu_id = THREAD_ID;
const PartitionID from = hypergraph.partID(hn);
PartitionID to = -1;
while (to == -1 || to == from) {
Expand Down
11 changes: 11 additions & 0 deletions tests/interface/interface_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -937,4 +937,15 @@ namespace mt_kahypar {
ImprovePartition(QUALITY, 1, false);
verifyFixedVertexAssignment(HYPERGRAPH_FIX_FILE);
}

// Stress test: hammers the library from all hardware threads simultaneously,
// each thread repeatedly partitioning the same hypergraph until roughly
// max_runs partitioning calls have been started. Exercises the concurrent
// entry path that the THREAD_ID change (unique per-arena slot IDs) must keep
// race-free.
TEST_F(APartitioner, PartitionsManyHypergraphsInParallel) {
  std::atomic<size_t> cnt(0);
  const size_t max_runs = 100;
  // The check-then-increment is intentionally non-atomic as a pair: the total
  // number of runs may overshoot max_runs by at most (#threads - 1), which is
  // acceptable for a stress test and avoids a CAS loop.
  // Bounds are unsigned, so the lambda parameter is unsigned as well (the
  // original `const int id` mixed signedness and was unused).
  tbb::parallel_for(0U, std::thread::hardware_concurrency(), [&](const unsigned) {
    while ( cnt.load(std::memory_order_relaxed) < max_runs ) {
      ++cnt;
      PartitionAnotherHypergraph("test_instances/test_instance.hgr", HMETIS, DEFAULT, 4, 0.03, KM1, false);
    }
  });
}
}
Loading

0 comments on commit 7eb6d26

Please sign in to comment.