[REVIEW] Profile graph creation runtime and memory footprint #3518

Merged · 6 commits · Apr 28, 2023

45 changes: 2 additions & 43 deletions benchmarks/cugraph/standalone/cugraph_dask_funcs.py
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
# Copyright (c) 2021-2023, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@@ -22,52 +22,11 @@

import cugraph
from cugraph.dask.comms import comms as Comms
from cugraph.generators import rmat
import tempfile
from cugraph.testing.mg_utils import generate_edgelist

import rmm


def generate_edgelist(scale,
edgefactor,
seed=None,
unweighted=False,
):
"""
Returns a dask_cudf DataFrame created using the R-MAT graph generator.

The resulting graph is weighted with random values drawn from a uniform
distribution over the interval [0, 1).

scale is used to determine the number of vertices to be generated (num_verts
= 2^scale), which is also used to determine the data type for the vertex ID
values in the DataFrame.

edgefactor determines the number of edges (num_edges = num_verts * edgefactor)

seed, if specified, will be used as the seed to the RNG.

unweighted determines whether the resulting edgelist has randomly-generated
weights in the interval [0, 1). If True, an edgelist with only 2 columns
(no weight column) is returned.
"""
ddf = rmat(
scale,
(2**scale)*edgefactor,
0.57, # from Graph500
0.19, # from Graph500
0.19, # from Graph500
seed or 42,
clip_and_flip=False,
scramble_vertex_ids=True,
create_using=None, # return edgelist instead of Graph instance
mg=True
)
if not unweighted:
rng = np.random.default_rng(seed)
ddf["weight"] = ddf.map_partitions(lambda df: rng.random(size=len(df)))
return ddf


def read_csv(input_csv_file, scale):
"""
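The inline generate_edgelist above is removed in favor of the shared helper in cugraph.testing.mg_utils, which is assumed to wrap the same R-MAT generator with the Graph500 parameters (a = 0.57, b = c = 0.19). To get a feel for the problem sizes the new benchmark below sweeps over, the vertex and edge counts follow directly from the docstring's formulas (num_verts = 2**scale, num_edges = num_verts * edgefactor); the standalone sketch that follows simply evaluates them for the scales used in the benchmark and needs no GPU.

# Rough R-MAT problem sizes for the scales swept by cugraph_graph_creation.py.
# Purely illustrative arithmetic; duplicate and self-loop edges are not removed here.
EDGEFACTOR = 16  # matches edgefactor=16 used by the benchmark

for scale in (22, 23, 24):
    num_verts = 2**scale
    num_edges = num_verts * EDGEFACTOR
    print(f"scale={scale}: {num_verts:>12,} vertices, {num_edges:>12,} generated edges")
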
171 changes: 171 additions & 0 deletions benchmarks/cugraph/standalone/cugraph_graph_creation.py
@@ -0,0 +1,171 @@
# Copyright (c) 2021-2023, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from cugraph.testing.mg_utils import (
generate_edgelist_rmat,
get_allocation_counts_dask_persist,
sizeof_fmt,
get_peak_output_ratio_across_workers,
restart_client,
)

from cugraph.testing.mg_utils import (
start_dask_client,
stop_dask_client,
enable_spilling,
)
import cugraph
from time import sleep
import pandas as pd


@get_allocation_counts_dask_persist(return_allocations=True, logging=False)
def construct_graph(dask_dataframe, directed=False, renumber=False):
"""
Args:
dask_dataframe:
dask_dataframe contains weighted, undirected edges with self-loops.
Multiple edges are likely to be present as well.
directed:
If True, the graph will be directed.
renumber:
If True, the graph will be renumbered.
Returns:
G: cugraph.Graph
"""
G = cugraph.Graph(directed=directed)
G.from_dask_cudf_edgelist(
dask_dataframe, source="src", destination="dst", renumber=renumber
)
return G


def benchmark_cugraph_graph_creation(scale, edgefactor, seed, directed, renumber):
"""
Entry point for the benchmark.
"""
dask_df = generate_edgelist_rmat(
scale=scale, edgefactor=edgefactor, seed=seed, unweighted=True, mg=True,
)
dask_df = dask_df.astype("int64")
dask_df = dask_df.reset_index(drop=True)
input_memory = dask_df.memory_usage().sum().compute()
num_input_edges = len(dask_df)
print(
f"Number of input edges = {num_input_edges:,}, directed = {directed}, renumber = {renumber}"
)
G, allocation_counts = construct_graph(
dask_df, directed=directed, renumber=renumber
)
(
input_to_peak_ratio,
output_to_peak_ratio,
input_memory_per_worker,
peak_allocation_across_workers,
) = get_memory_statistics(
allocation_counts=allocation_counts, input_memory=input_memory
)
print(f"Number of edges in final graph = {G.number_of_edges():,}")
print("-" * 80)
return (
num_input_edges,
input_to_peak_ratio,
output_to_peak_ratio,
input_memory_per_worker,
peak_allocation_across_workers,
)


def get_memory_statistics(allocation_counts, input_memory):
"""
Get memory statistics for the benchmark.
"""
output_to_peak_ratio = get_peak_output_ratio_across_workers(allocation_counts)
peak_allocation_across_workers = max(
[a["peak_bytes"] for a in allocation_counts.values()]
)
input_memory_per_worker = input_memory / len(allocation_counts.keys())
input_to_peak_ratio = peak_allocation_across_workers / input_memory_per_worker
print(f"Edge List Memory = {sizeof_fmt(input_memory_per_worker)}")
print(f"Peak Memory across workers = {sizeof_fmt(peak_allocation_across_workers)}")
print(f"Max Peak to output graph ratio across workers = {output_to_peak_ratio:.2f}")
print(
f"Max Peak to avg input graph ratio across workers = {input_to_peak_ratio:.2f}"
)
return (
input_to_peak_ratio,
output_to_peak_ratio,
input_memory_per_worker,
peak_allocation_across_workers,
)

# Benchmark driver: runs the scale/directed/renumber sweep when executed as a script
if __name__ == "__main__":
client, cluster = start_dask_client(dask_worker_devices=[1], jit_unspill=False)
enable_spilling()
stats_ls = []
client.run(enable_spilling)
for scale in [22, 23, 24]:
for directed in [True, False]:
for renumber in [True, False]:
try:
stats_d = {}
(
num_input_edges,
input_to_peak_ratio,
output_to_peak_ratio,
input_memory_per_worker,
peak_allocation_across_workers,
) = benchmark_cugraph_graph_creation(
scale=scale,
edgefactor=16,
seed=123,
directed=directed,
renumber=renumber,
)
stats_d["scale"] = scale
stats_d["num_input_edges"] = num_input_edges
stats_d["directed"] = directed
stats_d["renumber"] = renumber
stats_d["input_memory_per_worker"] = sizeof_fmt(input_memory_per_worker)
stats_d["peak_allocation_across_workers"] = sizeof_fmt(
peak_allocation_across_workers
)
stats_d["input_to_peak_ratio"] = input_to_peak_ratio
stats_d["output_to_peak_ratio"] = output_to_peak_ratio
stats_ls.append(stats_d)
except Exception as e:
print(e)
restart_client(client)
sleep(10)

print("-" * 40 + f"renumber completed" + "-" * 40)

stats_df = pd.DataFrame(
stats_ls,
columns=[
"scale",
"num_input_edges",
"directed",
"renumber",
"input_memory_per_worker",
"peak_allocation_across_workers",
"input_to_peak_ratio",
"output_to_peak_ratio",
],
)
stats_df.to_csv("cugraph_graph_creation_stats.csv")
print("-" * 40 + f"scale = {scale} completed" + "-" * 40)

# Cleanup Dask Cluster
stop_dask_client(client, cluster)
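
To help interpret the ratios written to cugraph_graph_creation_stats.csv: they reduce to simple arithmetic over the per-worker allocation counts that the @get_allocation_counts_dask_persist decorator is assumed to return (a mapping from worker to allocation info with a peak_bytes field, as indexed by get_memory_statistics above). The sketch below recomputes input_to_peak_ratio from mocked numbers, with no Dask or GPU involved; the worker addresses and byte counts are invented for illustration.

# Mocked allocation_counts with the {worker: {"peak_bytes": ...}} shape that
# get_memory_statistics() above indexes into; all values are hypothetical.
allocation_counts = {
    "tcp://worker-0:8786": {"peak_bytes": 6 * 2**30},  # 6 GiB peak (hypothetical)
    "tcp://worker-1:8786": {"peak_bytes": 5 * 2**30},  # 5 GiB peak (hypothetical)
}
input_memory = 4 * 2**30  # total edgelist bytes across the cluster (hypothetical)

peak_allocation_across_workers = max(a["peak_bytes"] for a in allocation_counts.values())
input_memory_per_worker = input_memory / len(allocation_counts)
input_to_peak_ratio = peak_allocation_across_workers / input_memory_per_worker

print(f"Peak across workers     : {peak_allocation_across_workers / 2**30:.1f} GiB")
print(f"Input memory per worker : {input_memory_per_worker / 2**30:.1f} GiB")
print(f"Input-to-peak ratio     : {input_to_peak_ratio:.2f}")  # 6 GiB / 2 GiB = 3.00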