Skip to content

Commit

Permalink
feat(fm): implement minimal parallelism option
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielSeemaier committed Sep 24, 2024
1 parent 9124cf8 commit 3777844
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 1 deletion.
5 changes: 5 additions & 0 deletions kaminpar-cli/kaminpar_arguments.cc
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,9 @@ CLI::Option_group *create_kway_fm_refinement_options(CLI::App *app, Context &ctx
)
->capture_default_str();

fm->add_option("--r-fm-minimal-parallelism-pm", ctx.refinement.kway_fm.minimal_parallelism)
->capture_default_str();

// Flags for debugging / analysis
fm->add_flag(
"--r-fm-dbg-batch-stats",
Expand All @@ -536,6 +539,8 @@ CLI::Option_group *create_kway_fm_refinement_options(CLI::App *app, Context &ctx
"compute on some irregular graphs)."
)
->capture_default_str();
fm->add_flag("--r-fm-dbg-report-progress", ctx.refinement.kway_fm.dbg_report_progress)
->capture_default_str();

return fm;
}
Expand Down
3 changes: 3 additions & 0 deletions kaminpar-shm/kaminpar.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,10 @@ struct KwayFMRefinementContext {
EdgeID constant_high_degree_threshold;
double k_based_high_degree_threshold;

int minimal_parallelism;

bool dbg_compute_batch_stats;
bool dbg_report_progress;
};

struct JetRefinementContext {
Expand Down
3 changes: 3 additions & 0 deletions kaminpar-shm/presets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,10 @@ Context create_default_context() {
.constant_high_degree_threshold = 0,
.k_based_high_degree_threshold = 1.0,

.minimal_parallelism = std::numeric_limits<int>::max(),

.dbg_compute_batch_stats = false,
.dbg_report_progress = false,
},
.balancer = {},
.jet =
Expand Down
4 changes: 4 additions & 0 deletions kaminpar-shm/refinement/fm/border_nodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ template <typename GainCache> class BorderNodes {
return _next_border_node < _border_nodes.size();
}

[[nodiscard]] std::size_t remaining() const {
return _border_nodes.size() - std::min<std::size_t>(_border_nodes.size(), _next_border_node);
}

[[nodiscard]] std::size_t size() const {
return _border_nodes.size();
}
Expand Down
18 changes: 17 additions & 1 deletion kaminpar-shm/refinement/fm/fm_refiner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ template <typename GainCache> struct SharedData {
StaticArray<std::size_t> shared_pq_handles;
StaticArray<BlockID> target_blocks;
GlobalStatistics stats;

std::atomic<std::uint8_t> abort = 0;
};

template <typename Graph, typename GainCache> class LocalizedFMRefiner {
Expand Down Expand Up @@ -125,7 +127,10 @@ template <typename Graph, typename GainCache> class LocalizedFMRefiner {
EdgeWeight current_total_gain = 0;
EdgeWeight best_total_gain = 0;

while (update_block_pq() && !_stopping_policy.should_stop()) {
int steps = 0;

while (update_block_pq() && !_stopping_policy.should_stop() &&
((++steps %= 64) || !_shared.abort.load(std::memory_order_relaxed))) {
const BlockID block_from = _block_pq.peek_id();
KASSERT(block_from < _p_graph.k());

Expand Down Expand Up @@ -509,6 +514,9 @@ class FMRefinerCore : public Refiner {
} else {
START_TIMER("Localized searches, coarse level");
}

std::atomic<int> num_finished_workers = 0;

tbb::parallel_for<int>(0, _ctx.parallel.num_threads, [&](int) {
auto &expected_gain = expected_gain_ets.local();
auto &localized_refiner = *localized_fm_refiner_ets.local();
Expand All @@ -517,6 +525,10 @@ class FMRefinerCore : public Refiner {
// that are still available, continuing this process until there are
// no more border nodes
while (_shared->border_nodes.has_more()) {
if (_fm_ctx.dbg_report_progress) {
LLOG << " " << _shared->border_nodes.remaining();
}

const auto expected_batch_gain = localized_refiner.run_batch();
expected_gain += expected_batch_gain;

Expand All @@ -529,6 +541,10 @@ class FMRefinerCore : public Refiner {
)
);
}

if (++num_finished_workers >= _fm_ctx.minimal_parallelism) {
_shared->abort = 1;
}
});
STOP_TIMER();

Expand Down
1 change: 1 addition & 0 deletions kaminpar-shm/refinement/fm/stopping_policies.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#pragma once

#include <cmath>
#include <iostream>

#include "kaminpar-shm/kaminpar.h"

Expand Down

0 comments on commit 3777844

Please sign in to comment.