Skip to content

Commit

Permalink
feat(compressed-graph): enable deg-bucket reordering of the compresse…
Browse files Browse the repository at this point in the history
…d graph while reading
  • Loading branch information
dsalwasser committed Jun 1, 2024
1 parent 506a66a commit a03b1ee
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 41 deletions.
5 changes: 3 additions & 2 deletions apps/KaMinPar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ int main(int argc, char *argv[]) {
std::exit(0);
}

if (ctx.compression.enabled && ctx.node_ordering == NodeOrdering::DEGREE_BUCKETS) {
if (ctx.compression.enabled && app.graph_file_format == io::GraphFileFormat::METIS &&
ctx.node_ordering == NodeOrdering::DEGREE_BUCKETS) {
std::cout << "The nodes of the compressed graph cannot be rearranged by degree buckets!"
<< std::endl;
std::exit(0);
Expand Down Expand Up @@ -236,7 +237,7 @@ int main(int argc, char *argv[]) {
app.graph_file_format,
ctx.compression.enabled,
ctx.compression.may_dismiss,
ctx.node_ordering == NodeOrdering::IMPLICIT_DEGREE_BUCKETS
ctx.node_ordering
);
};

Expand Down
9 changes: 8 additions & 1 deletion apps/benchmarks/shm_input_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ int main(int argc, char *argv[]) {
- metis
- parhip)")
->capture_default_str();
app.add_option("--node-order", ctx.node_ordering)
->transform(CLI::CheckedTransformer(get_node_orderings()).description(""))
->description(R"(Criteria by which the nodes of the graph are sorted and rearranged:
- natural: keep node order of the graph (do not rearrange)
- deg-buckets: sort nodes by degree bucket and rearrange accordingly
- implicit-deg-buckets: nodes of the input graph are sorted by deg-buckets order)")
->capture_default_str();
app.add_flag(
"--compress-in-memory",
compress_in_memory,
Expand Down Expand Up @@ -93,7 +100,7 @@ int main(int argc, char *argv[]) {
graph_file_format,
ctx.compression.enabled,
ctx.compression.may_dismiss,
false
ctx.node_ordering
);
ctx.setup(graph);
}
Expand Down
2 changes: 1 addition & 1 deletion apps/benchmarks/shm_label_propagation_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ int main(int argc, char *argv[]) {
graph_file_format,
ctx.compression.enabled,
ctx.compression.may_dismiss,
ctx.node_ordering == NodeOrdering::IMPLICIT_DEGREE_BUCKETS
ctx.node_ordering
);
ctx.setup(graph);

Expand Down
151 changes: 121 additions & 30 deletions apps/io/parhip_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
#include <unistd.h>

#include "kaminpar-shm/datastructures/compressed_graph_builder.h"
#include "kaminpar-shm/kaminpar.h"

#include "kaminpar-common/datastructures/concurrent_circular_vector.h"
#include "kaminpar-common/datastructures/static_array.h"
#include "kaminpar-common/logger.h"
#include "kaminpar-common/parallel/loops.h"
#include "kaminpar-common/timer.h"

namespace kaminpar::shm::io::parhip {
Expand Down Expand Up @@ -357,9 +360,66 @@ void print_stats(const auto &stats_ets) {
}

} // namespace debug

// Computes a permutation that rearranges the nodes [0, n) into degree-bucket
// order, using a parallel counting sort over per-thread bucket counters.
// Returns {permutation, inverse_permutation} where permutation[u] is the new
// position of node u and inverse_permutation[p] is the node placed at
// position p. Isolated nodes (degree 0) are assigned to the last bucket.
std::pair<StaticArray<NodeID>, StaticArray<NodeID>>
sort_by_degree_buckets(const NodeID n, const StaticArray<NodeID> &degrees) {
  // Map a degree to its bucket index; degree 0 goes to the last bucket.
  auto find_bucket = [&](const NodeID deg) {
    return deg == 0 ? (kNumberOfDegreeBuckets<NodeID> - 1) : degree_bucket(deg);
  };

  // Never use more workers than nodes; each worker gets its own counter row.
  const int cpus = std::min<int>(tbb::this_task_arena::max_concurrency(), n);
  RECORD("permutation") StaticArray<NodeID> permutation(n, static_array::noinit);
  RECORD("inverse_permutation") StaticArray<NodeID> inverse_permutation(n, static_array::noinit);

  // local_buckets[cpu + 1][b] counts the nodes of bucket b seen by worker
  // `cpu`; row 0 stays zero so the later per-CPU prefix pass can read
  // local_buckets[cpu] as "counts of all workers with a smaller ID".
  // Cache-aligned rows avoid false sharing between workers.
  using Buckets = std::array<NodeID, kNumberOfDegreeBuckets<NodeID> + 1>;
  std::vector<Buckets, tbb::cache_aligned_allocator<Buckets>> local_buckets(cpus + 1);

  // Pass 1: permutation[u] := rank of u within its bucket, counted locally on
  // the worker that owns u. deterministic_for assigns the same node ranges to
  // the same CPU IDs in both passes, which makes the offsets consistent.
  parallel::deterministic_for<NodeID>(0, n, [&](const NodeID from, const NodeID to, const int cpu) {
    KASSERT(cpu < cpus);

    for (NodeID u = from; u < to; ++u) {
      const auto bucket = find_bucket(degrees[u]);
      permutation[u] = local_buckets[cpu + 1][bucket]++;
    }
  });

  // Build a table of prefix numbers to correct the position of each node in
  // the final permutation. After the previous loop, permutation[u] contains
  // the position of u in the thread-local bucket. We still have to
  // (i) account for smaller buckets --> add the prefix computed in
  //     global_buckets, and
  // (ii) account for the same bucket on smaller processor IDs --> add the
  //      prefix computed in local_buckets.
  Buckets global_buckets{};
  for (int id = 1; id < cpus + 1; ++id) {
    // Shift by one so that, after the prefix sum, global_buckets[b] holds the
    // total number of nodes in buckets 0..b-1, i.e. the start offset of b.
    for (std::size_t i = 0; i + 1 < global_buckets.size(); ++i) {
      global_buckets[i + 1] += local_buckets[id][i];
    }
  }
  parallel::prefix_sum(global_buckets.begin(), global_buckets.end(), global_buckets.begin());
  // Per-bucket prefix sum across workers: afterwards local_buckets[cpu][b] is
  // the number of bucket-b nodes counted by all workers with ID < cpu.
  for (std::size_t i = 0; i < global_buckets.size(); ++i) {
    for (int id = 0; id + 1 < cpus; ++id) {
      local_buckets[id + 1][i] += local_buckets[id][i];
    }
  }

  // Pass 2: apply both offsets to obtain the global permutation.
  parallel::deterministic_for<NodeID>(0, n, [&](const NodeID from, const NodeID to, const int cpu) {
    KASSERT(cpu < cpus);

    for (NodeID u = from; u < to; ++u) {
      const NodeID bucket = find_bucket(degrees[u]);
      permutation[u] += global_buckets[bucket] + local_buckets[cpu][bucket];
    }
  });

  // Compute the inverse permutation; permutation is a bijection on [0, n), so
  // every slot of inverse_permutation is written exactly once (no races).
  tbb::parallel_for<NodeID>(0, n, [&](const NodeID u) { inverse_permutation[permutation[u]] = u; });

  return {std::move(permutation), std::move(inverse_permutation)};
}

} // namespace

CompressedGraph compressed_read_parallel(const std::string &filename, const bool sorted) {
CompressedGraph compressed_read_parallel(const std::string &filename, const NodeOrdering ordering) {
try {
BinaryReader reader(filename);

Expand Down Expand Up @@ -392,6 +452,28 @@ CompressedGraph compressed_read_parallel(const std::string &filename, const bool
const auto map_edge_offset = [&](const NodeID node) {
return (nodes[node] - nodes_offset_base) / sizeof(NodeID);
};
const auto fetch_degree = [&](const NodeID node) {
return static_cast<NodeID>((nodes[node + 1] - nodes[node]) / sizeof(NodeID));
};

RECORD("degrees") StaticArray<NodeID> degrees(num_nodes, static_array::noinit);
TIMED_SCOPE("Read degrees") {
tbb::parallel_for(tbb::blocked_range<NodeID>(0, num_nodes), [&](const auto &r) {
for (NodeID u = r.begin(); u != r.end(); ++u) {
degrees[u] = fetch_degree(u);
}
});
};

const bool sort_by_degree_bucket = ordering == NodeOrdering::DEGREE_BUCKETS;
StaticArray<NodeID> permutation;
StaticArray<NodeID> inverse_permutation;
if (sort_by_degree_bucket) {
SCOPED_TIMER("Compute permutation");
auto [perm, inv_perm] = sort_by_degree_buckets(num_nodes, degrees);
permutation = std::move(perm);
inverse_permutation = std::move(inv_perm);
}

// To compress the graph in parallel the nodes are split into chunks. Each parallel task fetches
// a chunk and compresses the neighbourhoods of the corresponding nodes. The compressed
Expand All @@ -400,38 +482,46 @@ CompressedGraph compressed_read_parallel(const std::string &filename, const bool
// chunks is determined.
constexpr std::size_t kNumChunks = 5000;
const EdgeID max_chunk_size = num_edges / kNumChunks;
std::vector<std::pair<NodeID, NodeID>> chunks;
std::vector<std::tuple<NodeID, NodeID, EdgeID>> chunks;

NodeID max_degree = 0;
TIMED_SCOPE("Compute chunks") {
NodeID cur_chunk_start = 0;
EdgeID cur_chunk_size = 0;
for (NodeID node = 0; node < num_nodes; ++node) {
const auto degree = static_cast<NodeID>((nodes[node + 1] - nodes[node]) / sizeof(NodeID));
if (degree > max_degree) {
max_degree = degree;
}
EdgeID cur_first_edge = 0;
for (NodeID i = 0; i < num_nodes; ++i) {
NodeID node = sort_by_degree_bucket ? inverse_permutation[i] : i;

const NodeID degree = degrees[node];
max_degree = std::max(max_degree, degree);

cur_chunk_size += degree;
if (cur_chunk_size >= max_chunk_size) {
if (cur_chunk_start == node) {
chunks.emplace_back(node, node + 1);
cur_chunk_start = node + 1;
if (cur_chunk_start == i) {
chunks.emplace_back(cur_chunk_start, i + 1, cur_first_edge);

cur_chunk_start = i + 1;
cur_first_edge += degree;
cur_chunk_size = 0;
} else {
chunks.emplace_back(cur_chunk_start, node);
cur_chunk_start = node;
}
chunks.emplace_back(cur_chunk_start, i, cur_first_edge);

cur_chunk_size = 0;
cur_chunk_start = i;
cur_first_edge += cur_chunk_size - degree;
cur_chunk_size = degree;
}
}
}

if (cur_chunk_start != num_nodes) {
chunks.emplace_back(cur_chunk_start, num_nodes);
chunks.emplace_back(cur_chunk_start, num_nodes, cur_first_edge);
}
};

degrees.free();

// Initializes the data structures used to build the compressed graph in parallel.
const bool sorted = ordering == NodeOrdering::IMPLICIT_DEGREE_BUCKETS || sort_by_degree_bucket;
ParallelCompressedGraphBuilder builder(
header.num_nodes, header.num_edges, header.has_node_weights, header.has_edge_weights, sorted
);
Expand All @@ -457,28 +547,29 @@ CompressedGraph compressed_read_parallel(const std::string &filename, const bool
CompressedEdgesBuilder &neighbourhood_builder = neighbourhood_builder_ets.local();

const NodeID chunk = buffer.next();
const auto [start_node, end_node] = chunks[chunk];

EdgeID edge = map_edge_offset(start_node);
neighbourhood_builder.init(edge);
const auto [start, end, first_edge] = chunks[chunk];

NodeWeight local_node_weight = 0;
neighbourhood_builder.init(first_edge);

// Compress the neighborhoods of the nodes in the fetched chunk.
debug::scoped_time(dbg.compression_time, [&] {
for (NodeID node = start_node; node < end_node; ++node) {
const auto degree = static_cast<NodeID>((nodes[node + 1] - nodes[node]) / sizeof(NodeID));
for (NodeID i = start; i < end; ++i) {
const NodeID node = sort_by_degree_bucket ? inverse_permutation[i] : i;
const NodeID degree = fetch_degree(node);
IF_DBG dbg.num_edges += degree;

for (NodeID i = 0; i < degree; ++i) {
const NodeID adjacent_node = edges[edge];
EdgeID edge = map_edge_offset(node);
for (NodeID j = 0; j < degree; ++j) {
const NodeID adjacent_node =
sort_by_degree_bucket ? permutation[edges[edge]] : edges[edge];
const EdgeWeight edge_weight = header.has_edge_weights ? edge_weights[edge] : 1;

neighbourhood.emplace_back(adjacent_node, edge_weight);
edge += 1;
}

const EdgeID local_offset = neighbourhood_builder.add(node, neighbourhood);
const EdgeID local_offset = neighbourhood_builder.add(i, neighbourhood);
offsets.push_back(local_offset);

neighbourhood.clear();
Expand All @@ -494,18 +585,18 @@ CompressedGraph compressed_read_parallel(const std::string &filename, const bool
// Store the edge offset and node weight for each node in the chunk and copy the compressed
// neighborhoods into the actual compressed edge array.
debug::scoped_time(dbg.copy_time, [&] {
NodeID node = start_node;
for (EdgeID local_offset : offsets) {
builder.add_node(node, offset + local_offset);
for (NodeID i = start; i < end; ++i) {
const EdgeID local_offset = offsets[i - start];

builder.add_node(i, offset + local_offset);

if (header.has_node_weights) {
const NodeID node = sort_by_degree_bucket ? inverse_permutation[i] : i;
const NodeWeight node_weight = node_weights[node];
local_node_weight += node_weight;

builder.add_node_weight(node, node_weight);
builder.add_node_weight(i, node_weight);
}

node += 1;
}
offsets.clear();

Expand Down
3 changes: 2 additions & 1 deletion apps/io/parhip_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "kaminpar-shm/datastructures/compressed_graph.h"
#include "kaminpar-shm/datastructures/csr_graph.h"
#include "kaminpar-shm/kaminpar.h"

namespace kaminpar::shm::io::parhip {

Expand Down Expand Up @@ -39,6 +40,6 @@ CompressedGraph compressed_read(const std::string &filename, const bool sorted);
 * @param ordering The node ordering of the graph to read; with DEGREE_BUCKETS the nodes are rearranged by degree buckets while reading.
* @return The graph that is stored in the file.
*/
CompressedGraph compressed_read_parallel(const std::string &filename, const bool sorted);
CompressedGraph compressed_read_parallel(const std::string &filename, const NodeOrdering ordering);

} // namespace kaminpar::shm::io::parhip
6 changes: 4 additions & 2 deletions apps/io/shm_io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ Graph read(
const GraphFileFormat file_format,
const bool compress,
const bool may_dismiss,
const bool sorted
const NodeOrdering ordering
) {
const bool sorted = ordering == NodeOrdering::IMPLICIT_DEGREE_BUCKETS;

if (compressed_binary::is_compressed(filename)) {
if (!compress) {
LOG_ERROR << "The input graph is stored in a compressed format but graph compression is"
Expand All @@ -61,7 +63,7 @@ Graph read(
return metis::compress_read(filename, sorted, may_dismiss);
}
case GraphFileFormat::PARHIP: {
return std::optional(parhip::compressed_read_parallel(filename, sorted));
return std::optional(parhip::compressed_read_parallel(filename, ordering));
}
default:
__builtin_unreachable();
Expand Down
4 changes: 2 additions & 2 deletions apps/io/shm_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ csr_read(const std::string &filename, const GraphFileFormat file_format, const b
* @param compress Whether to compress the graph.
* @param may_dismiss Whether to only return the compressed graph if it uses less memory than the
* uncompressed graph.
* @param sorted Whether the nodes of the graph to read are stored in degree-buckets order.
* @param ordering The node ordering of the graph to read.
* @return The graph to read.
*/
Graph read(
const std::string &filename,
const GraphFileFormat file_format,
const bool compress,
const bool may_dismiss,
const bool sorted
const NodeOrdering ordering
);

namespace partition {
Expand Down
7 changes: 6 additions & 1 deletion apps/tools/shm_graph_properties_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <tbb/global_control.h>

#include "kaminpar-shm/context_io.h"
#include "kaminpar-shm/kaminpar.h"

#include "kaminpar-common/console_io.h"
#include "kaminpar-common/logger.h"
Expand Down Expand Up @@ -92,7 +93,11 @@ int main(int argc, char *argv[]) {
tbb::global_control gc(tbb::global_control::max_allowed_parallelism, ctx.parallel.num_threads);

Graph graph = io::read(
graph_filename, graph_file_format, ctx.compression.enabled, ctx.compression.may_dismiss, false
graph_filename,
graph_file_format,
ctx.compression.enabled,
ctx.compression.may_dismiss,
NodeOrdering::NATURAL
);

ctx.debug.graph_name = str::extract_basename(graph_filename);
Expand Down
2 changes: 1 addition & 1 deletion kaminpar-shm/kaminpar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ EdgeWeight KaMinPar::compute_partition(const BlockID k, BlockID *partition) {
START_TIMER("Partitioning");

if (!_was_rearranged) {
if (_ctx.node_ordering == NodeOrdering::DEGREE_BUCKETS) {
if (_ctx.node_ordering == NodeOrdering::DEGREE_BUCKETS && !_graph_ptr->sorted()) {
CSRGraph &csr_graph = *dynamic_cast<CSRGraph *>(_graph_ptr->underlying_graph());
_graph_ptr = std::make_unique<Graph>(graph::rearrange_by_degree_buckets(csr_graph));
}
Expand Down

0 comments on commit a03b1ee

Please sign in to comment.