Skip to content

Commit

Permalink
[FM refactoring] remove FM version running on shared partition
Browse files Browse the repository at this point in the history
  • Loading branch information
larsgottesbueren committed Sep 4, 2023
1 parent f423340 commit 5b9bcfb
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 175 deletions.
9 changes: 1 addition & 8 deletions mt-kahypar/partition/refinement/fm/fm_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ struct GlobalMoveTracker {
}
}

MoveID insertMove(Move &m) {
MoveID insertMove(const Move &m) {
const MoveID move_id = runningMoveID.fetch_add(1, std::memory_order_relaxed);
assert(move_id - firstMoveID < moveOrder.size());
moveOrder[move_id - firstMoveID] = m;
Expand Down Expand Up @@ -253,10 +253,6 @@ struct FMSharedData {
CAtomic<size_t> finishedTasks;
size_t finishedTasksLimit = std::numeric_limits<size_t>::max();

// ! Switch to applying moves directly if the use of local delta partitions exceeded a memory limit
bool deltaExceededMemoryConstraints = false;
size_t deltaMemoryLimitPerThread = 0;

bool release_nodes = true;
bool perform_moves_global = true;

Expand All @@ -270,9 +266,6 @@ struct FMSharedData {
unconstrained() {
finishedTasks.store(0, std::memory_order_relaxed);

// 128 * 3/2 GB --> roughly 1.5 GB per thread on our biggest machine
deltaMemoryLimitPerThread = 128UL * (UL(1) << 30) * 3 / ( 2 * std::max(UL(1), numThreads) );

tbb::parallel_invoke([&] {
moveTracker.moveOrder.resize(numNodes);
}, [&] {
Expand Down
194 changes: 40 additions & 154 deletions mt-kahypar/partition/refinement/fm/localized_kway_fm_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,13 @@ namespace mt_kahypar {
}

if (runStats.pushes > 0) {
if (sharedData.deltaExceededMemoryConstraints) {
deltaPhg.dropMemory();
delta_gain_cache.dropMemory();
}

if (context.refinement.fm.perform_moves_global || sharedData.deltaExceededMemoryConstraints) {
if ( phg.hasFixedVertices() ) {
internalFindMoves<false, true>(phg, fm_strategy);
} else {
internalFindMoves<false, false>(phg, fm_strategy);
}
deltaPhg.clear();
deltaPhg.setPartitionedHypergraph(&phg);
delta_gain_cache.clear();
if ( phg.hasFixedVertices() ) {
internalFindMoves<true>(phg, fm_strategy);
} else {
deltaPhg.clear();
delta_gain_cache.clear();
deltaPhg.setPartitionedHypergraph(&phg);
if ( phg.hasFixedVertices() ) {
internalFindMoves<true, true>(phg, fm_strategy);
} else {
internalFindMoves<true, false>(phg, fm_strategy);
}
internalFindMoves<false>(phg, fm_strategy);
}
return true;
} else {
Expand Down Expand Up @@ -126,30 +113,12 @@ namespace mt_kahypar {


template<typename TypeTraits, typename GainTypes>
template<bool use_delta, bool has_fixed_vertices, typename DispatchedFMStrategy>
template<bool has_fixed_vertices, typename DispatchedFMStrategy>
void LocalizedKWayFM<TypeTraits, GainTypes>::internalFindMoves(PartitionedHypergraph& phg,
DispatchedFMStrategy& fm_strategy) {
StopRule stopRule(phg.initialNumNodes());
Move move;

auto delta_func = [&](const SynchronizedEdgeUpdate& sync_update) {
// Gains of the pins of a hyperedge can only change in the following situations.
if ( GainCache::triggersDeltaGainUpdate(sync_update) ) {
edgesWithGainChanges.push_back(sync_update.he);
}

if constexpr (use_delta) {
fm_strategy.deltaGainUpdates(deltaPhg, delta_gain_cache, sync_update);
} else {
fm_strategy.deltaGainUpdates(phg, gain_cache, sync_update);
}
};

// we can almost make this function take a generic partitioned hypergraph
// we would have to add the success func to the interface of DeltaPhg (and then ignore it there...)
// and do the local rollback outside this function

size_t bestImprovementIndex = 0;
Gain estimatedImprovement = 0;
Gain bestImprovement = 0;

Expand All @@ -159,11 +128,7 @@ namespace mt_kahypar {
while (!stopRule.searchShouldStop()
&& sharedData.finishedTasks.load(std::memory_order_relaxed) < sharedData.finishedTasksLimit) {

if constexpr (use_delta) {
if (!fm_strategy.findNextMove(deltaPhg, delta_gain_cache, move)) break;
} else {
if (!fm_strategy.findNextMove(phg, gain_cache, move)) break;
}
if (!fm_strategy.findNextMove(deltaPhg, delta_gain_cache, move)) break;
sharedData.nodeTracker.deactivateNode(move.node, thisSearch);

// skip if no target block available
Expand All @@ -178,43 +143,34 @@ namespace mt_kahypar {
if (!expect_improvement && high_deg) {
continue;
}
// less restrictive option: skip only if the gain is negative (or below some threshold, e.g. -5000).
// downside: we would have to flush before an improvement or run the move through deltaPhg.
// This is probably quite similar, since it only really matters in the first few moves, where the stop rule
// doesn't signal us to stop yet.

edgesWithGainChanges.clear(); // clear before move. delta_func feeds nets of moved vertex.
MoveID move_id = std::numeric_limits<MoveID>::max();
bool moved = false;
const HypernodeWeight allowed_weight = DispatchedFMStrategy::is_unconstrained ? std::numeric_limits<HypernodeWeight>::max()
: context.partition.max_part_weights[move.to];
if constexpr (use_delta) {
heaviestPartWeight = heaviestPartAndWeight(deltaPhg, context.partition.k).second;
fromWeight = deltaPhg.partWeight(move.from);
toWeight = deltaPhg.partWeight(move.to);
if (expect_improvement) {
// since we will flush the move sequence, don't bother running it through the deltaPhg
// this is intended to allow moving high deg nodes (blow up hash tables) if they give an improvement.
// The nets affected by a gain cache update are collected when we apply this improvement on the
// global partition (used to expand the localized search and update the gain values).
moved = toWeight + phg.nodeWeight(move.node) <= allowed_weight;
} else {
moved = deltaPhg.changeNodePart(move.node, move.from, move.to, allowed_weight, delta_func);
fm_strategy.applyMove(deltaPhg, delta_gain_cache, move, false);
}

heaviestPartWeight = heaviestPartAndWeight(deltaPhg, context.partition.k).second;
fromWeight = deltaPhg.partWeight(move.from);
toWeight = deltaPhg.partWeight(move.to);
if (expect_improvement) {
// since we will flush the move sequence, don't bother running it through the deltaPhg
// this is intended to allow moving high deg nodes (blow up hash tables) if they give an improvement.
// The nets affected by a gain cache update are collected when we apply this improvement on the
// global partition (used to expand the localized search and update the gain values).
moved = toWeight + phg.nodeWeight(move.node) <= allowed_weight;
} else {
heaviestPartWeight = heaviestPartAndWeight(phg, context.partition.k).second;
fromWeight = phg.partWeight(move.from);
toWeight = phg.partWeight(move.to);
moved = phg.changeNodePart(move.node, move.from, move.to, allowed_weight,
[&] { move_id = sharedData.moveTracker.insertMove(move); }, delta_func);
fm_strategy.applyMove(phg, gain_cache, move, true);
moved = deltaPhg.changeNodePart(move.node, move.from, move.to, allowed_weight,
[&](const SynchronizedEdgeUpdate& sync_update) {
if (GainCache::triggersDeltaGainUpdate(sync_update)) {
edgesWithGainChanges.push_back(sync_update.he);
}
delta_gain_cache.deltaGainUpdate(deltaPhg, sync_update);
});
fm_strategy.applyMove(deltaPhg, delta_gain_cache, move, false);
}

if (moved) {
if (DispatchedFMStrategy::is_unconstrained && sharedData.unconstrained.isRebalancingNode(move.node)) {
runStats.rebalancing_node_moves++; // TODO: remove
}
runStats.moves++;
estimatedImprovement += move.gain;
localMoves.emplace_back(move, move_id);
Expand All @@ -224,17 +180,21 @@ namespace mt_kahypar {
&& fromWeight == heaviestPartWeight
&& toWeight + phg.nodeWeight(move.node) < heaviestPartWeight;
if (improved_km1 || improved_balance_less_equal_km1) {
// Apply move sequence to global partition
for (size_t i = 0; i < localMoves.size(); ++i) {
const Move& local_move = localMoves[i].first;
phg.changeNodePart(
gain_cache, local_move.node, local_move.from, local_move.to,
std::numeric_limits<HypernodeWeight>::max(),
[&] { sharedData.moveTracker.insertMove(local_move); },
[&](const SynchronizedEdgeUpdate& ) {});
}
localMoves.clear();
fm_strategy.flushLocalChanges();
stopRule.reset();
deltaPhg.clear(); // clear hashtables, save memory :)
delta_gain_cache.clear();
bestImprovement = estimatedImprovement;
bestImprovementIndex = localMoves.size();

if constexpr (use_delta) {
applyBestLocalPrefixToSharedPartition(phg, fm_strategy, bestImprovementIndex);
bestImprovementIndex = 0;
localMoves.clear();
deltaPhg.clear(); // clear hashtables, save memory :)
delta_gain_cache.clear();
}
}

// no need to update our PQs if we stop anyways
Expand All @@ -243,20 +203,8 @@ namespace mt_kahypar {
break;
}

if constexpr (use_delta) {
acquireOrUpdateNeighbors<has_fixed_vertices>(deltaPhg, delta_gain_cache, move, fm_strategy);
} else {
acquireOrUpdateNeighbors<has_fixed_vertices>(phg, gain_cache, move, fm_strategy);
}
acquireOrUpdateNeighbors<has_fixed_vertices>(deltaPhg, delta_gain_cache, move, fm_strategy);
}

}

if constexpr (use_delta) {
// in this case there is no improved local prefix to apply (was already applied in the loop)
ASSERT(bestImprovementIndex == 0);
} else {
revertToBestLocalPrefix(phg, fm_strategy, bestImprovementIndex);
}

fm_strategy.reset();
Expand All @@ -265,68 +213,6 @@ namespace mt_kahypar {
}


// Replays the first best_index_locally_observed entries of localMoves on the global
// partition phg, registers each one in the shared move tracker, and finally flushes the
// local changes of the FM strategy.
//
// phg                          global partitioned hypergraph the moves are applied to
// fm_strategy                  strategy whose local changes are flushed afterwards
// best_index_locally_observed  number of local moves to apply; must equal localMoves.size()
template<typename TypeTraits, typename GainTypes>
template<typename DispatchedFMStrategy>
void LocalizedKWayFM<TypeTraits, GainTypes>::applyBestLocalPrefixToSharedPartition(
    PartitionedHypergraph& phg,
    DispatchedFMStrategy& fm_strategy,
    const size_t best_index_locally_observed) {
  // fm_strategy.flushLocalChanges() at the end would be incorrect if only a part of the
  // local move sequence were applied.
  ASSERT(best_index_locally_observed == localMoves.size());

  // Position of the move that is currently being applied; read by the callback below.
  size_t pos = 0;

  // The final move of the sequence is not applied on the thread-local partition, so the nets
  // whose pins may change their gain are not collected there. Collect them here instead, and
  // only for that final move. This works because edgesWithGainChanges is cleared before the
  // move and the neighbor expansion runs after this function returns.
  auto collect_nets_of_final_move = [&](const SynchronizedEdgeUpdate& sync_update) {
    const bool applying_final_move = (pos + 1 == best_index_locally_observed);
    if (applying_final_move && GainCache::triggersDeltaGainUpdate(sync_update)) {
      edgesWithGainChanges.push_back(sync_update.he);
    }
  };

  // Apply the move sequence to the original hypergraph, in order.
  for (pos = 0; pos < best_index_locally_observed; ++pos) {
    ASSERT(pos < localMoves.size());
    auto& entry = localMoves[pos];
    Move& local_move = entry.first;
    MoveID& move_id = entry.second;

    // No weight limit is enforced here; the success callback stores the ID handed out by
    // the move tracker next to the local move.
    phg.changeNodePart(
        gain_cache, local_move.node, local_move.from, local_move.to,
        std::numeric_limits<HypernodeWeight>::max(),
        [&] { move_id = sharedData.moveTracker.insertMove(local_move); },
        collect_nets_of_final_move);
    ASSERT(move_id != std::numeric_limits<MoveID>::max());
  }

  fm_strategy.flushLocalChanges();
}

// Undoes, newest first, every entry of localMoves beyond position bestGainIndex on the
// global partition phg, so that only the first bestGainIndex local moves remain applied.
//
// phg            global partitioned hypergraph on which the moves were performed
// fm_strategy    strategy that is notified about every reverted move
// bestGainIndex  number of leading local moves to keep
template<typename TypeTraits, typename GainTypes>
template<typename DispatchedFMStrategy>
void LocalizedKWayFM<TypeTraits, GainTypes>::revertToBestLocalPrefix(PartitionedHypergraph& phg,
                                                                     DispatchedFMStrategy& fm_strategy,
                                                                     size_t bestGainIndex) {
  // Record how many moves are rolled back before the list shrinks.
  runStats.local_reverts += localMoves.size() - bestGainIndex;

  for (; localMoves.size() > bestGainIndex; localMoves.pop_back()) {
    const MoveID tracked_id = localMoves.back().second;
    Move& undone = sharedData.moveTracker.getMove(tracked_id);

    // Move the node back from its target block to its source block.
    phg.changeNodePart(gain_cache, undone.node, undone.to, undone.from);
    fm_strategy.revertMove(phg, gain_cache, undone, true);

    // Mark the entry in the shared move tracker as invalid.
    undone.invalidate();
  }
}

template<typename TypeTraits, typename GainTypes>
void LocalizedKWayFM<TypeTraits, GainTypes>::changeNumberOfBlocks(const PartitionID new_k) {
deltaPhg.changeNumberOfBlocks(new_k);
Expand Down
15 changes: 2 additions & 13 deletions mt-kahypar/partition/refinement/fm/localized_kway_fm_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,25 +88,14 @@ class LocalizedKWayFM {
FMStats stats;

private:
template<bool use_delta, bool has_fixed_vertices, typename DispatchedFMStrategy>
template<bool has_fixed_vertices, typename DispatchedFMStrategy>
void internalFindMoves(PartitionedHypergraph& phg, DispatchedFMStrategy& fm_strategy);

template<bool has_fixed_vertices, typename PHG, typename CACHE, typename DispatchedFMStrategy>
MT_KAHYPAR_ATTRIBUTE_ALWAYS_INLINE
void acquireOrUpdateNeighbors(PHG& phg, CACHE& gain_cache, const Move& move, DispatchedFMStrategy& fm_strategy);


// ! Makes moves applied on delta hypergraph visible on the global partitioned hypergraph.
template<typename DispatchedFMStrategy>
void applyBestLocalPrefixToSharedPartition(PartitionedHypergraph& phg,
DispatchedFMStrategy& fm_strategy,
const size_t best_index_locally_observed);

// ! Rollback to the best improvement found during local search in case we applied moves
// ! directly on the global partitioned hypergraph.
template<typename DispatchedFMStrategy>
void revertToBestLocalPrefix(PartitionedHypergraph& phg, DispatchedFMStrategy& fm_strategy, size_t bestGainIndex);

private:

const Context& context;
Expand Down Expand Up @@ -147,4 +136,4 @@ class LocalizedKWayFM {
vec<VertexPriorityQueue> vertexPQs;
};

}
}

0 comments on commit 5b9bcfb

Please sign in to comment.