Skip to content

Commit

Permalink
Merge branch 'shm/feat/hugepages'
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielSeemaier committed Apr 29, 2024
2 parents 29c7187 + 6e9516d commit c7bc3dc
Show file tree
Hide file tree
Showing 11 changed files with 109 additions and 62 deletions.
9 changes: 9 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ option(KAMINPAR_ENABLE_STATISTICS "Generate and output detailed statistics." OFF
option(KAMINPAR_ENABLE_TIMERS "Measure running times. Must be set to 'OFF' if the library interface is used from multiple threads simulatinously." ON)
option(KAMINPAR_ENABLE_TIMER_BARRIERS "Add additional MPI_Barrier() instructions for more accurate time measurements." ON)

option(KAMINPAR_ENABLE_THP "Use transparent huge pages for large memory allocations." ON)

option(KAMINPAR_BUILD_WITH_ASAN "Enable address sanitizer." OFF)
option(KAMINPAR_BUILD_WITH_UBSAN "Enable undefined behaviour sanitizer." OFF)
option(KAMINPAR_BUILD_WITH_MTUNE_NATIVE "Build with -mtune=native." ON)
Expand Down Expand Up @@ -203,6 +205,13 @@ else ()
message(STATUS "Timer barriers: disabled")
endif ()

# Transparent huge pages are requested through madvise(MADV_HUGEPAGE), which is a Linux-specific
# advice value. Only forward the compile definition on Linux; on other systems fall back to
# regular allocations instead of breaking the build.
if (KAMINPAR_ENABLE_THP AND CMAKE_SYSTEM_NAME STREQUAL "Linux")
  list(APPEND KAMINPAR_DEFINITIONS "-DKAMINPAR_ENABLE_THP")
  message(STATUS "Huge pages: enabled")
elseif (KAMINPAR_ENABLE_THP)
  message(STATUS "Huge pages: disabled (transparent huge pages are only supported on Linux)")
else ()
  message(STATUS "Huge pages: disabled")
endif ()

message(STATUS "Graph compression summary:")

if (KAMINPAR_COMPRESSION_HIGH_DEGREE_ENCODING)
Expand Down
2 changes: 1 addition & 1 deletion apps/io/shm_compressed_graph_binary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ template <typename T>
static StaticArray<T> read_static_array(std::ifstream &in, const std::size_t size) {
T *ptr = static_cast<T *>(std::malloc(sizeof(T) * size));
in.read(reinterpret_cast<char *>(ptr), sizeof(T) * size);
return StaticArray<T>(ptr, size);
return StaticArray<T>(size, ptr);
}

CompressedGraph read(const std::string &filename) {
Expand Down
8 changes: 4 additions & 4 deletions apps/io/shm_io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,15 @@ template <bool checked> CSRGraph csr_read(const std::string &filename, const boo
store_node_weights = format.has_node_weights;
store_edge_weights = format.has_edge_weights;

nodes.resize(format.number_of_nodes + 1);
edges.resize(format.number_of_edges * 2);
nodes.resize(format.number_of_nodes + 1, static_array::noinit);
edges.resize(format.number_of_edges * 2, static_array::noinit);

if (store_node_weights) {
node_weights.resize(format.number_of_nodes);
node_weights.resize(format.number_of_nodes, static_array::noinit);
}

if (store_edge_weights) {
edge_weights.resize(format.number_of_edges * 2);
edge_weights.resize(format.number_of_edges * 2, static_array::noinit);
}
},
[&](const std::uint64_t weight) {
Expand Down
5 changes: 2 additions & 3 deletions kaminpar-common/datastructures/fast_reset_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#pragma once

#include <utility>
#include <vector>

#include "kaminpar-common/assert.h"
#include "kaminpar-common/datastructures/scalable_vector.h"
Expand Down Expand Up @@ -63,7 +62,7 @@ template <typename Value, typename Size = std::size_t> class FastResetArray {
return _data[pos] != Value();
}

[[nodiscard]] std::vector<size_type> &used_entry_ids() {
[[nodiscard]] scalable_vector<size_type> &used_entry_ids() {
return _used_entries;
}

Expand Down Expand Up @@ -118,7 +117,7 @@ template <typename Value, typename Size = std::size_t> class FastResetArray {

private:
scalable_vector<value_type> _data;
std::vector<size_type> _used_entries{};
scalable_vector<size_type> _used_entries{};

IF_HEAP_PROFILING(heap_profiler::DataStructure *_struct);
};
Expand Down
67 changes: 40 additions & 27 deletions kaminpar-common/datastructures/static_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
******************************************************************************/
#pragma once

#include <cstdlib>
#include <cstring>
#include <initializer_list>
#include <iterator>
#include <thread>
#include <type_traits>
#include <vector>

#include <tbb/parallel_for.h>
Expand All @@ -18,10 +20,23 @@
#include "kaminpar-common/heap_profiler.h"
#include "kaminpar-common/parallel/tbb_malloc.h"

//! Minimum number of array elements (the `size` passed to StaticArray::resize) from which an
//! allocation is considered for transparent huge pages. Parenthesized so that the macro expands
//! as a single value inside larger expressions (e.g. `x / KAMINPAR_THP_THRESHOLD`).
#define KAMINPAR_THP_THRESHOLD (1024 * 1024 * 64)

namespace kaminpar {
namespace static_array {
//! Tag for allocating memory, but not touching it. Without this tag, memory is initialized to the
//! default value for the given type.
struct noinit_t {};
inline constexpr noinit_t noinit{};

//! Tag for small memory allocations that should never be backed by a transparent huge page.
struct small_t {};
inline constexpr small_t small{};

//! Tag for initializing memory sequentially. Without this tag, memory will be initialized by a
//! parallel loop. Has no effect if noinit is also passed.
struct seq_t {};
inline constexpr seq_t seq{};
} // namespace static_array

template <typename T> class StaticArray {
Expand Down Expand Up @@ -128,27 +143,19 @@ template <typename T> class StaticArray {
using iterator = StaticArrayIterator;
using const_iterator = const StaticArrayIterator;

StaticArray(T *storage, const std::size_t size) : _size(size), _data(storage) {
RECORD_DATA_STRUCT(size * sizeof(T), _struct);
}

StaticArray(const std::size_t start, const std::size_t size, StaticArray &data)
: StaticArray(size, data._data + start) {
KASSERT(start + size <= data.size());
}

StaticArray(const std::size_t size, value_type *data) : _size(size), _data(data) {
RECORD_DATA_STRUCT(size * sizeof(T), _struct);
}

StaticArray(const std::size_t size, const value_type init_value = value_type()) {
template <typename... Tags>
StaticArray(const std::size_t size, const value_type init_value, Tags... tags) {
RECORD_DATA_STRUCT(0, _struct);
resize(size, init_value);
resize(size, init_value, std::forward<Tags>(tags)...);
}

StaticArray(const std::size_t size, static_array::noinit_t) {
template <typename... Tags> StaticArray(const std::size_t size, Tags... tags) {
RECORD_DATA_STRUCT(0, _struct);
resize(size, static_array::noinit);
resize(size, value_type(), std::forward<Tags>(tags)...);
}

template <typename Iterator>
Expand Down Expand Up @@ -283,26 +290,32 @@ template <typename T> class StaticArray {
return _size;
}

void resize(const std::size_t size, static_array::noinit_t) {
KASSERT(_data == _owned_data.get(), "cannot resize span", assert::always);
allocate_data(size);
//! Resizes the array to `size` elements, using a value-initialized `value_type()` as the initial
//! value. All `tags` are forwarded unchanged to the overload that takes an explicit initial value.
//!
//! NOTE(review): a second argument whose type is not exactly `value_type` (e.g. an `int` literal
//! for an unsigned `value_type`) appears to select this overload and would then be forwarded as a
//! tag rather than used as the initial value -- confirm against callers.
template <typename... Tags> void resize(const std::size_t size, Tags &&...tags) {
resize(size, value_type(), std::forward<Tags>(tags)...);
}

void resize(
const size_type size,
const value_type init_value = value_type(),
const bool assign_parallel = true
) {
resize(size, static_array::noinit);
assign(size, init_value, assign_parallel);
//! Allocates storage for `size` elements and, unless `static_array::noinit` is passed, assigns
//! `init_value` to every element.
//!
//! Recognized tags:
//!  - `static_array::noinit`: allocate only, do not assign any values.
//!  - `static_array::small`: never request transparent huge pages for this allocation.
//!  - `static_array::seq`: assign sequentially instead of with a parallel loop.
//!
//! Fails the always-on assertion "cannot resize span" if the array does not point to its own
//! storage.
template <typename... Tags>
void resize(const std::size_t size, const value_type init_value, Tags &&...tags) {
  KASSERT(_data == _owned_data.get(), "cannot resize span", assert::always);

  // `Tags` is deduced through forwarding references: passing the tag objects as lvalues (e.g.
  // `static_array::noinit`) deduces `const noinit_t &`, which would never compare equal to the
  // plain tag type. Strip references and cv-qualifiers before comparing so the tags take effect.
  constexpr bool kIsSmall = (std::is_same_v<static_array::small_t, std::decay_t<Tags>> || ...);
  constexpr bool kIsNoinit = (std::is_same_v<static_array::noinit_t, std::decay_t<Tags>> || ...);
  constexpr bool kIsSeq = (std::is_same_v<static_array::seq_t, std::decay_t<Tags>> || ...);

  const bool use_thp = size >= KAMINPAR_THP_THRESHOLD && !kIsSmall;
  allocate_data(size, use_thp);

  if constexpr (!kIsNoinit) {
    // Third argument selects parallel assignment; the seq tag turns it off.
    assign(size, init_value, !kIsSeq);
  }
}

void assign(const size_type count, const value_type value, const bool assign_parallel = true) {
KASSERT(_data);

if (assign_parallel) {
const std::size_t step{std::max(count / std::thread::hardware_concurrency(), 1UL)};
tbb::parallel_for(0UL, count, step, [&](const size_type i) {
const std::size_t step = std::max(count / std::thread::hardware_concurrency(), 1UL);
tbb::parallel_for<std::size_t>(0, count, step, [&](const size_type i) {
for (size_type j = i; j < std::min(i + step, count); ++j) {
_data[j] = value;
}
Expand All @@ -322,8 +335,8 @@ template <typename T> class StaticArray {
}

private:
void allocate_data(const std::size_t size) {
_owned_data = parallel::make_unique<value_type>(size);
void allocate_data(const std::size_t size, const bool thp) {
_owned_data = parallel::make_unique<value_type>(size, thp);
_data = _owned_data.get();
_size = size;
_unrestricted_size = _size;
Expand Down
19 changes: 17 additions & 2 deletions kaminpar-common/parallel/tbb_malloc.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
#include "kaminpar-common/assert.h"
#include "kaminpar-common/heap_profiler.h"

#ifdef KAMINPAR_ENABLE_THP
#include "sys/mman.h"
#endif // KAMINPAR_ENABLE_THP

namespace kaminpar::parallel {
template <typename T> struct tbb_deleter {
void operator()(T *p) {
Expand All @@ -27,9 +31,20 @@ template <typename T> struct tbb_deleter {
template <typename T> using tbb_unique_ptr = std::unique_ptr<T, tbb_deleter<T>>;
// template <typename T> using tbb_unique_ptr = std::unique_ptr<T>;

template <typename T> tbb_unique_ptr<T> make_unique(const std::size_t size) {
template <typename T> tbb_unique_ptr<T> make_unique(const std::size_t size, const bool thp) {
auto nbytes = sizeof(T) * size;
T *ptr = static_cast<T *>(scalable_malloc(nbytes));
T *ptr = nullptr;

#ifdef KAMINPAR_ENABLE_THP
if (thp) {
scalable_posix_memalign(reinterpret_cast<void **>(&ptr), 1 << 21, nbytes);
madvise(ptr, nbytes, MADV_HUGEPAGE);
} else {
#endif // KAMINPAR_ENABLE_THP
ptr = static_cast<T *>(scalable_malloc(nbytes));
#ifdef KAMINPAR_ENABLE_THP
}
#endif // KAMINPAR_ENABLE_THP

KASSERT(
ptr != nullptr, "out of memory: could not allocate " << nbytes << " bytes", assert::light
Expand Down
2 changes: 1 addition & 1 deletion kaminpar-shm/datastructures/compressed_graph.cc
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ CompressedGraph CompressedGraphBuilder::build() {
const std::size_t stored_bytes =
static_cast<std::size_t>(_cur_compressed_edges - _compressed_edges);
RECORD("compressed_edges")
StaticArray<std::uint8_t> compressed_edges(_compressed_edges, stored_bytes);
StaticArray<std::uint8_t> compressed_edges(stored_bytes, _compressed_edges);

if constexpr (kHeapProfiling) {
heap_profiler::HeapProfiler::global().record_alloc(_compressed_edges, stored_bytes);
Expand Down
24 changes: 14 additions & 10 deletions kaminpar-shm/graphutils/permutator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <cmath>

#include <tbb/enumerable_thread_specific.h>
#include <tbb/parallel_invoke.h>

#include "kaminpar-common/assert.h"
#include "kaminpar-common/heap_profiler.h"
Expand All @@ -27,14 +28,14 @@ NodePermutations<StaticArray> rearrange_graph(
StaticArray<EdgeWeight> &edge_weights
) {
START_HEAP_PROFILER("Temporal nodes and edges allocation");
START_TIMER("Allocation (noinit)");
RECORD("tmp_nodes") StaticArray<EdgeID> tmp_nodes(nodes.size(), static_array::noinit);
RECORD("tmp_edges") StaticArray<NodeID> tmp_edges(edges.size(), static_array::noinit);
RECORD("tmp_nodes")
StaticArray<EdgeID> tmp_nodes(nodes.size(), static_array::noinit);
RECORD("tmp_edges")
StaticArray<NodeID> tmp_edges(edges.size(), static_array::noinit);
RECORD("tmp_node_weights")
StaticArray<NodeWeight> tmp_node_weights(node_weights.size(), static_array::noinit);
RECORD("tmp_edge_weights")
StaticArray<EdgeWeight> tmp_edge_weights(edge_weights.size(), static_array::noinit);
STOP_TIMER();
STOP_HEAP_PROFILER();

// if we are about to remove all isolated nodes, we place them to the end of
Expand Down Expand Up @@ -64,10 +65,12 @@ NodePermutations<StaticArray> rearrange_graph(
STOP_HEAP_PROFILER();

START_TIMER("Deallocation");
tmp_nodes.free();
tmp_edges.free();
tmp_node_weights.free();
tmp_edge_weights.free();
tbb::parallel_invoke(
[&] { tmp_nodes.free(); },
[&] { tmp_edges.free(); },
[&] { tmp_node_weights.free(); },
[&] { tmp_edge_weights.free(); }
);
STOP_TIMER();

return permutations;
Expand Down Expand Up @@ -361,7 +364,9 @@ PartitionedGraph assign_isolated_nodes(
const NodeID num_nonisolated_nodes = graph.n() - num_isolated_nodes;

// The following call to graph.n() should include isolated nodes now
RECORD("partition") StaticArray<BlockID> partition(graph.n());
RECORD("partition")
StaticArray<BlockID> partition(graph.n(), static_array::noinit);

// copy partition of non-isolated nodes
tbb::parallel_for<NodeID>(0, num_nonisolated_nodes, [&](const NodeID u) {
partition[u] = p_graph.block(u);
Expand All @@ -383,5 +388,4 @@ PartitionedGraph assign_isolated_nodes(

return {graph, k, std::move(partition)};
}

} // namespace kaminpar::shm::graph
24 changes: 14 additions & 10 deletions kaminpar-shm/graphutils/subgraph_extractor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,17 +129,21 @@ SequentialSubgraphExtractionResult extract_subgraphs_sequential_generic_graph(
subgraph_positions[1].edges_start_pos = memory_position.edges_start_pos + m1;

auto create_graph = [&](const NodeID n0, const NodeID n, const EdgeID m0, const EdgeID m) {
StaticArray<EdgeID> s_nodes(memory_position.nodes_start_pos + n0, n + 1, subgraph_memory.nodes);
StaticArray<NodeID> s_edges(memory_position.edges_start_pos + m0, m, subgraph_memory.edges);
StaticArray<EdgeID> s_nodes(
n + 1, subgraph_memory.nodes.data() + memory_position.nodes_start_pos + n0
);
StaticArray<NodeID> s_edges(
m, subgraph_memory.edges.data() + memory_position.edges_start_pos + m0
);
StaticArray<NodeWeight> s_node_weights(
is_node_weighted * (memory_position.nodes_start_pos + n0),
is_node_weighted * n,
subgraph_memory.node_weights
subgraph_memory.node_weights.data() +
is_node_weighted * (memory_position.nodes_start_pos + n0)
);
StaticArray<EdgeWeight> s_edge_weights(
is_edge_weighted * (memory_position.edges_start_pos + m0),
is_edge_weighted * m,
subgraph_memory.edge_weights
subgraph_memory.edge_weights.data() +
is_edge_weighted * (memory_position.edges_start_pos + m0)
);
return shm::Graph(std::make_unique<CSRGraph>(
CSRGraph::seq{},
Expand Down Expand Up @@ -291,13 +295,13 @@ SubgraphExtractionResult extract_subgraphs_generic_graph(
start_positions[b + 1].nodes_start_pos - n0 - compute_final_k(b, p_graph.k(), input_k);
const EdgeID m = start_positions[b + 1].edges_start_pos - m0;

StaticArray<EdgeID> nodes(n0, n + 1, subgraph_memory.nodes);
StaticArray<NodeID> edges(m0, m, subgraph_memory.edges);
StaticArray<EdgeID> nodes(n + 1, subgraph_memory.nodes.data() + n0);
StaticArray<NodeID> edges(m, subgraph_memory.edges.data() + m0);
StaticArray<NodeWeight> node_weights(
is_node_weighted * n0, is_node_weighted * n, subgraph_memory.node_weights
is_node_weighted * n, subgraph_memory.node_weights.data() + is_node_weighted * n0
);
StaticArray<EdgeWeight> edge_weights(
is_edge_weighted * m0, is_edge_weighted * m, subgraph_memory.edge_weights
is_edge_weighted * m, subgraph_memory.edge_weights.data() + is_edge_weighted * m0
);
subgraphs[b] = shm::Graph(std::make_unique<CSRGraph>(
std::move(nodes), std::move(edges), std::move(node_weights), std::move(edge_weights)
Expand Down
8 changes: 4 additions & 4 deletions kaminpar-shm/kaminpar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,14 @@ void KaMinPar::borrow_and_mutate_graph(

const EdgeID m = xadj[n];

RECORD("nodes") StaticArray<EdgeID> nodes(xadj, n + 1);
RECORD("edges") StaticArray<NodeID> edges(adjncy, m);
RECORD("nodes") StaticArray<EdgeID> nodes(n + 1, xadj);
RECORD("edges") StaticArray<NodeID> edges(m, adjncy);
RECORD("node_weights")
StaticArray<NodeWeight> node_weights =
(vwgt == nullptr) ? StaticArray<NodeWeight>(0) : StaticArray<NodeWeight>(vwgt, n);
(vwgt == nullptr) ? StaticArray<NodeWeight>(0) : StaticArray<NodeWeight>(n, vwgt);
RECORD("edge_weights")
StaticArray<EdgeWeight> edge_weights =
(adjwgt == nullptr) ? StaticArray<EdgeWeight>(0) : StaticArray<EdgeWeight>(adjwgt, m);
(adjwgt == nullptr) ? StaticArray<EdgeWeight>(0) : StaticArray<EdgeWeight>(m, adjwgt);

_was_rearranged = false;
_graph_ptr = std::make_unique<Graph>(std::make_unique<CSRGraph>(
Expand Down
3 changes: 3 additions & 0 deletions kaminpar-shm/label_propagation.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
#include <tbb/parallel_for.h>
#include <tbb/parallel_invoke.h>

#include "kaminpar-shm/kaminpar.h"

#include "kaminpar-common/assert.h"
#include "kaminpar-common/datastructures/concurrent_fast_reset_array.h"
#include "kaminpar-common/datastructures/concurrent_two_level_vector.h"
#include "kaminpar-common/datastructures/dynamic_map.h"
#include "kaminpar-common/datastructures/rating_map.h"
#include "kaminpar-common/heap_profiler.h"
#include "kaminpar-common/logger.h"
#include "kaminpar-common/parallel/algorithm.h"
#include "kaminpar-common/parallel/atomic.h"
#include "kaminpar-common/random.h"
#include "kaminpar-common/tags.h"
Expand Down

0 comments on commit c7bc3dc

Please sign in to comment.