Require batches to be non-empty in multi-batch JSON reader #17837

Merged: 10 commits, Feb 4, 2025
65 changes: 37 additions & 28 deletions cpp/src/io/json/read_json.cu
@@ -165,23 +165,26 @@ std::size_t estimate_size_per_subchunk(std::size_t chunk_size)
}

/**
* @brief Return the upper bound on the batch size for the JSON reader.
* @brief Return the batch size for the JSON reader.
*
* The datasources passed to the JSON reader are split into batches demarcated by byte range
* offsets and read iteratively. The batch size is capped at INT_MAX bytes, which is the
* default value returned by the function. This value can be overridden at runtime using the
* environment variable LIBCUDF_JSON_BATCH_SIZE
* The datasources passed to the JSON reader are read iteratively in batches demarcated by byte
* range offsets. The tokenizer requires the JSON buffer read in each batch to be of size at most
* INT_MAX bytes.
* Since the byte range corresponding to a given batch can cause the last JSON line
* in the batch to be incomplete, the batch size returned by this function allows for an additional
* `max_subchunks_prealloced` subchunks to be allocated beyond the byte range offsets. Since the
* size of the subchunk depends on the size of the byte range, the batch size is variable and cannot
* be directly controlled by the user. As a workaround, the environment variable
* LIBCUDF_JSON_BATCH_SIZE can be used to set a fixed batch size at runtime.
*
* @return size in bytes
*/
std::size_t get_batch_size_upper_bound()
std::size_t get_batch_size(std::size_t chunk_size)
{
auto const batch_size_str = std::getenv("LIBCUDF_JSON_BATCH_SIZE");
int64_t const batch_size = batch_size_str != nullptr ? std::atol(batch_size_str) : 0L;
auto const batch_limit = static_cast<int64_t>(std::numeric_limits<int32_t>::max());
auto const batch_size_upper_bound = static_cast<std::size_t>(
(batch_size > 0 && batch_size < batch_limit) ? batch_size : batch_limit);
return batch_size_upper_bound;
auto const size_per_subchunk = estimate_size_per_subchunk(chunk_size);
auto const batch_limit = static_cast<std::size_t>(std::numeric_limits<int32_t>::max()) -
(max_subchunks_prealloced * size_per_subchunk);
return std::min(batch_limit, getenv_or<std::size_t>("LIBCUDF_JSON_BATCH_SIZE", batch_limit));
}
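For context, here is a minimal caller-side sketch (editorial, not part of this diff) of how the LIBCUDF_JSON_BATCH_SIZE override described above can be exercised. The wrapper name and the 1 MiB value are illustrative, and setenv assumes a POSIX environment, mirroring the test fixture added later in this PR.

// Illustrative sketch only; not code from this PR.
#include <cudf/io/json.hpp>

#include <cstdlib>
#include <string>

cudf::io::table_with_metadata read_with_fixed_batch_size(std::string const& json_lines)
{
  // Force a fixed batch size (1 MiB here) instead of the INT_MAX-derived default.
  setenv("LIBCUDF_JSON_BATCH_SIZE", std::to_string(1 << 20).c_str(), 1);
  auto opts = cudf::io::json_reader_options::builder(
                cudf::io::source_info{json_lines.data(), json_lines.size()})
                .lines(true)
                .build();
  auto result = cudf::io::read_json(opts);
  unsetenv("LIBCUDF_JSON_BATCH_SIZE");
  return result;
}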

/**
@@ -295,6 +298,10 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(
}
}

auto const batch_limit = static_cast<size_t>(std::numeric_limits<int32_t>::max());
Contributor:
Double-checking here, we do actually want the limit to be explicitly tied to int32_t, right? i.e. we don't want the implementation-defined size_t size? Are we choosing int32_t because it is size_type? If so, should this be cudf::size_type in the arg to numeric_limits?

Contributor Author:
Yes, we want batch_limit to be explicitly set to 2^31 - 1, since that is the maximum string size accepted by the JSON tokenizer.

CUDF_EXPECTS(input_size == 0 || (input_size - 1) <= std::numeric_limits<int32_t>::max(),

If we are changing the arg to numeric_limits to cudf::size_type, I think we should modify check_input_size as well.

Contributor:
I agree. I don't know this part of the code well enough to know whether int32_t is really semantically cudf::size_type here or if it is a semantically different (but numerically equivalent) limit. I'll defer to your judgment.

Contributor Author:
Semantically, when do we prefer cudf::size_type over int32_t?

Contributor:
https://github.com/rapidsai/cudf/blob/branch-25.04/cpp/doxygen/developer_guide/DEVELOPER_GUIDE.md#cudfsize_type

The cudf::size_type is the type used for the number of elements in a column, offsets to elements within a column, indices to address specific elements, segments for subsets of column elements, etc.

If this is meant to represent one of those things above ^, it should be cudf::size_type.

Contributor Author:
Thanks, Bradley. I believe that the usage in the JSON batching logic and the tokenizer should be int32_t in that case since we are referring to the size of (and offsets in) a raw JSON string before the cudf table is constructed.

Contributor:
Yes that sounds right to me.
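As a hedged aside summarizing the outcome of this thread (illustrative code, not from this PR): byte sizes of the raw JSON buffer are checked against the tokenizer's int32_t limit, whereas cudf::size_type is reserved for element counts and offsets within constructed columns. The helper names below are hypothetical.

// Illustrative sketch only; helper names are hypothetical.
#include <cudf/types.hpp>
#include <cudf/utilities/error.hpp>

#include <cstddef>
#include <cstdint>
#include <limits>

// Raw JSON buffer size: a byte count bounded by the tokenizer's INT_MAX limit.
void check_raw_json_buffer_size(std::size_t num_bytes)
{
  CUDF_EXPECTS(num_bytes <= static_cast<std::size_t>(std::numeric_limits<int32_t>::max()),
               "JSON buffer passed to the tokenizer cannot exceed INT_MAX bytes");
}

// Row count of a constructed table: expressed with cudf::size_type.
void check_row_count(std::int64_t num_rows)
{
  CUDF_EXPECTS(
    num_rows <= static_cast<std::int64_t>(std::numeric_limits<cudf::size_type>::max()),
    "Row count exceeds the maximum cudf::size_type value");
}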

CUDF_EXPECTS(static_cast<size_t>(next_delim_pos - first_delim_pos - shift_for_nonzero_offset) <
batch_limit,
"The size of the JSON buffer returned by every batch cannot exceed INT_MAX bytes");
return datasource::owning_buffer<rmm::device_buffer>(
std::move(buffer),
reinterpret_cast<uint8_t*>(buffer.data()) + first_delim_pos + shift_for_nonzero_offset,
@@ -365,17 +372,11 @@ table_with_metadata read_json_impl(host_span<std::unique_ptr<datasource>> source
reader_opts.is_enabled_lines() || total_source_size < std::numeric_limits<int32_t>::max(),
"Parsing Regular JSON inputs of size greater than INT_MAX bytes is not supported");

std::size_t chunk_offset = reader_opts.get_byte_range_offset();
std::size_t chunk_size = reader_opts.get_byte_range_size();
chunk_size = !chunk_size ? total_source_size - chunk_offset
: std::min(chunk_size, total_source_size - chunk_offset);

std::size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size);
std::size_t const batch_size_upper_bound = get_batch_size_upper_bound();
std::size_t const batch_size =
batch_size_upper_bound < (max_subchunks_prealloced * size_per_subchunk)
? batch_size_upper_bound
: batch_size_upper_bound - (max_subchunks_prealloced * size_per_subchunk);
std::size_t chunk_offset = reader_opts.get_byte_range_offset();
std::size_t chunk_size = reader_opts.get_byte_range_size();
chunk_size = !chunk_size ? total_source_size - chunk_offset
: std::min(chunk_size, total_source_size - chunk_offset);
std::size_t const batch_size = get_batch_size(chunk_size);

/*
* Identify the position (zero-indexed) of starting source file from which to begin
@@ -490,11 +491,19 @@ table_with_metadata read_json_impl(host_span<std::unique_ptr<datasource>> source
// Dispatch individual batches to read_batch and push the resulting table into
// partial_tables array. Note that the reader options need to be updated for each
// batch to adjust byte range offset and byte range size.
for (std::size_t i = 1; i < batch_offsets.size() - 1; i++) {
batched_reader_opts.set_byte_range_offset(batch_offsets[i]);
batched_reader_opts.set_byte_range_size(batch_offsets[i + 1] - batch_offsets[i]);
partial_tables.emplace_back(
read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref()));
for (std::size_t batch_offset_pos = 1; batch_offset_pos < batch_offsets.size() - 1;
batch_offset_pos++) {
batched_reader_opts.set_byte_range_offset(batch_offsets[batch_offset_pos]);
batched_reader_opts.set_byte_range_size(batch_offsets[batch_offset_pos + 1] -
batch_offsets[batch_offset_pos]);
auto partial_table =
read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref());
if (partial_table.tbl->num_columns() == 0 && partial_table.tbl->num_rows() == 0) {
CUDF_EXPECTS(batch_offset_pos == batch_offsets.size() - 2,
"Only the partial table generated by the last batch can be empty");
break;
}
partial_tables.emplace_back(std::move(partial_table));
}
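To make the empty-batch case concrete, the following walk-through (editorial, with offsets inferred from the EmptyLastBatch test added below; not part of this diff) assumes each batch's read extends past its byte range to the next newline delimiter, as described in the get_batch_size documentation above.

// Illustrative walk-through only; values inferred from the test below.
// data = "\n{"a": "b"}\n{"a": "b"}\n{"a": "b"}\n{"a": "b"}\n"  -> 45 bytes
// LIBCUDF_JSON_BATCH_SIZE = 45 - 5 = 40, so the byte ranges are roughly
//   batch 1: [0, 40)  -- ends mid-record, but the read extends to the next
//                        delimiter, so all four records are parsed here
//   batch 2: [40, 45) -- holds only '"b"}\n', the tail of a record that
//                        batch 1 already consumed, so read_batch returns an
//                        empty table
// The loop above therefore requires that only the final batch may be empty
// and breaks instead of appending an empty partial table.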

auto expects_schema_equality =
44 changes: 43 additions & 1 deletion cpp/tests/io/json/json_test.cpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
* Copyright (c) 2020-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -3461,4 +3461,46 @@ TEST_F(JsonReaderTest, MismatchedBeginEndTokens)
EXPECT_THROW(cudf::io::read_json(opts), cudf::logic_error);
}

/**
* @brief Base test fixture for JSON batched reader tests
*/
struct JsonBatchedReaderTest : public cudf::test::BaseFixture {
public:
void set_batch_size(size_t batch_size_upper_bound)
{
setenv("LIBCUDF_JSON_BATCH_SIZE", std::to_string(batch_size_upper_bound).c_str(), 1);
}

~JsonBatchedReaderTest() { unsetenv("LIBCUDF_JSON_BATCH_SIZE"); }
};

TEST_F(JsonBatchedReaderTest, EmptyLastBatch)
{
std::string data = R"(
{"a": "b"}
{"a": "b"}
{"a": "b"}
{"a": "b"}
)";
size_t size_of_last_batch = 5;
// This test constructs two batches by setting the batch size such that the last batch is an
// incomplete line. The JSON string corresponding to the first batch is
// '\n{"a": "b"}\n{"a": "b"}\n{"a": "b"}\n{"a": '
// The JSON string corresponding to the second batch is
// '"b"}\n'
this->set_batch_size(data.size() - size_of_last_batch);
auto opts =
cudf::io::json_reader_options::builder(cudf::io::source_info{data.data(), data.size()})
.lines(true)
.build();
auto result = cudf::io::read_json(opts);

EXPECT_EQ(result.tbl->num_columns(), 1);
EXPECT_EQ(result.tbl->num_rows(), 4);
EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::STRING);
EXPECT_EQ(result.metadata.schema_info[0].name, "a");
CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0),
cudf::test::strings_column_wrapper{{"b", "b", "b", "b"}});
}

CUDF_TEST_PROGRAM_MAIN()