Merge branch 'shm/feat/mem-efficient2'

DanielSeemaier committed Jun 3, 2024
2 parents 84dad79 + 0170783 commit b87522a
Showing 13 changed files with 283 additions and 93 deletions.
5 changes: 3 additions & 2 deletions apps/KaMinPar.cc
@@ -192,7 +192,8 @@ int main(int argc, char *argv[]) {
std::exit(0);
}

-if (ctx.compression.enabled && ctx.node_ordering == NodeOrdering::DEGREE_BUCKETS) {
+if (ctx.compression.enabled && app.graph_file_format == io::GraphFileFormat::METIS &&
+    ctx.node_ordering == NodeOrdering::DEGREE_BUCKETS) {
std::cout << "The nodes of the compressed graph cannot be rearranged by degree buckets!"
<< std::endl;
std::exit(0);
@@ -236,7 +237,7 @@ int main(int argc, char *argv[]) {
app.graph_file_format,
ctx.compression.enabled,
ctx.compression.may_dismiss,
-ctx.node_ordering == NodeOrdering::IMPLICIT_DEGREE_BUCKETS
+ctx.node_ordering
);
};

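Note on this change: io::read now receives the full node-ordering policy instead of the previous boolean "implicit degree buckets" flag. A minimal sketch of the enum this implies — value names inferred from the uses in this diff and from the CLI help text below, not copied from the library (the actual definition lives in kaminpar-shm/kaminpar.h):

enum class NodeOrdering {
  NATURAL,                 // keep the node order of the input graph
  DEGREE_BUCKETS,          // sort and rearrange nodes by degree bucket while reading
  IMPLICIT_DEGREE_BUCKETS, // input graph is already stored in degree-bucket order
};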
55 changes: 46 additions & 9 deletions apps/benchmarks/shm_input_benchmark.cc
@@ -12,6 +12,7 @@
#include <tbb/global_control.h>

#include "kaminpar-shm/context_io.h"
#include "kaminpar-shm/datastructures/compressed_graph_builder.h"

#include "kaminpar-common/console_io.h"
#include "kaminpar-common/logger.h"
@@ -31,6 +32,7 @@ int main(int argc, char *argv[]) {
// Parse CLI arguments
std::string graph_filename;
io::GraphFileFormat graph_file_format = io::GraphFileFormat::METIS;
+bool compress_in_memory = false;
int seed = 0;

CLI::App app("Shared-memory input benchmark");
@@ -41,9 +43,22 @@
- metis
- parhip)")
->capture_default_str();
app.add_option("--node-order", ctx.node_ordering)
->transform(CLI::CheckedTransformer(get_node_orderings()).description(""))
->description(R"(Criteria by which the nodes of the graph are sorted and rearranged:
- natural: keep node order of the graph (do not rearrange)
- deg-buckets: sort nodes by degree bucket and rearrange accordingly
- implicit-deg-buckets: nodes of the input graph are sorted by deg-buckets order)")
->capture_default_str();
app.add_flag(
"--compress-in-memory",
compress_in_memory,
"Whether to compress the input graph in memory when graph compression is enabled"
)
->capture_default_str();
app.add_option("-t,--threads", ctx.parallel.num_threads, "Number of threads")
->capture_default_str();
app.add_option("-s,--seed", seed, "Seed for random number generation.")->capture_default_str();
app.add_option("-s,--seed", seed, "Seed for random number generation")->capture_default_str();
app.add_option("-k,--k", ctx.partition.k);
app.add_option("-e,--epsilon", ctx.partition.epsilon);
create_graph_compression_options(&app, ctx);
@@ -59,14 +74,36 @@ int main(int argc, char *argv[]) {
SCOPED_HEAP_PROFILER("Read Input Graph");
SCOPED_TIMER("Read Input Graph");

-Graph graph = io::read(
-    graph_filename,
-    graph_file_format,
-    ctx.compression.enabled,
-    ctx.compression.may_dismiss,
-    false
-);
-ctx.setup(graph);
+if (ctx.compression.enabled && compress_in_memory) {
+  CSRGraph csr_graph = TIMED_SCOPE("Read CSR Graph") {
+    SCOPED_HEAP_PROFILER("Read CSR Graph");
+    return io::csr_read(graph_filename, graph_file_format, false);
+  };
+
+  SCOPED_TIMER("Compress CSR Graph");
+  SCOPED_HEAP_PROFILER("Compress CSR Graph");
+
+  const bool sequential_compression = ctx.parallel.num_threads <= 1;
+  if (sequential_compression) {
+    Graph graph =
+        Graph(std::make_unique<CompressedGraph>(CompressedGraphBuilder::compress(csr_graph)));
+    ctx.setup(graph);
+  } else {
+    Graph graph = Graph(
+        std::make_unique<CompressedGraph>(ParallelCompressedGraphBuilder::compress(csr_graph))
+    );
+    ctx.setup(graph);
+  }
+} else {
+  Graph graph = io::read(
+      graph_filename,
+      graph_file_format,
+      ctx.compression.enabled,
+      ctx.compression.may_dismiss,
+      ctx.node_ordering
+  );
+  ctx.setup(graph);
+}
}

DISABLE_HEAP_PROFILER();
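Note: with the new --compress-in-memory flag, the benchmark first reads a plain CSRGraph and only then compresses it, so compression cost can be measured separately from parsing. A minimal sketch of the dispatch, assuming only the builder API visible in the diff above (both compress() functions take a CSRGraph and return a CompressedGraph):

// Sketch: choose the sequential builder for one thread, the parallel one otherwise.
Graph compress_in_memory_sketch(const CSRGraph &csr_graph, const int num_threads) {
  if (num_threads <= 1) {
    // Sequential builder: avoids parallelization overhead on a single thread.
    return Graph(std::make_unique<CompressedGraph>(CompressedGraphBuilder::compress(csr_graph)));
  }

  // Parallel builder: compresses chunks of neighbourhoods concurrently.
  return Graph(
      std::make_unique<CompressedGraph>(ParallelCompressedGraphBuilder::compress(csr_graph))
  );
}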
2 changes: 1 addition & 1 deletion apps/benchmarks/shm_label_propagation_benchmark.cc
@@ -65,7 +65,7 @@ int main(int argc, char *argv[]) {
graph_file_format,
ctx.compression.enabled,
ctx.compression.may_dismiss,
-ctx.node_ordering == NodeOrdering::IMPLICIT_DEGREE_BUCKETS
+ctx.node_ordering
);
ctx.setup(graph);

151 changes: 121 additions & 30 deletions apps/io/parhip_parser.cc
@@ -20,9 +20,12 @@
#include <unistd.h>

#include "kaminpar-shm/datastructures/compressed_graph_builder.h"
#include "kaminpar-shm/kaminpar.h"

#include "kaminpar-common/datastructures/concurrent_circular_vector.h"
#include "kaminpar-common/datastructures/static_array.h"
#include "kaminpar-common/logger.h"
#include "kaminpar-common/parallel/loops.h"
#include "kaminpar-common/timer.h"

namespace kaminpar::shm::io::parhip {
@@ -357,9 +360,66 @@ void print_stats(const auto &stats_ets) {
}

} // namespace debug

+std::pair<StaticArray<NodeID>, StaticArray<NodeID>>
+sort_by_degree_buckets(const NodeID n, const StaticArray<NodeID> &degrees) {
+  auto find_bucket = [&](const NodeID deg) {
+    return deg == 0 ? (kNumberOfDegreeBuckets<NodeID> - 1) : degree_bucket(deg);
+  };
+
+  const int cpus = std::min<int>(tbb::this_task_arena::max_concurrency(), n);
+  RECORD("permutation") StaticArray<NodeID> permutation(n, static_array::noinit);
+  RECORD("inverse_permutation") StaticArray<NodeID> inverse_permutation(n, static_array::noinit);
+
+  using Buckets = std::array<NodeID, kNumberOfDegreeBuckets<NodeID> + 1>;
+  std::vector<Buckets, tbb::cache_aligned_allocator<Buckets>> local_buckets(cpus + 1);
+
+  parallel::deterministic_for<NodeID>(0, n, [&](const NodeID from, const NodeID to, const int cpu) {
+    KASSERT(cpu < cpus);
+
+    for (NodeID u = from; u < to; ++u) {
+      const auto bucket = find_bucket(degrees[u]);
+      permutation[u] = local_buckets[cpu + 1][bucket]++;
+    }
+  });
+
+  // Build a table of prefix sums to correct the position of each node in the
+  // final permutation. After the previous loop, permutation[u] contains the
+  // position of u within its thread-local bucket. To obtain the global position,
+  // (i) account for smaller buckets by adding the prefix computed in
+  // global_buckets, and (ii) account for the same bucket on smaller processor
+  // IDs by adding the prefix computed in local_buckets.
+  Buckets global_buckets{};
+  for (int id = 1; id < cpus + 1; ++id) {
+    for (std::size_t i = 0; i + 1 < global_buckets.size(); ++i) {
+      global_buckets[i + 1] += local_buckets[id][i];
+    }
+  }
+  parallel::prefix_sum(global_buckets.begin(), global_buckets.end(), global_buckets.begin());
+  for (std::size_t i = 0; i < global_buckets.size(); ++i) {
+    for (int id = 0; id + 1 < cpus; ++id) {
+      local_buckets[id + 1][i] += local_buckets[id][i];
+    }
+  }
+
+  // Apply the offsets to obtain the global permutation.
+  parallel::deterministic_for<NodeID>(0, n, [&](const NodeID from, const NodeID to, const int cpu) {
+    KASSERT(cpu < cpus);
+
+    for (NodeID u = from; u < to; ++u) {
+      const NodeID bucket = find_bucket(degrees[u]);
+      permutation[u] += global_buckets[bucket] + local_buckets[cpu][bucket];
+    }
+  });
+
+  // Compute the inverse permutation.
+  tbb::parallel_for<NodeID>(0, n, [&](const NodeID u) { inverse_permutation[permutation[u]] = u; });
+
+  return {std::move(permutation), std::move(inverse_permutation)};
+}

} // namespace

-CompressedGraph compressed_read_parallel(const std::string &filename, const bool sorted) {
+CompressedGraph compressed_read_parallel(const std::string &filename, const NodeOrdering ordering) {
try {
BinaryReader reader(filename);

@@ -392,6 +452,28 @@ CompressedGraph compressed_read_parallel(const std::string &filename, const bool
const auto map_edge_offset = [&](const NodeID node) {
return (nodes[node] - nodes_offset_base) / sizeof(NodeID);
};
+const auto fetch_degree = [&](const NodeID node) {
+  return static_cast<NodeID>((nodes[node + 1] - nodes[node]) / sizeof(NodeID));
+};
+
+RECORD("degrees") StaticArray<NodeID> degrees(num_nodes, static_array::noinit);
+TIMED_SCOPE("Read degrees") {
+  tbb::parallel_for(tbb::blocked_range<NodeID>(0, num_nodes), [&](const auto &r) {
+    for (NodeID u = r.begin(); u != r.end(); ++u) {
+      degrees[u] = fetch_degree(u);
+    }
+  });
+};
+
+const bool sort_by_degree_bucket = ordering == NodeOrdering::DEGREE_BUCKETS;
+StaticArray<NodeID> permutation;
+StaticArray<NodeID> inverse_permutation;
+if (sort_by_degree_bucket) {
+  SCOPED_TIMER("Compute permutation");
+  auto [perm, inv_perm] = sort_by_degree_buckets(num_nodes, degrees);
+  permutation = std::move(perm);
+  inverse_permutation = std::move(inv_perm);
+}

// To compress the graph in parallel the nodes are split into chunks. Each parallel task fetches
// a chunk and compresses the neighbourhoods of the corresponding nodes. The compressed
@@ -400,38 +482,46 @@ CompressedGraph compressed_read_parallel(const std::string &filename, const bool
// chunks is determined.
constexpr std::size_t kNumChunks = 5000;
const EdgeID max_chunk_size = num_edges / kNumChunks;
-std::vector<std::pair<NodeID, NodeID>> chunks;
+std::vector<std::tuple<NodeID, NodeID, EdgeID>> chunks;

NodeID max_degree = 0;
TIMED_SCOPE("Compute chunks") {
NodeID cur_chunk_start = 0;
EdgeID cur_chunk_size = 0;
-for (NodeID node = 0; node < num_nodes; ++node) {
-  const auto degree = static_cast<NodeID>((nodes[node + 1] - nodes[node]) / sizeof(NodeID));
-  if (degree > max_degree) {
-    max_degree = degree;
-  }
+EdgeID cur_first_edge = 0;
+for (NodeID i = 0; i < num_nodes; ++i) {
+  NodeID node = sort_by_degree_bucket ? inverse_permutation[i] : i;
+
+  const NodeID degree = degrees[node];
+  max_degree = std::max(max_degree, degree);

   cur_chunk_size += degree;
   if (cur_chunk_size >= max_chunk_size) {
-    if (cur_chunk_start == node) {
-      chunks.emplace_back(node, node + 1);
-      cur_chunk_start = node + 1;
+    if (cur_chunk_start == i) {
+      chunks.emplace_back(cur_chunk_start, i + 1, cur_first_edge);
+
+      cur_chunk_start = i + 1;
+      cur_first_edge += degree;
+      cur_chunk_size = 0;
     } else {
-      chunks.emplace_back(cur_chunk_start, node);
-      cur_chunk_start = node;
+      chunks.emplace_back(cur_chunk_start, i, cur_first_edge);
+
+      cur_chunk_start = i;
+      cur_first_edge += cur_chunk_size - degree;
+      cur_chunk_size = degree;
     }
-    cur_chunk_size = 0;
   }
 }

if (cur_chunk_start != num_nodes) {
-  chunks.emplace_back(cur_chunk_start, num_nodes);
+  chunks.emplace_back(cur_chunk_start, num_nodes, cur_first_edge);
}
};

+degrees.free();

// Initializes the data structures used to build the compressed graph in parallel.
+const bool sorted = ordering == NodeOrdering::IMPLICIT_DEGREE_BUCKETS || sort_by_degree_bucket;
ParallelCompressedGraphBuilder builder(
header.num_nodes, header.num_edges, header.has_node_weights, header.has_edge_weights, sorted
);
@@ -457,28 +547,29 @@ CompressedGraph compressed_read_parallel(const std::string &filename, const bool
CompressedEdgesBuilder &neighbourhood_builder = neighbourhood_builder_ets.local();

const NodeID chunk = buffer.next();
-const auto [start_node, end_node] = chunks[chunk];
-
-EdgeID edge = map_edge_offset(start_node);
-neighbourhood_builder.init(edge);
+const auto [start, end, first_edge] = chunks[chunk];
+
+NodeWeight local_node_weight = 0;
+neighbourhood_builder.init(first_edge);

// Compress the neighborhoods of the nodes in the fetched chunk.
debug::scoped_time(dbg.compression_time, [&] {
-  for (NodeID node = start_node; node < end_node; ++node) {
-    const auto degree = static_cast<NodeID>((nodes[node + 1] - nodes[node]) / sizeof(NodeID));
+  for (NodeID i = start; i < end; ++i) {
+    const NodeID node = sort_by_degree_bucket ? inverse_permutation[i] : i;
+    const NodeID degree = fetch_degree(node);
IF_DBG dbg.num_edges += degree;

-    for (NodeID i = 0; i < degree; ++i) {
-      const NodeID adjacent_node = edges[edge];
+    EdgeID edge = map_edge_offset(node);
+    for (NodeID j = 0; j < degree; ++j) {
+      const NodeID adjacent_node =
+          sort_by_degree_bucket ? permutation[edges[edge]] : edges[edge];
const EdgeWeight edge_weight = header.has_edge_weights ? edge_weights[edge] : 1;

neighbourhood.emplace_back(adjacent_node, edge_weight);
edge += 1;
}

-    const EdgeID local_offset = neighbourhood_builder.add(node, neighbourhood);
+    const EdgeID local_offset = neighbourhood_builder.add(i, neighbourhood);
offsets.push_back(local_offset);

neighbourhood.clear();
@@ -494,18 +585,18 @@ CompressedGraph compressed_read_parallel(const std::string &filename, const bool
// Store the edge offset and node weight for each node in the chunk and copy the compressed
// neighborhoods into the actual compressed edge array.
debug::scoped_time(dbg.copy_time, [&] {
-  NodeID node = start_node;
-  for (EdgeID local_offset : offsets) {
-    builder.add_node(node, offset + local_offset);
+  for (NodeID i = start; i < end; ++i) {
+    const EdgeID local_offset = offsets[i - start];
+
+    builder.add_node(i, offset + local_offset);

if (header.has_node_weights) {
+      const NodeID node = sort_by_degree_bucket ? inverse_permutation[i] : i;
const NodeWeight node_weight = node_weights[node];
local_node_weight += node_weight;

-      builder.add_node_weight(node, node_weight);
+      builder.add_node_weight(i, node_weight);
}

-    node += 1;
}
offsets.clear();

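Note on sort_by_degree_buckets above: it is a two-pass counting sort. Pass one counts, per CPU, how many nodes fall into each degree bucket; a prefix sum over buckets (global_buckets) and one over CPUs (local_buckets) turn the counts into starting offsets; pass two adds both offsets to each node's thread-local rank. A sequential reference version, written here only to illustrate the arithmetic — the bucket parameter is a stand-in for the library's degree_bucket(), not its actual implementation:

#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

using NodeID = std::uint32_t;

// Sequential sketch of the permutation computed by sort_by_degree_buckets.
std::pair<std::vector<NodeID>, std::vector<NodeID>> sort_by_degree_buckets_seq(
    const std::vector<NodeID> &degrees,
    const int num_buckets,
    const std::function<int(NodeID)> &bucket // monotone map: degree -> bucket index
) {
  const NodeID n = static_cast<NodeID>(degrees.size());

  // Pass 1: count the nodes per bucket (the parallel version keeps one
  // counter array per CPU in local_buckets).
  std::vector<NodeID> offsets(num_buckets + 1, 0);
  for (NodeID u = 0; u < n; ++u) {
    ++offsets[bucket(degrees[u]) + 1];
  }

  // Prefix sum: offsets[b] becomes the first position of bucket b (the role
  // of global_buckets in the parallel version).
  for (int b = 1; b <= num_buckets; ++b) {
    offsets[b] += offsets[b - 1];
  }

  // Pass 2: place each node behind all nodes of smaller buckets; the parallel
  // version additionally adds the counts of the same bucket on smaller CPU IDs.
  std::vector<NodeID> permutation(n);
  std::vector<NodeID> inverse_permutation(n);
  for (NodeID u = 0; u < n; ++u) {
    permutation[u] = offsets[bucket(degrees[u])]++;
  }
  for (NodeID u = 0; u < n; ++u) {
    inverse_permutation[permutation[u]] = u;
  }

  return {std::move(permutation), std::move(inverse_permutation)};
}

For example, degrees {1, 4, 1, 0} with bucket values {0, 2, 0, 3} yield permutation {0, 2, 1, 3}: the two degree-1 nodes keep their relative order at the front, the degree-4 node follows, and the degree-0 node goes last (mirroring find_bucket() above, which routes degree-0 nodes to the final bucket).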
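Similarly, the chunks in compressed_read_parallel now carry their first global edge offset as a third tuple element. The old code recovered a chunk's first edge via map_edge_offset(start_node), which only works while node IDs coincide with file order; once nodes are permuted by degree bucket, consecutive new IDs map to scattered file offsets, so the offset must be accumulated while the chunks are built. A small sketch of the invariant this maintains, assuming the same (start, end, first_edge) layout as above:

#include <cassert>
#include <cstdint>
#include <tuple>
#include <vector>

using NodeID = std::uint32_t;
using EdgeID = std::uint64_t;

// Sketch: each chunk's first_edge must equal the total degree of all nodes
// (in the new, permuted order) that precede the chunk.
void check_chunks(
    const std::vector<std::tuple<NodeID, NodeID, EdgeID>> &chunks,
    const std::vector<NodeID> &degrees_in_new_order
) {
  NodeID expected_start = 0;
  EdgeID expected_first_edge = 0;

  for (const auto &[start, end, first_edge] : chunks) {
    assert(start == expected_start);
    assert(first_edge == expected_first_edge);

    for (NodeID i = start; i < end; ++i) {
      expected_first_edge += degrees_in_new_order[i];
    }
    expected_start = end;
  }
}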
3 changes: 2 additions & 1 deletion apps/io/parhip_parser.h
@@ -11,6 +11,7 @@

#include "kaminpar-shm/datastructures/compressed_graph.h"
#include "kaminpar-shm/datastructures/csr_graph.h"
#include "kaminpar-shm/kaminpar.h"

namespace kaminpar::shm::io::parhip {

@@ -39,6 +40,6 @@ CompressedGraph compressed_read(const std::string &filename, const bool sorted);
* @param sorted Whether the nodes of the graph to read are stored in degree-buckets order.
* @return The graph that is stored in the file.
*/
-CompressedGraph compressed_read_parallel(const std::string &filename, const bool sorted);
+CompressedGraph compressed_read_parallel(const std::string &filename, const NodeOrdering ordering);

} // namespace kaminpar::shm::io::parhip
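A short usage sketch of the updated entry point (call shape taken from the signature above; the filename is hypothetical and error handling is omitted):

// Read a ParHiP graph, rearranging its nodes by degree bucket while parsing.
CompressedGraph graph = kaminpar::shm::io::parhip::compressed_read_parallel(
    "graph.parhip", NodeOrdering::DEGREE_BUCKETS
);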