Add num_iterations axis to the multi-threaded Parquet benchmarks #17231

Merged · 1 commit · Nov 2, 2024
Changes from all commits
57 changes: 38 additions & 19 deletions cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp
@@ -45,6 +45,7 @@ std::string get_label(std::string const& test_name, nvbench::state const& state)
auto const num_cols = state.get_int64("num_cols");
size_t const read_size_mb = get_read_size(state) / (1024 * 1024);
return {test_name + ", " + std::to_string(num_cols) + " columns, " +
std::to_string(state.get_int64("num_iterations")) + " iterations, " +
std::to_string(state.get_int64("num_threads")) + " threads " + " (" +
std::to_string(read_size_mb) + " MB each)"};
}
@@ -90,9 +91,10 @@ void BM_parquet_multithreaded_read_common(nvbench::state& state,
std::vector<cudf::type_id> const& d_types,
std::string const& label)
{
-  size_t const data_size = state.get_int64("total_data_size");
-  auto const num_threads = state.get_int64("num_threads");
-  auto const source_type = retrieve_io_type_enum(state.get_string("io_type"));
+  size_t const data_size = state.get_int64("total_data_size");
+  auto const num_threads = state.get_int64("num_threads");
+  auto const num_iterations = state.get_int64("num_iterations");
+  auto const source_type = retrieve_io_type_enum(state.get_string("io_type"));

auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
BS::thread_pool threads(num_threads);
@@ -109,12 +111,15 @@ void BM_parquet_multithreaded_read_common(nvbench::state& state,

nvtxRangePushA(("(read) " + label).c_str());
state.exec(nvbench::exec_tag::sync | nvbench::exec_tag::timer,
[&](nvbench::launch& launch, auto& timer) {
[&, num_files = num_files](nvbench::launch& launch, auto& timer) {
Comment from Contributor Author:
Unrelated: we used to capture a structured binding variable in lambdas, which is not supported in C++17.

Comment from Member:
Thanks, I wasn't aware of that!
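For readers unfamiliar with the restriction, a minimal sketch (hypothetical snippet, not part of this diff): C++17 forbids capturing a structured binding by name, while an init-capture that copies its value, as the `num_files = num_files` capture above does, compiles fine; C++20 lifted the restriction.

```cpp
#include <tuple>

int main()
{
  auto [num_files, num_cols] = std::make_tuple(4, 8);

  // auto bad = [num_files] { return num_files; };  // ill-formed in C++17:
  // structured bindings cannot be captured by name (allowed since C++20)

  // OK in C++17: init-capture copies the current value into the closure
  auto good = [num_files = num_files] { return num_files; };
  return good() == 4 ? 0 : 1;
}
```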

auto read_func = [&](int index) {
auto const stream = streams[index % num_threads];
cudf::io::parquet_reader_options read_opts =
cudf::io::parquet_reader_options::builder(source_info_vector[index]);
-            cudf::io::read_parquet(read_opts, stream, cudf::get_current_device_resource_ref());
+            for (int i = 0; i < num_iterations; ++i) {
+              cudf::io::read_parquet(
+                read_opts, stream, cudf::get_current_device_resource_ref());
+            }
};

threads.pause();
@@ -128,7 +133,8 @@ void BM_parquet_multithreaded_read_common(nvbench::state& state,
nvtxRangePop();

auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
-  state.add_element_count(static_cast<double>(data_size) / time, "bytes_per_second");
+  state.add_element_count(num_iterations * static_cast<double>(data_size) / time,
+                          "bytes_per_second");
state.add_buffer_size(
mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage");
state.add_buffer_size(total_file_size, "encoded_file_size", "encoded_file_size");
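Note on the scaling above (illustrative numbers, not from this PR): each timed sample now performs num_iterations reads of the full data set, so the element count must be multiplied accordingly. For example, with total_data_size = 512 MiB and num_iterations = 4, one sample moves 2 GiB; a mean time of 1 s should then report roughly 2.1 GB/s, and omitting the num_iterations factor would under-report throughput by 4x.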
@@ -173,6 +179,7 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
{
size_t const data_size = state.get_int64("total_data_size");
auto const num_threads = state.get_int64("num_threads");
+  auto const num_iterations = state.get_int64("num_iterations");
size_t const input_limit = state.get_int64("input_limit");
size_t const output_limit = state.get_int64("output_limit");
auto const source_type = retrieve_io_type_enum(state.get_string("io_type"));
@@ -192,22 +199,25 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
nvtxRangePushA(("(read) " + label).c_str());
std::vector<cudf::io::table_with_metadata> chunks;
state.exec(nvbench::exec_tag::sync | nvbench::exec_tag::timer,
[&](nvbench::launch& launch, auto& timer) {
[&, num_files = num_files](nvbench::launch& launch, auto& timer) {
auto read_func = [&](int index) {
auto const stream = streams[index % num_threads];
cudf::io::parquet_reader_options read_opts =
cudf::io::parquet_reader_options::builder(source_info_vector[index]);
-            // divide chunk limits by number of threads so the number of chunks produced is the
-            // same for all cases. this seems better than the alternative, which is to keep the
-            // limits the same. if we do that, as the number of threads goes up, the number of
-            // chunks goes down - so are we actually benchmarking the same thing in that case?
-            auto reader = cudf::io::chunked_parquet_reader(
-              output_limit / num_threads, input_limit / num_threads, read_opts, stream);
-
-            // read all the chunks
-            do {
-              auto table = reader.read_chunk();
-            } while (reader.has_next());
+            for (int i = 0; i < num_iterations; ++i) {
+              // divide chunk limits by number of threads so the number of chunks produced is
+              // the same for all cases. this seems better than the alternative, which is to
+              // keep the limits the same. if we do that, as the number of threads goes up, the
+              // number of chunks goes down - so are we actually benchmarking the same thing in
+              // that case?
+              auto reader = cudf::io::chunked_parquet_reader(
+                output_limit / num_threads, input_limit / num_threads, read_opts, stream);
+
+              // read all the chunks
+              do {
+                auto table = reader.read_chunk();
+              } while (reader.has_next());
+            }
};

threads.pause();
@@ -221,7 +231,8 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
nvtxRangePop();

auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
-  state.add_element_count(static_cast<double>(data_size) / time, "bytes_per_second");
+  state.add_element_count(num_iterations * static_cast<double>(data_size) / time,
+                          "bytes_per_second");
state.add_buffer_size(
mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage");
state.add_buffer_size(total_file_size, "encoded_file_size", "encoded_file_size");
@@ -267,6 +278,7 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_mixed)
.add_int64_axis("cardinality", {1000})
.add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {1, 2, 4, 8})
.add_int64_axis("num_iterations", {1})
Comment from Contributor:
What values do we want to use here? Are the results interesting in comparing 1 to e.g. 8?

Comment from Contributor Author:
I'm not seeing very interesting results on my system (checked 1, 2, 4, and 8), but that might be because of low H2D/D2H transfer rates on it.
I added the axis so we can easily change the number of iterations when we're looking into pipelining. So, for now, I think we should stick to a single value to keep the number of benchmarks from growing.
I'm fine with defaulting to a larger value. Thoughts? CC @GregoryKimball
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8})
.add_string_axis("io_type", {"PINNED_BUFFER"});
@@ -277,6 +289,7 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_fixed_width)
.add_int64_axis("cardinality", {1000})
.add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {1, 2, 4, 8})
.add_int64_axis("num_iterations", {1})
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8})
.add_string_axis("io_type", {"PINNED_BUFFER"});
@@ -287,6 +300,7 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_string)
.add_int64_axis("cardinality", {1000})
.add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {1, 2, 4, 8})
.add_int64_axis("num_iterations", {1})
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8})
.add_string_axis("io_type", {"PINNED_BUFFER"});
@@ -297,6 +311,7 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_list)
.add_int64_axis("cardinality", {1000})
.add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {1, 2, 4, 8})
.add_int64_axis("num_iterations", {1})
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8})
.add_string_axis("io_type", {"PINNED_BUFFER"});
@@ -308,6 +323,7 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_mixed)
.add_int64_axis("cardinality", {1000})
.add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {1, 2, 4, 8})
.add_int64_axis("num_iterations", {1})
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8})
.add_int64_axis("input_limit", {640 * 1024 * 1024})
@@ -320,6 +336,7 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_fixed_width)
.add_int64_axis("cardinality", {1000})
.add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {1, 2, 4, 8})
.add_int64_axis("num_iterations", {1})
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8})
.add_int64_axis("input_limit", {640 * 1024 * 1024})
@@ -332,6 +349,7 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_string)
.add_int64_axis("cardinality", {1000})
.add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {1, 2, 4, 8})
.add_int64_axis("num_iterations", {1})
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8})
.add_int64_axis("input_limit", {640 * 1024 * 1024})
@@ -344,6 +362,7 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_list)
.add_int64_axis("cardinality", {1000})
.add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {1, 2, 4, 8})
.add_int64_axis("num_iterations", {1})
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8})
.add_int64_axis("input_limit", {640 * 1024 * 1024})