From 0b48642c4f3ddd4f74476b52c02ef78c1d79f35e Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Tue, 30 Jul 2024 11:53:17 -0700
Subject: [PATCH 01/11] add new remote-io benchmark

---
 dask_cuda/benchmarks/custom/__init__.py |   1 +
 dask_cuda/benchmarks/custom/parquet.py  | 153 +++++++++++++++++++
 dask_cuda/benchmarks/remote_parquet.py  | 187 ++++++++++++++++++++++++
 3 files changed, 341 insertions(+)
 create mode 100644 dask_cuda/benchmarks/custom/__init__.py
 create mode 100644 dask_cuda/benchmarks/custom/parquet.py
 create mode 100644 dask_cuda/benchmarks/remote_parquet.py

diff --git a/dask_cuda/benchmarks/custom/__init__.py b/dask_cuda/benchmarks/custom/__init__.py
new file mode 100644
index 000000000..cfd5f0de3
--- /dev/null
+++ b/dask_cuda/benchmarks/custom/__init__.py
@@ -0,0 +1 @@
+from .parquet import read_parquet as custom_read_parquet
diff --git a/dask_cuda/benchmarks/custom/parquet.py b/dask_cuda/benchmarks/custom/parquet.py
new file mode 100644
index 000000000..fa5075bc0
--- /dev/null
+++ b/dask_cuda/benchmarks/custom/parquet.py
@@ -0,0 +1,153 @@
+import math
+import os
+from concurrent.futures import ThreadPoolExecutor
+
+import numpy as np
+import pyarrow as pa
+import pyarrow.parquet as pq
+from pyarrow import dataset
+
+import dask
+import dask.dataframe as dd
+from dask.base import apply, tokenize
+from dask.distributed import get_worker
+from dask.utils import parse_bytes
+
+# NOTE: The pyarrow component of this code was mostly copied
+# from dask-expr (dask_expr/io/parquet.py)
+
+
+_CPU_COUNT_SET = False
+
+
+def _maybe_adjust_cpu_count():
+    global _CPU_COUNT_SET
+    if not _CPU_COUNT_SET:
+        # Set the number of threads to the number of cores.
+        # This is a default for pyarrow, but it's not set by default in
+        # dask/distributed
+        pa.set_cpu_count(os.cpu_count())
+        _CPU_COUNT_SET = True
+
+
+def fragment_to_table(fragment, filters=None, columns=None, schema=None):
+    _maybe_adjust_cpu_count()
+
+    if isinstance(filters, list):
+        filters = pq.filters_to_expression(filters)
+
+    return fragment.to_table(
+        schema=schema,
+        columns=columns,
+        filter=filters,
+        # Batch size determines how many rows are read at once and will
+        # cause the underlying array to be split into chunks of this size
+        # (max). We'd like to avoid fragmentation as much as possible and
+        # to set this to something like inf, but we have to set a finite,
+        # positive number.
+        # In the presence of row groups, the underlying array will still be
+        # chunked per rowgroup
+        batch_size=10_000_000,
+        fragment_scan_options=pa.dataset.ParquetFragmentScanOptions(
+            pre_buffer=True,
+            cache_options=pa.CacheOptions(
+                hole_size_limit=parse_bytes("4 MiB"),
+                range_size_limit=parse_bytes("32.00 MiB"),
+            ),
+        ),
+        use_threads=True,
+    )
+
+
+def tables_to_frame(tables):
+    import cudf
+
+    return cudf.DataFrame.from_arrow(
+        pa.concat_tables(tables) if len(tables) > 1 else tables[0]
+    )
+
+
+def read_parquet_fragments(fragments, columns=None, filters=None):
+
+    kwargs = {"columns": columns, "filters": filters}
+    if not isinstance(fragments, list):
+        fragments = [fragments]
+
+    if len(fragments) > 1:
+        # Read multiple fragments
+        token = tokenize(fragments, columns, filters)
+        chunk_name = f"read-chunk-{token}"
+        dsk = {
+            (chunk_name, i): (apply, fragment_to_table, [fragment], kwargs)
+            for i, fragment in enumerate(fragments)
+        }
+        dsk[chunk_name] = (tables_to_frame, list(dsk.keys()))
+
+        try:
+            worker = get_worker()
+        except ValueError:
+            return dask.threaded.get(dsk, chunk_name)
+
+        if not hasattr(worker, "_rapids_executor"):
+            num_threads = len(os.sched_getaffinity(0))
+            worker._rapids_executor = ThreadPoolExecutor(num_threads)
+        with dask.config.set(pool=worker._rapids_executor):
+            return dask.threaded.get(dsk, chunk_name)
+
+    else:
+        # Read single fragment
+        return tables_to_frame([fragment_to_table(fragments[0], **kwargs)])
+
+
+def mean_file_size(fragments, n=10):
+    n_frags = len(fragments)
+    if n < n_frags:
+        indices = np.random.choice(np.arange(n_frags), size=n, replace=False)
+    else:
+        indices = np.arange(n_frags)
+
+    sizes = []
+    for f in indices:
+        size = 0
+        frag = fragments[f]
+        for row_group in frag.row_groups:
+            size += row_group.total_byte_size
+        sizes.append(size)
+
+    return np.mean(sizes)
+
+
+def aggregate_fragments(fragments, blocksize):
+    size = mean_file_size(fragments)
+    blocksize = parse_bytes(blocksize)
+    stride = int(math.floor(blocksize / size))
+
+    if stride < 1:
+        pass  # Not implemented yet!
+
+    stride = max(stride, 1)
+    return [fragments[i : i + stride] for i in range(0, len(fragments), stride)]
+
+
+def read_parquet(urlpath, columns=None, filters=None, blocksize="256MB", **kwargs):
+
+    # Use pyarrow dataset API to get fragments and meta
+    ds = dataset.dataset(urlpath, format="parquet")
+    meta = tables_to_frame([ds.schema.empty_table()])
+    if columns is not None:
+        meta = meta[columns]
+    fragments = list(ds.get_fragments())
+
+    # Aggregate fragments together if necessary
+    if blocksize:
+        fragments = aggregate_fragments(fragments, blocksize)
+
+    # Construct collection
+    return dd.from_map(
+        read_parquet_fragments,
+        fragments,
+        columns=columns,
+        filters=filters,
+        meta=meta,
+        enforce_metadata=False,
+    )
diff --git a/dask_cuda/benchmarks/remote_parquet.py b/dask_cuda/benchmarks/remote_parquet.py
new file mode 100644
index 000000000..8cd99ec77
--- /dev/null
+++ b/dask_cuda/benchmarks/remote_parquet.py
@@ -0,0 +1,187 @@
+import contextlib
+from collections import ChainMap
+from time import perf_counter as clock
+
+import pandas as pd
+
+import dask
+import dask.dataframe as dd
+from dask.distributed import performance_report
+from dask.utils import format_bytes, parse_bytes
+
+from dask_cuda.benchmarks.common import Config, execute_benchmark
+from dask_cuda.benchmarks.custom import custom_read_parquet
+from dask_cuda.benchmarks.utils import (
+    parse_benchmark_args,
+    print_key_value,
+    print_separator,
+    print_throughput_bandwidth,
+)
+
+DEFAULT_DATASET_PATH = "s3://dask-cudf-parquet-testing/dedup_parquet"
+DEFAULT_COLUMNS = ["text", "id"]
+DEFAULT_STORAGE_SIZE = 2_843_373_145  # Compressed byte size
+
+
+def read_data(
+    backend,
+    filesystem,
+    aggregate_files,
+    blocksize,
+):
+    path = DEFAULT_DATASET_PATH
+    columns = DEFAULT_COLUMNS
+    with dask.config.set({"dataframe.backend": backend}):
+        if filesystem == "arrow" and backend == "cudf":
+            df = custom_read_parquet(
+                path,
+                columns=columns,
+                blocksize=blocksize,
+            )
+        else:
+            if filesystem == "arrow":
+                # TODO: Warn user that blocksize and aggregate_files
+                # are ingored when `filesystem == "arrow"`
+                _blocksize = {}
+                _aggregate_files = {}
+            else:
+                _blocksize = {"blocksize": blocksize}
+                _aggregate_files = {"aggregate_files": aggregate_files}
+
+            df = dd.read_parquet(
+                path,
+                columns=columns,
+                filesystem=filesystem,
+                **_blocksize,
+                **_aggregate_files,
+            )
+        return df.memory_usage().compute().sum()
+
+
+def bench_once(client, args, write_profile=None):
+
+    if write_profile is None:
+        ctx = contextlib.nullcontext()
+    else:
+        ctx = performance_report(filename=args.profile)
+
+    with ctx:
+        t1 = clock()
+        output_size = read_data(
+            backend="cudf" if args.type == "gpu" else "pandas",
+            filesystem=args.filesystem,
+            aggregate_files=args.aggregate_files,
+            blocksize=args.blocksize,
+        )
+        t2 = clock()
+
+    return (DEFAULT_STORAGE_SIZE, output_size, t2 - t1)
+
+
+def pretty_print_results(args, address_to_index, p2p_bw, results):
+    if args.markdown:
+        print("```")
+    print("Remote Parquet benchmark")
+    print_separator(separator="-")
+    backend = "cudf" if args.type == "gpu" else "pandas"
+    print_key_value(key="Backend", value=f"{backend}")
+    print_key_value(key="Filesystem", value=f"{args.filesystem}")
+    print_key_value(key="Blocksize", value=f"{args.blocksize}")
+    print_key_value(key="Aggregate files", value=f"{args.aggregate_files}")
+    print_key_value(key="Output size", value=f"{format_bytes(results[0][1])}")
+    if args.markdown:
+        print("\n```")
+    data_processed, output_size, durations = zip(*results)
+    print_throughput_bandwidth(
+        args, durations, data_processed, p2p_bw, address_to_index
+    )
+
+
+def create_tidy_results(args, p2p_bw, results):
+    configuration = {
+        "backend": "cudf" if args.type == "gpu" else "pandas",
+        "filesystem": args.filesystem,
+        "blocksize": args.blocksize,
+        "aggregate_files": args.aggregate_files,
+    }
+    timing_data = pd.DataFrame(
+        [
+            pd.Series(
+                data=ChainMap(
+                    configuration,
+                    {
+                        "wallclock": duration,
+                        "data_processed": data_processed,
+                        "output_size": output_size,
+                    },
+                )
+            )
+            for data_processed, output_size, duration in results
+        ]
+    )
+    return timing_data, p2p_bw
+
+
+def parse_args():
+    special_args = [
+        {
+            "name": "--blocksize",
+            "default": "256MB",
+            "type": str,
+            "help": "How to set the blocksize option",
+        },
+        {
+            "name": "--aggregate-files",
+            "default": False,
+            "action": "store_true",
+            "help": "How to set the aggregate_files option",
+        },
+        {
+            "name": [
+                "-t",
+                "--type",
+            ],
+            "choices": ["cpu", "gpu"],
+            "default": "gpu",
+            "type": str,
+            "help": "Use GPU or CPU dataframes (default 'gpu')",
+        },
+        {
+            "name": "--filesystem",
+            "choices": ["arrow", "fsspec"],
+            "default": "fsspec",
+            "type": str,
+            "help": "Filesystem backend",
+        },
+        {
+            "name": "--runs",
+            "default": 3,
+            "type": int,
+            "help": "Number of runs",
+        },
+        # NOTE: The following args are not relevant to this benchmark
+        {
+            "name": "--ignore-size",
+            "default": "1 MiB",
+            "metavar": "nbytes",
+            "type": parse_bytes,
+            "help": "Ignore messages smaller than this (default '1 MB')",
+        },
+    ]
+
+    return parse_benchmark_args(
+        description="Remote Parquet (dask/cudf) benchmark",
+        args_list=special_args,
+        check_explicit_comms=False,
+    )
+
+
+if __name__ == "__main__":
+    execute_benchmark(
+        Config(
+            args=parse_args(),
+            bench_once=bench_once,
+            create_tidy_results=create_tidy_results,
+            pretty_print_results=pretty_print_results,
+        )
+    )

From 150589031bfdf8b053705ebe3df88bc29936904a Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Tue, 30 Jul 2024 14:02:27 -0700
Subject: [PATCH 02/11] use fragment_parallelism

---
 dask_cuda/benchmarks/custom/parquet.py | 22 +++++++++++++++++++---
 1 file changed, 19 insertions(+), 3 deletions(-)

diff --git a/dask_cuda/benchmarks/custom/parquet.py b/dask_cuda/benchmarks/custom/parquet.py
index fa5075bc0..47bf69ef8 100644
--- a/dask_cuda/benchmarks/custom/parquet.py
+++ b/dask_cuda/benchmarks/custom/parquet.py
@@ -67,7 +67,12 @@ def tables_to_frame(tables):
     )
 
 
-def read_parquet_fragments(fragments, columns=None, filters=None):
+def read_parquet_fragments(
+    fragments,
+    columns=None,
+    filters=None,
+    fragment_parallelism=None,
+):
 
     kwargs = {"columns": columns, "filters": filters}
     if not isinstance(fragments, list):
@@ -89,7 +94,11 @@ def read_parquet_fragments(fragments, columns=None, filters=None):
             return dask.threaded.get(dsk, chunk_name)
 
         if not hasattr(worker, "_rapids_executor"):
-            num_threads = len(os.sched_getaffinity(0))
+            fragment_parallelism = fragment_parallelism or 8
+            num_threads = min(
+                fragment_parallelism,
+                len(os.sched_getaffinity(0)),
+            )
             worker._rapids_executor = ThreadPoolExecutor(num_threads)
         with dask.config.set(pool=worker._rapids_executor):
             return dask.threaded.get(dsk, chunk_name)
@@ -129,7 +138,13 @@ def aggregate_fragments(fragments, blocksize):
     return [fragments[i : i + stride] for i in range(0, len(fragments), stride)]
 
 
-def read_parquet(urlpath, columns=None, filters=None, blocksize="256MB", **kwargs):
+def read_parquet(
+    urlpath,
+    columns=None,
+    filters=None,
+    blocksize="256MB",
+    fragment_parallelism=None,
+):
 
     # Use pyarrow dataset API to get fragments and meta
     ds = dataset.dataset(urlpath, format="parquet")
@@ -148,6 +163,7 @@ def read_parquet(urlpath, columns=None, filters=None, blocksize="256MB", **kwarg
         fragments,
         columns=columns,
         filters=filters,
+        fragment_parallelism=fragment_parallelism,
         meta=meta,
         enforce_metadata=False,
     )

From ad8df90b9cdab436bff001bb5562cb389a16001f Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Wed, 28 Aug 2024 12:48:07 -0700
Subject: [PATCH 03/11] remove custom arrow code path in favor of proper
 dask-cudf support

---
 dask_cuda/benchmarks/custom/__init__.py |   1 -
 dask_cuda/benchmarks/custom/parquet.py  | 169 ------------------------
 dask_cuda/benchmarks/remote_parquet.py  |  37 +++---
 3 files changed, 15 insertions(+), 192 deletions(-)
 delete mode 100644 dask_cuda/benchmarks/custom/__init__.py
 delete mode 100644 dask_cuda/benchmarks/custom/parquet.py

diff --git a/dask_cuda/benchmarks/custom/__init__.py b/dask_cuda/benchmarks/custom/__init__.py
deleted file mode 100644
index cfd5f0de3..000000000
--- a/dask_cuda/benchmarks/custom/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-from .parquet import read_parquet as custom_read_parquet
diff --git a/dask_cuda/benchmarks/custom/parquet.py b/dask_cuda/benchmarks/custom/parquet.py
deleted file mode 100644
index 47bf69ef8..000000000
--- a/dask_cuda/benchmarks/custom/parquet.py
+++ /dev/null
@@ -1,169 +0,0 @@
-import math
-import os
-from concurrent.futures import ThreadPoolExecutor
-
-import numpy as np
-import pyarrow as pa
-import pyarrow.parquet as pq
-from pyarrow import dataset
-
-import dask
-import dask.dataframe as dd
-from dask.base import apply, tokenize
-from dask.distributed import get_worker
-from dask.utils import parse_bytes
-
-# NOTE: The pyarrow component of this code was mostly copied
-# from dask-expr (dask_expr/io/parquet.py)
-
-
-_CPU_COUNT_SET = False
-
-
-def _maybe_adjust_cpu_count():
-    global _CPU_COUNT_SET
-    if not _CPU_COUNT_SET:
-        # Set the number of threads to the number of cores.
-        # This is a default for pyarrow, but it's not set by default in
-        # dask/distributed
-        pa.set_cpu_count(os.cpu_count())
-        _CPU_COUNT_SET = True
-
-
-def fragment_to_table(fragment, filters=None, columns=None, schema=None):
-    _maybe_adjust_cpu_count()
-
-    if isinstance(filters, list):
-        filters = pq.filters_to_expression(filters)
-
-    return fragment.to_table(
-        schema=schema,
-        columns=columns,
-        filter=filters,
-        # Batch size determines how many rows are read at once and will
-        # cause the underlying array to be split into chunks of this size
-        # (max). We'd like to avoid fragmentation as much as possible and
-        # to set this to something like inf, but we have to set a finite,
-        # positive number.
-        # In the presence of row groups, the underlying array will still be
-        # chunked per rowgroup
-        batch_size=10_000_000,
-        fragment_scan_options=pa.dataset.ParquetFragmentScanOptions(
-            pre_buffer=True,
-            cache_options=pa.CacheOptions(
-                hole_size_limit=parse_bytes("4 MiB"),
-                range_size_limit=parse_bytes("32.00 MiB"),
-            ),
-        ),
-        use_threads=True,
-    )
-
-
-def tables_to_frame(tables):
-    import cudf
-
-    return cudf.DataFrame.from_arrow(
-        pa.concat_tables(tables) if len(tables) > 1 else tables[0]
-    )
-
-
-def read_parquet_fragments(
-    fragments,
-    columns=None,
-    filters=None,
-    fragment_parallelism=None,
-):
-
-    kwargs = {"columns": columns, "filters": filters}
-    if not isinstance(fragments, list):
-        fragments = [fragments]
-
-    if len(fragments) > 1:
-        # Read multiple fragments
-        token = tokenize(fragments, columns, filters)
-        chunk_name = f"read-chunk-{token}"
-        dsk = {
-            (chunk_name, i): (apply, fragment_to_table, [fragment], kwargs)
-            for i, fragment in enumerate(fragments)
-        }
-        dsk[chunk_name] = (tables_to_frame, list(dsk.keys()))
-
-        try:
-            worker = get_worker()
-        except ValueError:
-            return dask.threaded.get(dsk, chunk_name)
-
-        if not hasattr(worker, "_rapids_executor"):
-            fragment_parallelism = fragment_parallelism or 8
-            num_threads = min(
-                fragment_parallelism,
-                len(os.sched_getaffinity(0)),
-            )
-            worker._rapids_executor = ThreadPoolExecutor(num_threads)
-        with dask.config.set(pool=worker._rapids_executor):
-            return dask.threaded.get(dsk, chunk_name)
-
-    else:
-        # Read single fragment
-        return tables_to_frame([fragment_to_table(fragments[0], **kwargs)])
-
-
-def mean_file_size(fragments, n=10):
-    n_frags = len(fragments)
-    if n < n_frags:
-        indices = np.random.choice(np.arange(n_frags), size=n, replace=False)
-    else:
-        indices = np.arange(n_frags)
-
-    sizes = []
-    for f in indices:
-        size = 0
-        frag = fragments[f]
-        for row_group in frag.row_groups:
-            size += row_group.total_byte_size
-        sizes.append(size)
-
-    return np.mean(sizes)
-
-
-def aggregate_fragments(fragments, blocksize):
-    size = mean_file_size(fragments)
-    blocksize = parse_bytes(blocksize)
-    stride = int(math.floor(blocksize / size))
-
-    if stride < 1:
-        pass  # Not implemented yet!
-
-    stride = max(stride, 1)
-    return [fragments[i : i + stride] for i in range(0, len(fragments), stride)]
-
-
-def read_parquet(
-    urlpath,
-    columns=None,
-    filters=None,
-    blocksize="256MB",
-    fragment_parallelism=None,
-):
-
-    # Use pyarrow dataset API to get fragments and meta
-    ds = dataset.dataset(urlpath, format="parquet")
-    meta = tables_to_frame([ds.schema.empty_table()])
-    if columns is not None:
-        meta = meta[columns]
-    fragments = list(ds.get_fragments())
-
-    # Aggregate fragments together if necessary
-    if blocksize:
-        fragments = aggregate_fragments(fragments, blocksize)
-
-    # Construct collection
-    return dd.from_map(
-        read_parquet_fragments,
-        fragments,
-        columns=columns,
-        filters=filters,
-        fragment_parallelism=fragment_parallelism,
-        meta=meta,
-        enforce_metadata=False,
-    )
diff --git a/dask_cuda/benchmarks/remote_parquet.py b/dask_cuda/benchmarks/remote_parquet.py
index 8cd99ec77..24b905628 100644
--- a/dask_cuda/benchmarks/remote_parquet.py
+++ b/dask_cuda/benchmarks/remote_parquet.py
@@ -32,29 +32,22 @@ def read_data(
     path = DEFAULT_DATASET_PATH
     columns = DEFAULT_COLUMNS
     with dask.config.set({"dataframe.backend": backend}):
-        if filesystem == "arrow" and backend == "cudf":
-            df = custom_read_parquet(
-                path,
-                columns=columns,
-                blocksize=blocksize,
-            )
+        if filesystem == "arrow":
+            # TODO: Warn user that blocksize and aggregate_files
+            # are ingored when `filesystem == "arrow"`
+            _blocksize = {}
+            _aggregate_files = {}
         else:
-            if filesystem == "arrow":
-                # TODO: Warn user that blocksize and aggregate_files
-                # are ingored when `filesystem == "arrow"`
-                _blocksize = {}
-                _aggregate_files = {}
-            else:
-                _blocksize = {"blocksize": blocksize}
-                _aggregate_files = {"aggregate_files": aggregate_files}
-
-            df = dd.read_parquet(
-                path,
-                columns=columns,
-                filesystem=filesystem,
-                **_blocksize,
-                **_aggregate_files,
-            )
+            _blocksize = {"blocksize": blocksize}
+            _aggregate_files = {"aggregate_files": aggregate_files}
+
+        df = dd.read_parquet(
+            path,
+            columns=columns,
+            filesystem=filesystem,
+            **_blocksize,
+            **_aggregate_files,
+        )
         return df.memory_usage().compute().sum()

From 5b24195ad96b55dc950b8e4a87815e2098b01961 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Wed, 28 Aug 2024 12:48:54 -0700
Subject: [PATCH 04/11] fix typo

---
 dask_cuda/benchmarks/remote_parquet.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/dask_cuda/benchmarks/remote_parquet.py b/dask_cuda/benchmarks/remote_parquet.py
index 24b905628..45550acc1 100644
--- a/dask_cuda/benchmarks/remote_parquet.py
+++ b/dask_cuda/benchmarks/remote_parquet.py
@@ -10,7 +10,6 @@
 from dask.utils import format_bytes, parse_bytes
 
 from dask_cuda.benchmarks.common import Config, execute_benchmark
-from dask_cuda.benchmarks.custom import custom_read_parquet
 from dask_cuda.benchmarks.utils import (
     parse_benchmark_args,
     print_key_value,
@@ -34,7 +33,7 @@ def read_data(
     with dask.config.set({"dataframe.backend": backend}):
         if filesystem == "arrow":
             # TODO: Warn user that blocksize and aggregate_files
-            # are ingored when `filesystem == "arrow"`
+            # are ignored when `filesystem == "arrow"`
             _blocksize = {}
             _aggregate_files = {}
         else:

From 4b045de4b4792d6f460ed02d197ea8ffd61827aa Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Thu, 29 Aug 2024 08:49:10 -0700
Subject: [PATCH 05/11] rename file and generalize

---
 dask_cuda/benchmarks/local_read_parquet.py | 272 +++++++++++++++++++++
 dask_cuda/benchmarks/remote_parquet.py     | 179 --------------------
 2 files changed, 272 insertions(+), 179 deletions(-)
 create mode 100644 dask_cuda/benchmarks/local_read_parquet.py
 delete mode 100644 dask_cuda/benchmarks/remote_parquet.py

diff --git a/dask_cuda/benchmarks/local_read_parquet.py b/dask_cuda/benchmarks/local_read_parquet.py
new file mode 100644
index 000000000..7dcf0343a
--- /dev/null
+++ b/dask_cuda/benchmarks/local_read_parquet.py
@@ -0,0 +1,272 @@
+import contextlib
+from collections import ChainMap
+from time import perf_counter as clock
+
+import fsspec
+import pandas as pd
+
+import dask
+import dask.dataframe as dd
+from dask.base import tokenize
+from dask.distributed import performance_report
+from dask.utils import format_bytes, parse_bytes
+
+from dask_cuda.benchmarks.common import Config, execute_benchmark
+from dask_cuda.benchmarks.utils import (
+    parse_benchmark_args,
+    print_key_value,
+    print_separator,
+    print_throughput_bandwidth,
+)
+
+DISK_SIZE_CACHE = {}
+OPTIONS_CACHE = {}
+
+
+def _noop(df):
+    return df
+
+
+def read_data(paths, columns, backend, **kwargs):
+    with dask.config.set({"dataframe.backend": backend}):
+        return dd.read_parquet(
+            paths,
+            columns=columns,
+            **kwargs,
+        )
+
+
+def get_fs_paths_kwargs(args):
+    kwargs = {}
+
+    storage_options = {}
+    if args.key:
+        storage_options["key"] = args.key
+    if args.secret:
+        storage_options["secret"] = args.secret
+
+    if args.filesystem == "arrow":
+        import pyarrow.fs as pa_fs
+        from fsspec.implementations.arrow import ArrowFSWrapper
+
+        _mapping = {
+            "key": "access_key",
+            "secret": "secret_key",
+        }  # See: pyarrow.fs.S3FileSystem docs
+        s3_args = {}
+        for k, v in storage_options.items():
+            s3_args[_mapping[k]] = v
+
+        fs = pa_fs.FileSystem.from_uri(args.path)[0]
+        kwargs["filesystem"] = type(fs)(**s3_args)
+        fsspec_fs = ArrowFSWrapper(kwargs["filesystem"])
+        paths = fsspec_fs.glob(f"{args.path}/*.parquet")
+
+        if args.type == "gpu":
+            kwargs["blocksize"] = args.blocksize
+
+        if args.aggregate_files:
+            raise NotImplementedError(
+                "aggregate-files is not supported for filesystem='arrow'"
+            )
+    else:
+        fsspec_fs = fsspec.core.get_fs_token_paths(
+            args.path, mode="rb", storage_options=storage_options
+        )[0]
+        kwargs["filesystem"] = fsspec_fs
+        paths = fsspec_fs.glob(f"{args.path}/*.parquet")
+
+        kwargs["blocksize"] = args.blocksize
+        kwargs["aggregate_files"] = args.aggregate_files
+
+    if args.file_count:
+        paths = paths[: args.file_count]
+
+    return fsspec_fs, paths, kwargs
+
+
+def bench_once(client, args, write_profile=None):
+    global OPTIONS_CACHE
+    global DISK_SIZE_CACHE
+
+    # Construct kwargs
+    token = tokenize(args)
+    try:
+        fsspec_fs, paths, kwargs = OPTIONS_CACHE[token]
+    except KeyError:
+        fsspec_fs, paths, kwargs = get_fs_paths_kwargs(args)
+        OPTIONS_CACHE[token] = (fsspec_fs, paths, kwargs)
+
+    if write_profile is None:
+        ctx = contextlib.nullcontext()
+    else:
+        ctx = performance_report(filename=args.profile)
+
+    with ctx:
+        t1 = clock()
+        df = read_data(
+            paths,
+            columns=args.columns,
+            backend="cudf" if args.type == "gpu" else "pandas",
+            **kwargs,
+        )
+        num_rows = len(
+            # Use opaque `map_partitions` call to "block"
+            # dask-expr from using pq metadata to get length
+            df.map_partitions(
+                _noop,
+                meta=df._meta,
+                enforce_metadata=False,
+            )
+        )
+        t2 = clock()
+
+    # Extract total size of files on disk
+    token = tokenize(paths)
+    try:
+        disk_size = DISK_SIZE_CACHE[token]
+    except KeyError:
+        disk_size = sum(fsspec_fs.sizes(paths))
+        DISK_SIZE_CACHE[token] = disk_size
+
+    return (disk_size, num_rows, t2 - t1)
+
+
+def pretty_print_results(args, address_to_index, p2p_bw, results):
+    if args.markdown:
+        print("```")
+    print("Remote Parquet benchmark")
+    data_processed, row_count, durations = zip(*results)
+    print_separator(separator="-")
+    backend = "cudf" if args.type == "gpu" else "pandas"
+    print_key_value(key="Path", value=args.path)
+    print_key_value(key="Columns", value=f"{args.columns}")
+    print_key_value(key="Backend", value=f"{backend}")
+    print_key_value(key="Filesystem", value=f"{args.filesystem}")
+    print_key_value(key="Blocksize", value=f"{format_bytes(args.blocksize)}")
+    print_key_value(key="Aggregate files", value=f"{args.aggregate_files}")
+    print_key_value(key="Row count", value=f"{row_count[0]}")
+    print_key_value(key="Size on disk", value=f"{format_bytes(data_processed[0])}")
+    if args.markdown:
+        print("\n```")
+    print_throughput_bandwidth(
+        args, durations, data_processed, p2p_bw, address_to_index
+    )
+
+
+def create_tidy_results(args, p2p_bw, results):
+    configuration = {
+        "path": args.path,
+        "columns": args.columns,
+        "backend": "cudf" if args.type == "gpu" else "pandas",
+        "filesystem": args.filesystem,
+        "blocksize": args.blocksize,
+        "aggregate_files": args.aggregate_files,
+    }
+    timing_data = pd.DataFrame(
+        [
+            pd.Series(
+                data=ChainMap(
+                    configuration,
+                    {
+                        "wallclock": duration,
+                        "data_processed": data_processed,
+                        "num_rows": num_rows,
+                    },
+                )
+            )
+            for data_processed, num_rows, duration in results
+        ]
+    )
+    return timing_data, p2p_bw
+
+
+def parse_args():
+    special_args = [
+        {
+            "name": "--blocksize",
+            "default": "256MB",
+            "type": parse_bytes,
+            "help": "How to set the blocksize option",
+        },
+        {
+            "name": "--aggregate-files",
+            "default": False,
+            "action": "store_true",
+            "help": "How to set the aggregate_files option",
+        },
+        {
+            "name": "--path",
+            "default": "s3://dask-cudf-parquet-testing/dedup_parquet",
+            "type": str,
+            "help": "Parquet directory to read from (must be a flat directory).",
+        },
+        {
+            "name": "--file-count",
+            "type": int,
+            "help": "Maximum number of files to read.",
+        },
+        {
+            "name": "--columns",
+            "type": str,
+            "help": "Columns to read/select from data.",
+        },
+        {
+            "name": "--key",
+            "type": str,
+            "help": "Public S3 key.",
+        },
+        {
+            "name": "--secret",
+            "type": str,
+            "help": "Secret S3 key.",
+        },
+        {
+            "name": [
+                "-t",
+                "--type",
+            ],
+            "choices": ["cpu", "gpu"],
+            "default": "gpu",
+            "type": str,
+            "help": "Use GPU or CPU dataframes (default 'gpu')",
+        },
+        {
+            "name": "--filesystem",
+            "choices": ["arrow", "fsspec"],
+            "default": "fsspec",
+            "type": str,
+            "help": "Filesystem backend",
+        },
+        {
+            "name": "--runs",
+            "default": 3,
+            "type": int,
+            "help": "Number of runs",
+        },
+        # NOTE: The following args are not relevant to this benchmark
+        {
+            "name": "--ignore-size",
+            "default": "1 MiB",
+            "metavar": "nbytes",
+            "type": parse_bytes,
+            "help": "Ignore messages smaller than this (default '1 MB')",
+        },
+    ]
+
+    return parse_benchmark_args(
+        description="Parquet read benchmark",
+        args_list=special_args,
+        check_explicit_comms=False,
+    )
+
+
+if __name__ == "__main__":
+    execute_benchmark(
+        Config(
+            args=parse_args(),
+            bench_once=bench_once,
+            create_tidy_results=create_tidy_results,
+            pretty_print_results=pretty_print_results,
+        )
+    )
diff --git a/dask_cuda/benchmarks/remote_parquet.py b/dask_cuda/benchmarks/remote_parquet.py
deleted file mode 100644
index 45550acc1..000000000
--- a/dask_cuda/benchmarks/remote_parquet.py
+++ /dev/null
@@ -1,179 +0,0 @@
-import contextlib
-from collections import ChainMap
-from time import perf_counter as clock
-
-import pandas as pd
-
-import dask
-import dask.dataframe as dd
-from dask.distributed import performance_report
-from dask.utils import format_bytes, parse_bytes
-
-from dask_cuda.benchmarks.common import Config, execute_benchmark
-from dask_cuda.benchmarks.utils import (
-    parse_benchmark_args,
-    print_key_value,
-    print_separator,
-    print_throughput_bandwidth,
-)
-
-DEFAULT_DATASET_PATH = "s3://dask-cudf-parquet-testing/dedup_parquet"
-DEFAULT_COLUMNS = ["text", "id"]
-DEFAULT_STORAGE_SIZE = 2_843_373_145  # Compressed byte size
-
-
-def read_data(
-    backend,
-    filesystem,
-    aggregate_files,
-    blocksize,
-):
-    path = DEFAULT_DATASET_PATH
-    columns = DEFAULT_COLUMNS
-    with dask.config.set({"dataframe.backend": backend}):
-        if filesystem == "arrow":
-            # TODO: Warn user that blocksize and aggregate_files
-            # are ignored when `filesystem == "arrow"`
-            _blocksize = {}
-            _aggregate_files = {}
-        else:
-            _blocksize = {"blocksize": blocksize}
-            _aggregate_files = {"aggregate_files": aggregate_files}
-
-        df = dd.read_parquet(
-            path,
-            columns=columns,
-            filesystem=filesystem,
-            **_blocksize,
-            **_aggregate_files,
-        )
-        return df.memory_usage().compute().sum()
-
-
-def bench_once(client, args, write_profile=None):
-
-    if write_profile is None:
-        ctx = contextlib.nullcontext()
-    else:
-        ctx = performance_report(filename=args.profile)
-
-    with ctx:
-        t1 = clock()
-        output_size = read_data(
-            backend="cudf" if args.type == "gpu" else "pandas",
-            filesystem=args.filesystem,
-            aggregate_files=args.aggregate_files,
-            blocksize=args.blocksize,
-        )
-        t2 = clock()
-
-    return (DEFAULT_STORAGE_SIZE, output_size, t2 - t1)
-
-
-def pretty_print_results(args, address_to_index, p2p_bw, results):
-    if args.markdown:
-        print("```")
-    print("Remote Parquet benchmark")
-    print_separator(separator="-")
-    backend = "cudf" if args.type == "gpu" else "pandas"
-    print_key_value(key="Backend", value=f"{backend}")
-    print_key_value(key="Filesystem", value=f"{args.filesystem}")
-    print_key_value(key="Blocksize", value=f"{args.blocksize}")
-    print_key_value(key="Aggregate files", value=f"{args.aggregate_files}")
-    print_key_value(key="Output size", value=f"{format_bytes(results[0][1])}")
-    if args.markdown:
-        print("\n```")
-    data_processed, output_size, durations = zip(*results)
-    print_throughput_bandwidth(
-        args, durations, data_processed, p2p_bw, address_to_index
-    )
-
-
-def create_tidy_results(args, p2p_bw, results):
-    configuration = {
-        "backend": "cudf" if args.type == "gpu" else "pandas",
-        "filesystem": args.filesystem,
-        "blocksize": args.blocksize,
-        "aggregate_files": args.aggregate_files,
-    }
-    timing_data = pd.DataFrame(
-        [
-            pd.Series(
-                data=ChainMap(
-                    configuration,
-                    {
-                        "wallclock": duration,
-                        "data_processed": data_processed,
-                        "output_size": output_size,
-                    },
-                )
-            )
-            for data_processed, output_size, duration in results
-        ]
-    )
-    return timing_data, p2p_bw
-
-
-def parse_args():
-    special_args = [
-        {
-            "name": "--blocksize",
-            "default": "256MB",
-            "type": str,
-            "help": "How to set the blocksize option",
-        },
-        {
-            "name": "--aggregate-files",
-            "default": False,
-            "action": "store_true",
-            "help": "How to set the aggregate_files option",
-        },
-        {
-            "name": [
-                "-t",
-                "--type",
-            ],
-            "choices": ["cpu", "gpu"],
-            "default": "gpu",
-            "type": str,
-            "help": "Use GPU or CPU dataframes (default 'gpu')",
-        },
-        {
-            "name": "--filesystem",
-            "choices": ["arrow", "fsspec"],
-            "default": "fsspec",
-            "type": str,
-            "help": "Filesystem backend",
-        },
-        {
-            "name": "--runs",
-            "default": 3,
-            "type": int,
-            "help": "Number of runs",
-        },
-        # NOTE: The following args are not relevant to this benchmark
-        {
-            "name": "--ignore-size",
-            "default": "1 MiB",
-            "metavar": "nbytes",
-            "type": parse_bytes,
-            "help": "Ignore messages smaller than this (default '1 MB')",
-        },
-    ]
-
-    return parse_benchmark_args(
-        description="Remote Parquet (dask/cudf) benchmark",
-        args_list=special_args,
-        check_explicit_comms=False,
-    )
-
-
-if __name__ == "__main__":
-    execute_benchmark(
-        Config(
-            args=parse_args(),
-            bench_once=bench_once,
-            create_tidy_results=create_tidy_results,
-            pretty_print_results=pretty_print_results,
-        )
-    )

From 31f5a8a01fa49b7ac444d1724cc511026f3ac2b3 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Thu, 29 Aug 2024 08:57:26 -0700
Subject: [PATCH 06/11] fix benchmark name

---
 dask_cuda/benchmarks/local_read_parquet.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dask_cuda/benchmarks/local_read_parquet.py b/dask_cuda/benchmarks/local_read_parquet.py
index 7dcf0343a..61d65e40e 100644
--- a/dask_cuda/benchmarks/local_read_parquet.py
+++ b/dask_cuda/benchmarks/local_read_parquet.py
@@ -135,7 +135,7 @@ def bench_once(client, args, write_profile=None):
 def pretty_print_results(args, address_to_index, p2p_bw, results):
     if args.markdown:
         print("```")
-    print("Remote Parquet benchmark")
+    print("Parquet read benchmark")
     data_processed, row_count, durations = zip(*results)
     print_separator(separator="-")
     backend = "cudf" if args.type == "gpu" else "pandas"

From a15aa358eba6e0fb898e42950d21ef3f7e5293c2 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Thu, 29 Aug 2024 09:33:07 -0700
Subject: [PATCH 07/11] fix case where path ends in slash

---
 dask_cuda/benchmarks/local_read_parquet.py | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/dask_cuda/benchmarks/local_read_parquet.py b/dask_cuda/benchmarks/local_read_parquet.py
index 61d65e40e..2241451b2 100644
--- a/dask_cuda/benchmarks/local_read_parquet.py
+++ b/dask_cuda/benchmarks/local_read_parquet.py
@@ -58,9 +58,12 @@ def get_fs_paths_kwargs(args):
             s3_args[_mapping[k]] = v
 
         fs = pa_fs.FileSystem.from_uri(args.path)[0]
-        kwargs["filesystem"] = type(fs)(**s3_args)
+        try:
+            region = {"region": fs.region}
+        except AttributeError:
+            region = {}
+        kwargs["filesystem"] = type(fs)(**region, **s3_args)
         fsspec_fs = ArrowFSWrapper(kwargs["filesystem"])
-        paths = fsspec_fs.glob(f"{args.path}/*.parquet")
 
         if args.type == "gpu":
             kwargs["blocksize"] = args.blocksize
@@ -74,11 +77,14 @@ def get_fs_paths_kwargs(args):
             args.path, mode="rb", storage_options=storage_options
         )[0]
         kwargs["filesystem"] = fsspec_fs
-        paths = fsspec_fs.glob(f"{args.path}/*.parquet")
-
         kwargs["blocksize"] = args.blocksize
         kwargs["aggregate_files"] = args.aggregate_files
 
+    # Collect list of paths
+    stripped_url_path = fsspec_fs._strip_protocol(args.path)
+    if stripped_url_path.endswith("/"):
+        stripped_url_path = stripped_url_path[:-1]
+    paths = fsspec_fs.glob(f"{stripped_url_path}/*.parquet")
     if args.file_count:
         paths = paths[: args.file_count]
 

From 6f3e5c5252adb621c328a50f7e4ea8e5a779b793 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Thu, 29 Aug 2024 17:25:07 -0700
Subject: [PATCH 08/11] address code review

---
 dask_cuda/benchmarks/local_cudf_groupby.py     |  9 +--------
 dask_cuda/benchmarks/local_cudf_merge.py       |  9 +--------
 dask_cuda/benchmarks/local_cudf_shuffle.py     |  7 -------
 dask_cuda/benchmarks/local_cupy.py             |  9 +--------
 dask_cuda/benchmarks/local_cupy_map_overlap.py |  9 +--------
 .../{local_read_parquet.py => read_parquet.py} |  9 ---------
 dask_cuda/benchmarks/utils.py                  |  7 +++++++
 7 files changed, 11 insertions(+), 48 deletions(-)
 rename dask_cuda/benchmarks/{local_read_parquet.py => read_parquet.py} (95%)

diff --git a/dask_cuda/benchmarks/local_cudf_groupby.py b/dask_cuda/benchmarks/local_cudf_groupby.py
index 2f07e3df7..f094ff185 100644
--- a/dask_cuda/benchmarks/local_cudf_groupby.py
+++ b/dask_cuda/benchmarks/local_cudf_groupby.py
@@ -7,7 +7,7 @@
 import dask
 import dask.dataframe as dd
 from dask.distributed import performance_report, wait
-from dask.utils import format_bytes, parse_bytes
+from dask.utils import format_bytes
 
 from dask_cuda.benchmarks.common import Config, execute_benchmark
 from dask_cuda.benchmarks.utils import (
@@ -260,13 +260,6 @@ def parse_args():
             "type": str,
             "help": "Do shuffle with GPU or CPU dataframes (default 'gpu')",
         },
-        {
-            "name": "--ignore-size",
-            "default": "1 MiB",
-            "metavar": "nbytes",
-            "type": parse_bytes,
-            "help": "Ignore messages smaller than this (default '1 MB')",
-        },
         {
             "name": "--runs",
             "default": 3,
diff --git a/dask_cuda/benchmarks/local_cudf_merge.py b/dask_cuda/benchmarks/local_cudf_merge.py
index 6a68ad788..e2b035204 100644
--- a/dask_cuda/benchmarks/local_cudf_merge.py
+++ b/dask_cuda/benchmarks/local_cudf_merge.py
@@ -9,7 +9,7 @@
 import dask
 import dask.dataframe as dd
 from dask.distributed import performance_report, wait
-from dask.utils import format_bytes, parse_bytes
+from dask.utils import format_bytes
 
 from dask_cuda.benchmarks.common import Config, execute_benchmark
 from dask_cuda.benchmarks.utils import (
@@ -335,13 +335,6 @@ def parse_args():
             "action": "store_true",
             "help": "Use shuffle join (takes precedence over '--broadcast-join').",
         },
-        {
-            "name": "--ignore-size",
-            "default": "1 MiB",
-            "metavar": "nbytes",
-            "type": parse_bytes,
-            "help": "Ignore messages smaller than this (default '1 MB')",
-        },
         {
             "name": "--frac-match",
             "default": 0.3,
diff --git a/dask_cuda/benchmarks/local_cudf_shuffle.py b/dask_cuda/benchmarks/local_cudf_shuffle.py
index a1129dd37..25f42e59d 100644
--- a/dask_cuda/benchmarks/local_cudf_shuffle.py
+++ b/dask_cuda/benchmarks/local_cudf_shuffle.py
@@ -228,13 +228,6 @@ def parse_args():
             "type": str,
             "help": "Do shuffle with GPU or CPU dataframes (default 'gpu')",
         },
-        {
-            "name": "--ignore-size",
-            "default": "1 MiB",
-            "metavar": "nbytes",
-            "type": parse_bytes,
-            "help": "Ignore messages smaller than this (default '1 MB')",
-        },
         {
             "name": "--runs",
             "default": 3,
diff --git a/dask_cuda/benchmarks/local_cupy.py b/dask_cuda/benchmarks/local_cupy.py
index 22c51556f..c9c8fe1c1 100644
--- a/dask_cuda/benchmarks/local_cupy.py
+++ b/dask_cuda/benchmarks/local_cupy.py
@@ -8,7 +8,7 @@
 
 from dask import array as da
 from dask.distributed import performance_report, wait
-from dask.utils import format_bytes, parse_bytes
+from dask.utils import format_bytes
 
 from dask_cuda.benchmarks.common import Config, execute_benchmark
 from dask_cuda.benchmarks.utils import (
@@ -297,13 +297,6 @@ def parse_args():
             "type": int,
             "help": "Chunk size (default 2500).",
         },
-        {
-            "name": "--ignore-size",
-            "default": "1 MiB",
-            "metavar": "nbytes",
-            "type": parse_bytes,
-            "help": "Ignore messages smaller than this (default '1 MB').",
-        },
         {
             "name": "--runs",
             "default": 3,
diff --git a/dask_cuda/benchmarks/local_cupy_map_overlap.py b/dask_cuda/benchmarks/local_cupy_map_overlap.py
index 8250c9f9f..8b975a24a 100644
--- a/dask_cuda/benchmarks/local_cupy_map_overlap.py
+++ b/dask_cuda/benchmarks/local_cupy_map_overlap.py
@@ -10,7 +10,7 @@
 
 from dask import array as da
 from dask.distributed import performance_report, wait
-from dask.utils import format_bytes, parse_bytes
+from dask.utils import format_bytes
 
 from dask_cuda.benchmarks.common import Config, execute_benchmark
 from dask_cuda.benchmarks.utils import (
@@ -168,13 +168,6 @@ def parse_args():
             "type": int,
             "help": "Kernel size, 2*k+1, in each dimension (default 1)",
         },
-        {
-            "name": "--ignore-size",
-            "default": "1 MiB",
-            "metavar": "nbytes",
-            "type": parse_bytes,
-            "help": "Ignore messages smaller than this (default '1 MB')",
-        },
         {
             "name": "--runs",
             "default": 3,
diff --git a/dask_cuda/benchmarks/local_read_parquet.py b/dask_cuda/benchmarks/read_parquet.py
similarity index 95%
rename from dask_cuda/benchmarks/local_read_parquet.py
rename to dask_cuda/benchmarks/read_parquet.py
index 2241451b2..43174af64 100644
--- a/dask_cuda/benchmarks/local_read_parquet.py
+++ b/dask_cuda/benchmarks/read_parquet.py
@@ -203,7 +203,6 @@ def parse_args():
         },
         {
             "name": "--path",
-            "default": "s3://dask-cudf-parquet-testing/dedup_parquet",
             "type": str,
             "help": "Parquet directory to read from (must be a flat directory).",
         },
@@ -250,14 +249,6 @@ def parse_args():
             "type": int,
             "help": "Number of runs",
         },
-        # NOTE: The following args are not relevant to this benchmark
-        {
-            "name": "--ignore-size",
-            "default": "1 MiB",
-            "metavar": "nbytes",
-            "type": parse_bytes,
-            "help": "Ignore messages smaller than this (default '1 MB')",
-        },
     ]
 
     return parse_benchmark_args(
diff --git a/dask_cuda/benchmarks/utils.py b/dask_cuda/benchmarks/utils.py
index 48e4755fb..fcb050f95 100644
--- a/dask_cuda/benchmarks/utils.py
+++ b/dask_cuda/benchmarks/utils.py
@@ -337,6 +337,13 @@ def parse_benchmark_args(
         "If the files already exist, new files are created with a uniquified "
         "BASENAME.",
     )
+    parser.add_argument(
+        "--ignore-size",
+        default="1 MiB",
+        metavar="nbytes",
+        type=parse_bytes,
+        help="Ignore messages smaller than this (default '1 MB')",
+    )
 
     for args in args_list:
         name = args.pop("name")

From 8745e1d7e1c15b4e01943502a55cd37362e16ce8 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Thu, 29 Aug 2024 17:42:36 -0700
Subject: [PATCH 09/11] fix output

---
 dask_cuda/benchmarks/read_parquet.py | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/dask_cuda/benchmarks/read_parquet.py b/dask_cuda/benchmarks/read_parquet.py
index 43174af64..21435ef0d 100644
--- a/dask_cuda/benchmarks/read_parquet.py
+++ b/dask_cuda/benchmarks/read_parquet.py
@@ -67,11 +67,6 @@ def get_fs_paths_kwargs(args):
 
         if args.type == "gpu":
             kwargs["blocksize"] = args.blocksize
-
-        if args.aggregate_files:
-            raise NotImplementedError(
-                "aggregate-files is not supported for filesystem='arrow'"
-            )
     else:
         fsspec_fs = fsspec.core.get_fs_token_paths(
             args.path, mode="rb", storage_options=storage_options
@@ -155,9 +150,11 @@ def pretty_print_results(args, address_to_index, p2p_bw, results):
         print_key_value(key="Size on disk", value=f"{format_bytes(data_processed[0])}")
     if args.markdown:
         print("\n```")
+    args.no_show_p2p_bandwidth = True
     print_throughput_bandwidth(
         args, durations, data_processed, p2p_bw, address_to_index
    )
+    print_separator(separator="=")
 
 
 def create_tidy_results(args, p2p_bw, results):
@@ -189,6 +186,12 @@ def create_tidy_results(args, p2p_bw, results):
 
 def parse_args():
     special_args = [
+        {
+            "name": "path",
+            # "required": True,
+            "type": str,
+            "help": "Parquet directory to read from (must be a flat directory).",
+        },
         {
             "name": "--blocksize",
             "default": "256MB",
             "type": parse_bytes,
             "help": "How to set the blocksize option",
         },
         {
             "name": "--aggregate-files",
             "default": False,
             "action": "store_true",
             "help": "How to set the aggregate_files option",
         },
-        {
-            "name": "--path",
-            "type": str,
-            "help": "Parquet directory to read from (must be a flat directory).",
-        },
         {
             "name": "--file-count",
             "type": int,
             "help": "Maximum number of files to read.",
         },
-    return parse_benchmark_args(
+    args = parse_benchmark_args(
         description="Parquet read benchmark",
         args_list=special_args,
         check_explicit_comms=False,
     )
+    args.no_show_p2p_bandwidth = True
+    return args
 
 
 if __name__ == "__main__":

From 53065751e1500615812891340e98ac614d804b5e Mon Sep 17 00:00:00 2001
From: "Richard (Rick) Zamora"
Date: Fri, 30 Aug 2024 08:16:23 -0500
Subject: [PATCH 10/11] Update dask_cuda/benchmarks/utils.py

Co-authored-by: Mads R. B. Kristensen
---
 dask_cuda/benchmarks/utils.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dask_cuda/benchmarks/utils.py b/dask_cuda/benchmarks/utils.py
index fcb050f95..5b9448d48 100644
--- a/dask_cuda/benchmarks/utils.py
+++ b/dask_cuda/benchmarks/utils.py
@@ -342,7 +342,7 @@ def parse_benchmark_args(
         default="1 MiB",
         metavar="nbytes",
         type=parse_bytes,
-        help="Ignore messages smaller than this (default '1 MB')",
+        help="Bandwidth statistics: ignore messages smaller than this (default '1 MB')",
     )
 
     for args in args_list:

From 5f2937b8306491b29882fb846460f5551c737195 Mon Sep 17 00:00:00 2001
From: "Richard (Rick) Zamora"
Date: Fri, 30 Aug 2024 08:18:57 -0500
Subject: [PATCH 11/11] Update dask_cuda/benchmarks/read_parquet.py

---
 dask_cuda/benchmarks/read_parquet.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/dask_cuda/benchmarks/read_parquet.py b/dask_cuda/benchmarks/read_parquet.py
index 21435ef0d..bce696737 100644
--- a/dask_cuda/benchmarks/read_parquet.py
+++ b/dask_cuda/benchmarks/read_parquet.py
@@ -188,7 +186,6 @@ def parse_args():
     special_args = [
         {
             "name": "path",
-            # "required": True,
            "type": str,
             "help": "Parquet directory to read from (must be a flat directory).",
         },
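
Usage note: a minimal sketch of invoking the final benchmark produced by this
series. The positional path (patch 09) and the --filesystem, --type, --blocksize,
and --runs options are all defined in parse_args above; the bucket name is
illustrative, and the usual dask-cuda cluster flags added by parse_benchmark_args
(device selection, protocol, etc.) are assumed but not shown:

    python -m dask_cuda.benchmarks.read_parquet \
        s3://my-bucket/my-parquet-dir \
        --filesystem arrow \
        --type gpu \
        --blocksize 256MB \
        --runs 3

With --filesystem arrow and the cudf backend, this exercises the pyarrow-native
dd.read_parquet(path, columns=..., filesystem=...) code path measured by
read_data in the benchmark script.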