Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make deletion vector manipulation less error-prone #374

Merged
merged 3 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion acceptance/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ pub async fn assert_scan_data(engine: Arc<dyn Engine>, test_case: &TestCaseInfo)
.execute(engine)?
.map(|scan_result| -> DeltaResult<_> {
let scan_result = scan_result?;
let mask = scan_result.full_mask();
let data = scan_result.raw_data?;
let record_batch: RecordBatch = data
.into_any()
Expand All @@ -139,7 +140,7 @@ pub async fn assert_scan_data(engine: Arc<dyn Engine>, test_case: &TestCaseInfo)
if schema.is_none() {
schema = Some(record_batch.schema());
}
if let Some(mask) = scan_result.mask {
if let Some(mask) = mask {
Ok(filter_record_batch(&record_batch, &mask.into())?)
} else {
Ok(record_batch)
Expand Down
8 changes: 2 additions & 6 deletions kernel/examples/read-table-single-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,14 @@ fn try_main() -> DeltaResult<()> {
.execute(engine.as_ref())?
.map(|scan_result| -> DeltaResult<_> {
let scan_result = scan_result?;
let mask = scan_result.full_mask();
let data = scan_result.raw_data?;
let record_batch: RecordBatch = data
.into_any()
.downcast::<ArrowEngineData>()
.map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))?
.into();
if let Some(mut mask) = scan_result.mask {
let extra_rows = record_batch.num_rows() - mask.len();
if extra_rows > 0 {
// we need to extend the mask here in case it's too short
mask.extend(std::iter::repeat(true).take(extra_rows));
}
if let Some(mask) = mask {
Ok(filter_record_batch(&record_batch, &mask.into())?)
} else {
Ok(record_batch)
Expand Down
17 changes: 15 additions & 2 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,20 @@ pub struct ScanResult {
/// plan to call arrow's `filter_record_batch`, you _need_ to extend this vector to the full
/// length of the batch or arrow will drop the extra rows
// TODO(nick) this should be allocated by the engine
pub mask: Option<Vec<bool>>,
raw_mask: Option<Vec<bool>>,
}

impl ScanResult {
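/// Returns the raw mask. This is dangerous to use because it may be shorter than the accompanying
/// data: rows past the end of the mask are not deleted. Prefer `full_mask` when the mask will be
/// handed to a row filter.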
scovich marked this conversation as resolved.
Show resolved Hide resolved
pub fn raw_mask(&self) -> Option<&Vec<bool>> {
self.raw_mask.as_ref()
}
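/// Returns a copy of the underlying mask resized to the row count of the accompanying data,
/// padding any missing entries with `true`. Returns `None` if there is no mask or if `raw_data`
/// is an error.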
pub fn full_mask(&self) -> Option<Vec<bool>> {
let mut mask = self.raw_mask.clone()?;
mask.resize(self.raw_data.as_ref().ok()?.length(), true);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very nice! I like this better than extend(iter::repeat().take())

Some(mask)
}
}

/// Scan uses this to set up what kinds of columns it is scanning. For `Selected` we just store the
Expand Down Expand Up @@ -314,7 +327,7 @@ impl Scan {
let rest = split_vector(sv.as_mut(), len, None);
let result = ScanResult {
raw_data: logical,
mask: sv,
raw_mask: sv,
};
selection_vector = rest;
Ok(result)
Expand Down
8 changes: 3 additions & 5 deletions kernel/tests/dv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ fn count_total_scan_rows(
scan_result_iter
.map(|scan_result| {
let scan_result = scan_result?;
let data = scan_result.raw_data?;
// NOTE: The mask only suppresses rows for which it is both present and false.
let deleted_rows = scan_result
.mask
.as_ref()
.map_or(0, |mask| mask.iter().filter(|&&m| !m).count());
let mask = scan_result.raw_mask();
let deleted_rows = mask.into_iter().flatten().filter(|&&m| !m).count();
let data = scan_result.raw_data?;
Ok(data.length() - deleted_rows)
})
.fold_ok(0, Add::add)
Expand Down
8 changes: 2 additions & 6 deletions kernel/tests/golden_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,10 @@ async fn latest_snapshot_test(
let batches: Vec<RecordBatch> = scan_res
.map(|scan_result| -> DeltaResult<_> {
let scan_result = scan_result?;
let mask = scan_result.full_mask();
let data = scan_result.raw_data?;
let record_batch = to_arrow(data)?;
if let Some(mut mask) = scan_result.mask {
let extra_rows = record_batch.num_rows() - mask.len();
if extra_rows > 0 {
// we need to extend the mask here in case it's too short
mask.extend(std::iter::repeat(true).take(extra_rows));
}
if let Some(mask) = mask {
Ok(filter_record_batch(&record_batch, &mask.into())?)
} else {
Ok(record_batch)
Expand Down
8 changes: 2 additions & 6 deletions kernel/tests/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,14 +401,10 @@ fn read_with_execute(
let batches: Vec<RecordBatch> = scan_results
.map(|scan_result| -> DeltaResult<_> {
let scan_result = scan_result?;
let mask = scan_result.full_mask();
let data = scan_result.raw_data?;
let record_batch = to_arrow(data)?;
if let Some(mut mask) = scan_result.mask {
let extra_rows = record_batch.num_rows() - mask.len();
if extra_rows > 0 {
// we need to extend the mask here in case it's too short
mask.extend(std::iter::repeat(true).take(extra_rows));
}
if let Some(mask) = mask {
Ok(filter_record_batch(&record_batch, &mask.into())?)
} else {
Ok(record_batch)
Expand Down
Loading