Skip to content

Commit

Permalink
Make deletion vector manipulation less error-prone (#374)
Browse files Browse the repository at this point in the history
Deletion vectors in a scan result can be truncated, with fewer entries
than the corresponding engine data.

Remove a footgun by making the raw DV in a `ScanResult` private, with
new accessors to make it less error-prone to use.
  • Loading branch information
scovich authored Oct 7, 2024
1 parent 7f535a2 commit ff6b128
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 32 deletions.
3 changes: 2 additions & 1 deletion acceptance/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ pub async fn assert_scan_data(engine: Arc<dyn Engine>, test_case: &TestCaseInfo)
.execute(engine)?
.map(|scan_result| -> DeltaResult<_> {
let scan_result = scan_result?;
let mask = scan_result.full_mask();
let data = scan_result.raw_data?;
let record_batch: RecordBatch = data
.into_any()
Expand All @@ -139,7 +140,7 @@ pub async fn assert_scan_data(engine: Arc<dyn Engine>, test_case: &TestCaseInfo)
if schema.is_none() {
schema = Some(record_batch.schema());
}
if let Some(mask) = scan_result.mask {
if let Some(mask) = mask {
Ok(filter_record_batch(&record_batch, &mask.into())?)
} else {
Ok(record_batch)
Expand Down
8 changes: 2 additions & 6 deletions kernel/examples/read-table-single-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,14 @@ fn try_main() -> DeltaResult<()> {
.execute(engine.as_ref())?
.map(|scan_result| -> DeltaResult<_> {
let scan_result = scan_result?;
let mask = scan_result.full_mask();
let data = scan_result.raw_data?;
let record_batch: RecordBatch = data
.into_any()
.downcast::<ArrowEngineData>()
.map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))?
.into();
if let Some(mut mask) = scan_result.mask {
let extra_rows = record_batch.num_rows() - mask.len();
if extra_rows > 0 {
// we need to extend the mask here in case it's too short
mask.extend(std::iter::repeat(true).take(extra_rows));
}
if let Some(mask) = mask {
Ok(filter_record_batch(&record_batch, &mask.into())?)
} else {
Ok(record_batch)
Expand Down
37 changes: 29 additions & 8 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,35 @@ impl ScanBuilder {
pub struct ScanResult {
/// Raw engine data as read from the disk for a particular file included in the query
pub raw_data: DeltaResult<Box<dyn EngineData>>,
/// If an item at mask\[i\] is true, the row at that row index is valid, otherwise if it is
/// false, the row at that row index is invalid and should be ignored. If the mask is *shorter*
/// than the number of rows returned, missing elements are considered `true`, i.e. included in
/// the query. If this is None, all rows are valid. NB: If you are using the default engine and
/// plan to call arrow's `filter_record_batch`, you _need_ to extend this vector to the full
/// length of the batch or arrow will drop the extra rows
/// Raw row mask.
// TODO(nick) this should be allocated by the engine
pub mask: Option<Vec<bool>>,
raw_mask: Option<Vec<bool>>,
}

impl ScanResult {
/// Returns the raw row mask. If an item at `raw_mask()[i]` is true, row `i` is
/// valid. Otherwise, row `i` is invalid and should be ignored.
///
/// The raw mask is dangerous to use because it may be shorter than expected. In particular, if
/// you are using the default engine and plan to call arrow's `filter_record_batch`, you _need_
/// to extend the mask to the full length of the batch or arrow will drop the extra
/// rows. Calling [`full_mask`] instead avoids this risk entirely, at the cost of a copy.
pub fn raw_mask(&self) -> Option<&Vec<bool>> {
self.raw_mask.as_ref()
}

/// Extends the underlying (raw) mask to match the row count of the accompanying data.
///
/// If the raw mask is *shorter* than the number of rows returned, missing elements are
/// considered `true`, i.e. included in the query. If the mask is `None`, all rows are valid.
///
/// NB: If you are using the default engine and plan to call arrow's `filter_record_batch`, you
/// _need_ to extend the mask to the full length of the batch or arrow will drop the extra rows.
pub fn full_mask(&self) -> Option<Vec<bool>> {
let mut mask = self.raw_mask.clone()?;
mask.resize(self.raw_data.as_ref().ok()?.length(), true);
Some(mask)
}
}

/// Scan uses this to set up what kinds of columns it is scanning. For `Selected` we just store the
Expand Down Expand Up @@ -314,7 +335,7 @@ impl Scan {
let rest = split_vector(sv.as_mut(), len, None);
let result = ScanResult {
raw_data: logical,
mask: sv,
raw_mask: sv,
};
selection_vector = rest;
Ok(result)
Expand Down
8 changes: 3 additions & 5 deletions kernel/tests/dv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ fn count_total_scan_rows(
scan_result_iter
.map(|scan_result| {
let scan_result = scan_result?;
let data = scan_result.raw_data?;
// NOTE: The mask only suppresses rows for which it is both present and false.
let deleted_rows = scan_result
.mask
.as_ref()
.map_or(0, |mask| mask.iter().filter(|&&m| !m).count());
let mask = scan_result.raw_mask();
let deleted_rows = mask.into_iter().flatten().filter(|&&m| !m).count();
let data = scan_result.raw_data?;
Ok(data.length() - deleted_rows)
})
.fold_ok(0, Add::add)
Expand Down
8 changes: 2 additions & 6 deletions kernel/tests/golden_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,10 @@ async fn latest_snapshot_test(
let batches: Vec<RecordBatch> = scan_res
.map(|scan_result| -> DeltaResult<_> {
let scan_result = scan_result?;
let mask = scan_result.full_mask();
let data = scan_result.raw_data?;
let record_batch = to_arrow(data)?;
if let Some(mut mask) = scan_result.mask {
let extra_rows = record_batch.num_rows() - mask.len();
if extra_rows > 0 {
// we need to extend the mask here in case it's too short
mask.extend(std::iter::repeat(true).take(extra_rows));
}
if let Some(mask) = mask {
Ok(filter_record_batch(&record_batch, &mask.into())?)
} else {
Ok(record_batch)
Expand Down
8 changes: 2 additions & 6 deletions kernel/tests/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,14 +401,10 @@ fn read_with_execute(
let batches: Vec<RecordBatch> = scan_results
.map(|scan_result| -> DeltaResult<_> {
let scan_result = scan_result?;
let mask = scan_result.full_mask();
let data = scan_result.raw_data?;
let record_batch = to_arrow(data)?;
if let Some(mut mask) = scan_result.mask {
let extra_rows = record_batch.num_rows() - mask.len();
if extra_rows > 0 {
// we need to extend the mask here in case it's too short
mask.extend(std::iter::repeat(true).take(extra_rows));
}
if let Some(mask) = mask {
Ok(filter_record_batch(&record_batch, &mask.into())?)
} else {
Ok(record_batch)
Expand Down

0 comments on commit ff6b128

Please sign in to comment.