when inferring the schema of compressed CSV, decompress before newline-delimited chunking #5860
Conversation
Force-pushed from 2e62a39 to 3b8f4e9.
@jackwener @Jefffrey I wonder if you have time to review this PR.
Thanks for this, left some comments
@@ -124,8 +154,13 @@ impl FileFormat for CsvFormat {
        let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);

        for object in objects {
            // stream to only read as many rows as needed into memory
            let stream = read_to_delimited_chunks(store, object).await;
            let stream = store.get(&object.location).await.unwrap();
safe unwrap?
thanks. fixed
.unwrap()
.downcast::<DataFusionError>()
.unwrap())
safe unwraps?
I think it's fine: err_converter only accepts std::io::Error as input, and DataFusionError also implements a conversion from it.
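For illustration, a self-contained sketch (using a stand-in error type, MyError, instead of DataFusionError) of why the later unwraps cannot panic: the Some(_) arm is only reached after get_ref()/downcast_ref() has already confirmed the wrapped error's concrete type.

use std::fmt;
use std::io;

// Stand-in for DataFusionError in this sketch.
#[derive(Debug)]
struct MyError(String);

impl fmt::Display for MyError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "MyError: {}", self.0)
    }
}

impl std::error::Error for MyError {}

fn main() {
    // Wrap a MyError inside an io::Error, as a reader does when the
    // underlying stream yields a non-io error.
    let io_err = io::Error::new(io::ErrorKind::Other, MyError("boom".into()));

    // get_ref()/downcast_ref() checks the concrete type without consuming the error.
    if io_err.get_ref().and_then(|e| e.downcast_ref::<MyError>()).is_some() {
        // Because the check above succeeded, into_inner() is Some and the
        // downcast to MyError succeeds, so these unwraps cannot panic here.
        let inner: MyError = *io_err
            .into_inner()
            .unwrap()
            .downcast::<MyError>()
            .unwrap();
        println!("recovered: {inner}");
    }
}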
@@ -48,5 +49,22 @@ async fn main() -> Result<()> {
    // print the results
    df.show().await?;

    // query with
Expand on this comment a bit? Seems it was cut off, so something like "query compressed CSV with specific options", etc.
fixed.
@Jefffrey Thanks for the review.
// #[cfg(feature = "compression")]
let err_converter = |e: std::io::Error| match e
    .get_ref()
    .and_then(|e| e.downcast_ref::<DataFusionError>())
{
    Some(_) => {
        *(e.into_inner()
            .unwrap()
            .downcast::<DataFusionError>()
            .unwrap())
    }
    None => Into::<DataFusionError>::into(e),
};
I find this syntax confusing and not ideal; however, I didn't find any alternative when I experimented myself (without introducing a clone).
This unstable feature seems to solve the use case, if we ever get around to refactoring this in future when/if it becomes available: rust-lang/rust#99262
Some other points:
- Is that first commented line about cfg supposed to be there or not?
- `None => Into::<DataFusionError>::into(e)` -> `None => DataFusionError::from(e)` looks cleaner I think
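For concreteness, a sketch of the same match shape with that second suggestion applied, written here as a standalone function (the name convert_io_error is mine; in the PR this is the err_converter closure). It relies on DataFusionError implementing From<std::io::Error>.

use datafusion::error::DataFusionError;

// Sketch: identical logic to the closure above, but the None arm uses the
// From impl instead of Into::<DataFusionError>::into(e).
fn convert_io_error(e: std::io::Error) -> DataFusionError {
    match e
        .get_ref()
        .and_then(|inner| inner.downcast_ref::<DataFusionError>())
    {
        // The io::Error merely wraps a DataFusionError produced upstream;
        // the guard above makes the unwraps and the downcast safe.
        Some(_) => *(e
            .into_inner()
            .unwrap()
            .downcast::<DataFusionError>()
            .unwrap()),
        // A genuine io::Error from the reader: convert via the From impl.
        None => DataFusionError::from(e),
    }
}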
- Is that first commented line about cfg supposed to be there or not?
- `None => Into::<DataFusionError>::into(e)` -> `None => DataFusionError::from(e)` looks cleaner I think

fixed.
Thank you again for your review @Jefffrey
I verified this is a commit on master of testing: 👍 apache/arrow-testing@47f7b56
GZIP => Box::new(
    ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s)))
        .map_err(err_converter),
I am not sure why we need the err_converter -- this worked for me locally:
-GZIP => Box::new(
-    ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s)))
-        .map_err(err_converter),
+GZIP => Box::new(
+    ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s)))
+        .map_err(DataFusionError::from),
fixed. thanks
/// Return a newline delimited stream from the specified file on
/// Stream, decompressing if necessary
/// Each returned `Bytes` has a whole number of newline delimited rows
async fn read_to_delimited_chunks(
I feel like there must be a simpler way to express this code, but this does appear to work. I wonder if we could use BoxStream rather than impl Stream... 🤔
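For reference, a minimal sketch of the BoxStream idea: .boxed() from futures::StreamExt type-erases a concrete stream, so a function can name BoxStream<'static, Result<Bytes, DataFusionError>> in its signature instead of impl Stream. The helper name to_boxed and the body are illustrative only, not the code in this PR.

use bytes::Bytes;
use datafusion::error::DataFusionError;
use futures::stream::{BoxStream, StreamExt};

// Turn any concrete stream of newline-delimited chunks into a boxed,
// type-erased BoxStream, which is easier to name in signatures than
// `impl Stream<...>`.
fn to_boxed<S>(s: S) -> BoxStream<'static, Result<Bytes, DataFusionError>>
where
    S: futures::Stream<Item = Result<Bytes, DataFusionError>> + Send + 'static,
{
    s.boxed()
}

fn main() {
    // Hypothetical usage with a one-chunk in-memory stream.
    let chunks = futures::stream::iter(vec![Ok(Bytes::from_static(b"a,b\n1,2\n"))]);
    let _boxed: BoxStream<'static, _> = to_boxed(chunks);
}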
fixed, thanks a lot.
),
#[cfg(feature = "compression")]
ZSTD => Box::new(
    ReaderStream::new(AsyncZstdEncoer::new(StreamReader::new(s)))
I think it should be `AsyncZstdEncoder`
thanks, fixed.
Force-pushed from 3101231 to 9667c21.
Looks good -- thank you @jiangzhx
Which issue does this PR close?
Closes #5657.
Rationale for this change
#5041
#1736
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?