-
Notifications
You must be signed in to change notification settings - Fork 850
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Stub out Skip Records API (#1792) #1998
Conversation
c413686
to
0071931
Compare
Codecov Report
@@ Coverage Diff @@
## master #1998 +/- ##
==========================================
- Coverage 83.58% 83.42% -0.16%
==========================================
Files 222 222
Lines 57522 57906 +384
==========================================
+ Hits 48078 48309 +231
- Misses 9444 9597 +153
Continue to review full report at Codecov.
|
cool! 👍 @tustvold Are you the Flash 😄! i will try to go through this and give your my opinion today. |
pub(crate) fn with_row_selection( | ||
self, | ||
selection: impl Into<Vec<RowSelection>>, | ||
) -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add total_row_count
to check this selection
is valid(maybe like continuous)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it actually an issue if it isn't, e.g. if I only want the first 100 rows?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, got it, it should check in user side.
|
||
/// Gets metadata about the next page, returns an error if no | ||
/// column index information | ||
fn peek_next_page(&self) -> Result<Option<PageMetadata>>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 really need this abstraction!
} | ||
|
||
impl Iterator for ParquetRecordBatchReader { | ||
type Item = ArrowResult<RecordBatch>; | ||
|
||
fn next(&mut self) -> Option<Self::Item> { | ||
match self.array_reader.next_batch(self.batch_size) { | ||
let to_read = match self.selection.as_mut() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 pass mask here not each col is more reasonable 😂
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 I think this abstraction is great ! Thanks for your effort!❤️
Left some comments, most are
Maybe after this pr merge, i will continue to work on page index.
/// API for reading pages from a column chunk. | ||
/// This offers a iterator like API to get the next page. | ||
pub trait PageReader: Iterator<Item = Result<Page>> + Send { | ||
/// Gets the next page in the column chunk associated with this reader. | ||
/// Returns `None` if there are no pages left. | ||
fn get_next_page(&mut self) -> Result<Option<Page>>; | ||
|
||
/// Gets metadata about the next page, returns an error if no | ||
/// column index information |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there we only need offset index
, without the min max index
?🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so
|
||
self.consume_def_levels(); | ||
self.consume_rep_levels(); | ||
self.consume_record_data(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this for the situation a page which has been read_records
but left some unreaded buffer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, i don't get this point, why not directly call column_reader.skip_records(num_records)
could you give me some hint?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RecordReader is a bit of an odd cookie, let me try to explain what it is doing.
In the absence of repetition levels, it can simply read batch size levels, and the corresponding number of values.
However, if repetition levels are present, it will likely need to read more than batch_size levels in order to read batch_size actual records (rows).
To achieve this it reads to its internal buffer and then splits off the data corresponding to batch_size rows, leaving the excess behind.
It is this excess of data that has been read to its buffers but not yielded to the caller yet, which we must consume here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 nice write up ! Save me some time 😄!
So, i got it. More specific details to ask:
This is a part of skip, we need to read the rp
,dp
to skip some records in the page(maybe have been readed or never readed ).
let (buffered_records, buffered_values) = self.count_records(num_records);
self.num_records += buffered_records;
self.num_values += buffered_values;
self.consume_def_levels();
self.consume_rep_levels();
self.consume_record_data();
self.consume_bitmap();
self.reset();
let remaining = buffered_records - num_records;
This also part of skip, remaining > 0
, I think this we skip start at a new page
if remaining == 0 {
return Ok(buffered_records);
}
let skipped = match self.column_reader.as_mut() {
Some(column_reader) => column_reader.skip_records(remaining)?,
None => 0,
};
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a part of skip, we need to read the rp ,dp to skip some records in the page(maybe have been readed or never readed ).
Yes, this is just to consume the data that has been read to the internal buffers of RecordReader if any
This also part of skip, remaining > 0, I think this we skip start at a new page
Not necessarily, the only thing RecordReader needs to handle is skipping any data that has already been read from ColumnReader into its own buffers. It can then delegate to ColumnReader to skip the remaining rows, with no requirement that this is done at a page boundary - ColumnReader must be able to handle any case.
0071931
to
7527750
Compare
7527750
to
7324873
Compare
Co-authored-by: Yang Jiang <jiangyang381@163.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The API looks good to me -- I had some questions and I think it would be nicer to return NotImplemented errors rather than panic in certain cases but I think this PR could also be merged as is to unblock further dev work
@@ -210,6 +214,10 @@ impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder | |||
|
|||
decoder.read(out, range.end - range.start, self.dict.as_ref()) | |||
} | |||
|
|||
fn skip_values(&mut self, _num_values: usize) -> Result<usize> { | |||
todo!() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think adding a ticket reference here like
unimplemented!("See https://github.com/apache/arrow-rs/.....")
would help future readers
Bonus points for returning ArrowError::Unimplemented
This comment applies to everything below as well
parquet/src/arrow/arrow_reader.rs
Outdated
|
||
/// Scan rows from the parquet file according to the provided `selection` | ||
/// | ||
/// TODO: Make public once row selection fully implemented |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
perhaps worth a ticket?
/// [`RowSelection`] allows selecting or skipping a provided number of rows | ||
/// when scanning the parquet file | ||
#[derive(Debug, Clone, Copy)] | ||
pub(crate) struct RowSelection { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You probably already have thought about this, but I would expect that in certain scenarios, non contiguous rows / skips would be desired
Like "fetch the first 100 rows, skip the next 200, and then fetch the remaining"
Would this interface handle that case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See with_row_selection which takes a Vec to allow for this use-case
@@ -555,6 +555,14 @@ impl<T: Read + Send> PageReader for SerializedPageReader<T> { | |||
// We are at the end of this column chunk and no more page left. Return None. | |||
Ok(None) | |||
} | |||
|
|||
fn peek_next_page(&self) -> Result<Option<PageMetadata>> { | |||
todo!() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto returning "not yet implemented" would probably be nicer
@@ -146,15 +146,15 @@ impl LevelsBufferSlice for DefinitionLevelBuffer { | |||
} | |||
} | |||
|
|||
pub struct DefinitionLevelDecoder { | |||
pub struct DefinitionLevelBufferDecoder { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I this rename a public API change as well? It does not appear in the docs
https://docs.rs/parquet/17.0.0/parquet/?search=DefinitionLevelDecoder
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No it is crate local
Which issue does this PR close?
Part of #1792
Rationale for this change
Stubs out an API for providing skip records functionality within parquet. I think this will work to support #1792, #1191 and potentially other functionality down the line.
Let me know what you think @Ted-Jiang @sunchao
What changes are included in this PR?
Stubs out APIs for adding row skipping logic to the parquet implementation
Are there any user-facing changes?
No 🎉