Closes #8502: Parallel NDJSON file reading (#8659)
* added basic test

* added `fn repartitioned`

* added basic version of FileOpener

* refactor: extract calculate_range

* refactor: handle GetResultPayload::Stream

* refactor: extract common functions to mod.rs

* refactor: use common functions

* added docs

* added test

* clippy

* fix: test_chunked_json

* fix: sqllogictest

* delete imports

* update docs
marvinlanhenke authored Dec 31, 2023
1 parent 848f6c3 commit 03bd9b4
Showing 6 changed files with 305 additions and 119 deletions.
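For orientation before the diffs: the new tests below exercise the feature end to end. A minimal standalone sketch of the same flow (the table name and file path are illustrative, and the config values mirror the tests rather than recommended production settings):

use datafusion::error::Result;
use datafusion::prelude::{NdJsonReadOptions, SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Opt in to splitting file scans across partitions; a minimum file size
    // of 0 makes even small files eligible for splitting.
    let config = SessionConfig::new()
        .with_repartition_file_scans(true)
        .with_repartition_file_min_size(0)
        .with_target_partitions(4);

    let ctx = SessionContext::new_with_config(config);

    ctx.register_json("t", "tests/data/1.json", NdJsonReadOptions::default())
        .await?;

    // The NDJSON scan is now planned as up to 4 file groups, one per partition.
    ctx.sql("SELECT SUM(a) FROM t").await?.show().await?;

    Ok(())
}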
106 changes: 100 additions & 6 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -294,16 +294,20 @@ impl DataSink for JsonSink {
#[cfg(test)]
mod tests {
use super::super::test_util::scan_format;
use super::*;
use crate::physical_plan::collect;
use crate::prelude::{SessionConfig, SessionContext};
use crate::test::object_store::local_unpartitioned_file;

use arrow::util::pretty;
use datafusion_common::cast::as_int64_array;
use datafusion_common::stats::Precision;

use datafusion_common::{assert_batches_eq, internal_err};
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use regex::Regex;
use rstest::rstest;

use super::*;
use crate::execution::options::NdJsonReadOptions;
use crate::physical_plan::collect;
use crate::prelude::{SessionConfig, SessionContext};
use crate::test::object_store::local_unpartitioned_file;

#[tokio::test]
async fn read_small_batches() -> Result<()> {
@@ -424,4 +428,94 @@ mod tests {
.collect::<Vec<_>>();
assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean"], fields);
}

async fn count_num_partitions(ctx: &SessionContext, query: &str) -> Result<usize> {
let result = ctx
.sql(&format!("EXPLAIN {query}"))
.await?
.collect()
.await?;

let plan = format!("{}", &pretty::pretty_format_batches(&result)?);

let re = Regex::new(r"file_groups=\{(\d+) group").unwrap();
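// Note: matching the singular "group" covers both `{1 group: [...]}`
// and `{N groups: [...]}` in the rendered physical plan.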

if let Some(captures) = re.captures(&plan) {
if let Some(match_) = captures.get(1) {
let count = match_.as_str().parse::<usize>().unwrap();
return Ok(count);
}
}

internal_err!("Query contains no Exec: file_groups")
}

#[rstest(n_partitions, case(1), case(2), case(3), case(4))]
#[tokio::test]
async fn it_can_read_ndjson_in_parallel(n_partitions: usize) -> Result<()> {
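// A zero minimum file size makes even this small fixture eligible for
// splitting into `n_partitions` byte ranges.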
let config = SessionConfig::new()
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(n_partitions);

let ctx = SessionContext::new_with_config(config);

let table_path = "tests/data/1.json";
let options = NdJsonReadOptions::default();

ctx.register_json("json_parallel", table_path, options)
.await?;

let query = "SELECT SUM(a) FROM json_parallel;";

let result = ctx.sql(query).await?.collect().await?;
let actual_partitions = count_num_partitions(&ctx, query).await?;

#[rustfmt::skip]
let expected = [
"+----------------------+",
"| SUM(json_parallel.a) |",
"+----------------------+",
"| -7 |",
"+----------------------+"
];

assert_batches_eq!(expected, &result);
assert_eq!(n_partitions, actual_partitions);

Ok(())
}

#[rstest(n_partitions, case(1), case(2), case(3), case(4))]
#[tokio::test]
async fn it_can_read_empty_ndjson_in_parallel(n_partitions: usize) -> Result<()> {
let config = SessionConfig::new()
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(n_partitions);

let ctx = SessionContext::new_with_config(config);

let table_path = "tests/data/empty.json";
let options = NdJsonReadOptions::default();

ctx.register_json("json_parallel_empty", table_path, options)
.await?;

let query = "SELECT * FROM json_parallel_empty WHERE random() > 0.5;";

let result = ctx.sql(query).await?.collect().await?;
let actual_partitions = count_num_partitions(&ctx, query).await?;

#[rustfmt::skip]
let expected = [
"++",
"++",
];

assert_batches_eq!(expected, &result);
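// An empty file offers no byte range to split, so the plan keeps a single file group.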
assert_eq!(1, actual_partitions);

Ok(())
}
}
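One bullet in the commit message mentions `fn repartitioned`, but the NdJsonExec diff is not among the files shown here. A hypothetical reconstruction, following the `FileGroupPartitioner` pattern visible in the csv.rs imports below; the field names and exact builder calls are assumptions:

// Sketch only: the committed implementation lives in a file not loaded in
// this view. Splits the exec's file groups into `target_partitions` chunks.
fn repartitioned(
    &self,
    target_partitions: usize,
    config: &ConfigOptions,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    let repartitioned_file_groups = FileGroupPartitioner::new()
        .with_target_partitions(target_partitions)
        .with_repartition_file_min_size(config.optimizer.repartition_file_min_size)
        .repartition_file_groups(&self.base_config.file_groups);

    if let Some(file_groups) = repartitioned_file_groups {
        // Re-plan this exec over the newly split byte-range groups.
        let mut new_plan = self.clone();
        new_plan.base_config.file_groups = file_groups;
        return Ok(Some(Arc::new(new_plan)));
    }

    Ok(None)
}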
98 changes: 15 additions & 83 deletions datafusion/core/src/datasource/physical_plan/csv.rs
@@ -19,11 +19,10 @@
use std::any::Any;
use std::io::{Read, Seek, SeekFrom};
use std::ops::Range;
use std::sync::Arc;
use std::task::Poll;

use super::{FileGroupPartitioner, FileScanConfig};
use super::{calculate_range, FileGroupPartitioner, FileScanConfig, RangeCalculation};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::listing::{FileRange, ListingTableUrl};
use crate::datasource::physical_plan::file_stream::{
@@ -318,47 +317,6 @@ impl CsvOpener {
}
}

/// Returns the offset of the first newline in the object store range [start, end), or the end offset if no newline is found.
async fn find_first_newline(
object_store: &Arc<dyn ObjectStore>,
location: &object_store::path::Path,
start_byte: usize,
end_byte: usize,
) -> Result<usize> {
let options = GetOptions {
range: Some(Range {
start: start_byte,
end: end_byte,
}),
..Default::default()
};

let r = object_store.get_opts(location, options).await?;
let mut input = r.into_stream();

let mut buffered = Bytes::new();
let mut index = 0;

loop {
if buffered.is_empty() {
match input.next().await {
Some(Ok(b)) => buffered = b,
Some(Err(e)) => return Err(e.into()),
None => return Ok(index),
};
}

for byte in &buffered {
if *byte == b'\n' {
return Ok(index);
}
index += 1;
}

buffered.advance(buffered.len());
}
}

impl FileOpener for CsvOpener {
/// Open a partitioned CSV file.
///
@@ -408,55 +366,29 @@ impl FileOpener for CsvOpener {
);
}

let store = self.config.object_store.clone();

Ok(Box::pin(async move {
let file_size = file_meta.object_meta.size;
// Current partition contains bytes [start_byte, end_byte) (might contain incomplete lines at boundaries)
let range = match file_meta.range {
None => None,
Some(FileRange { start, end }) => {
let (start, end) = (start as usize, end as usize);
// Partition byte range is [start, end), the boundary might be in the middle of
// some line. Need to find out the exact line boundaries.
let start_delta = if start != 0 {
find_first_newline(
&config.object_store,
file_meta.location(),
start - 1,
file_size,
)
.await?
} else {
0
};
let end_delta = if end != file_size {
find_first_newline(
&config.object_store,
file_meta.location(),
end - 1,
file_size,
)
.await?
} else {
0
};
let range = start + start_delta..end + end_delta;
if range.start == range.end {
return Ok(
futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
);
}
Some(range)

let calculated_range = calculate_range(&file_meta, &store).await?;

let range = match calculated_range {
RangeCalculation::Range(None) => None,
RangeCalculation::Range(Some(range)) => Some(range),
RangeCalculation::TerminateEarly => {
return Ok(
futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
)
}
};

let options = GetOptions {
range,
..Default::default()
};
let result = config
.object_store
.get_opts(file_meta.location(), options)
.await?;

let result = store.get_opts(file_meta.location(), options).await?;

match result.payload {
GetResultPayload::File(mut file, _) => {
(The csv.rs diff is truncated here. Diffs for the remaining changed files, including the datasource/physical_plan/mod.rs that received the extracted helpers, did not load in this view.)
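Because the mod.rs diff did not load, here is a rough reconstruction of the extracted helper, pieced together from the logic deleted from csv.rs above and the new match arms; treat the exact names and signature as assumptions:

// Sketch of the helper extracted to datasource/physical_plan/mod.rs.
// Snaps a partition's byte range to newline boundaries so that each
// partition reads whole lines exactly once.
enum RangeCalculation {
    /// Scan this byte range (None means the whole file).
    Range(Option<Range<usize>>),
    /// The adjusted range is empty; the partition emits no rows.
    TerminateEarly,
}

async fn calculate_range(
    file_meta: &FileMeta,
    store: &Arc<dyn ObjectStore>,
) -> Result<RangeCalculation> {
    let location = file_meta.location();
    let file_size = file_meta.object_meta.size;

    match file_meta.range {
        None => Ok(RangeCalculation::Range(None)),
        Some(FileRange { start, end }) => {
            let (start, end) = (start as usize, end as usize);

            // If a boundary falls mid-line, shift it forward to the next newline.
            let start_delta = if start != 0 {
                find_first_newline(store, location, start - 1, file_size).await?
            } else {
                0
            };
            let end_delta = if end != file_size {
                find_first_newline(store, location, end - 1, file_size).await?
            } else {
                0
            };

            let range = start + start_delta..end + end_delta;
            if range.start == range.end {
                return Ok(RangeCalculation::TerminateEarly);
            }

            Ok(RangeCalculation::Range(Some(range)))
        }
    }
}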
