Implement manifest filtering in TableScan #323

Merged · 1 commit · Apr 26, 2024
82 changes: 79 additions & 3 deletions crates/iceberg/src/scan.rs
@@ -18,14 +18,20 @@
//! Table scan api.

use crate::arrow::ArrowReaderBuilder;
use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
use crate::expr::{Bind, Predicate};
use crate::io::FileIO;
-use crate::spec::{DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadataRef};
+use crate::spec::{
+    DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadata, TableMetadataRef,
+};
use crate::table::Table;
use crate::{Error, ErrorKind};
use arrow_array::RecordBatch;
use async_stream::try_stream;
use futures::stream::{iter, BoxStream};
use futures::StreamExt;
use std::collections::HashMap;
use std::sync::Arc;

/// Builder to create table scan.
pub struct TableScanBuilder<'a> {
@@ -34,6 +40,8 @@ pub struct TableScanBuilder<'a> {
column_names: Vec<String>,
snapshot_id: Option<i64>,
batch_size: Option<usize>,
case_sensitive: bool,
filter: Option<Predicate>,
}

impl<'a> TableScanBuilder<'a> {
@@ -43,6 +51,8 @@ impl<'a> TableScanBuilder<'a> {
column_names: vec![],
snapshot_id: None,
batch_size: None,
case_sensitive: true,
filter: None,
}
}

@@ -53,6 +63,20 @@ impl<'a> TableScanBuilder<'a> {
self
}

/// Sets the scan's case sensitivity
pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
self.case_sensitive = case_sensitive;
self
}

/// Specifies a predicate to use as a filter
pub fn with_filter(mut self, predicate: Predicate) -> Self {
// calls rewrite_not to remove Not nodes, which must be absent
// when applying the manifest evaluator
self.filter = Some(predicate.rewrite_not());
self
}
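
A usage sketch (not part of this diff) of how these builder options combine: `table` is a hypothetical, already-loaded `Table`, and the predicate is assumed to be built with `Reference` and `Datum` from the crate's `expr` and `spec` modules.

// Hypothetical example: plan a case-insensitive scan filtered on `my_col < 100`.
let predicate = Reference::new("my_col").less_than(Datum::long(100));
let scan = table
    .scan()
    .with_filter(predicate)
    .with_case_sensitive(false)
    .select_all()
    .build()?;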

/// Select all columns.
pub fn select_all(mut self) -> Self {
self.column_names.clear();
@@ -125,6 +149,8 @@ impl<'a> TableScanBuilder<'a> {
column_names: self.column_names,
schema,
batch_size: self.batch_size,
case_sensitive: self.case_sensitive,
filter: self.filter.map(Arc::new),
})
}
}
@@ -139,17 +165,29 @@ pub struct TableScan {
column_names: Vec<String>,
schema: SchemaRef,
batch_size: Option<usize>,
case_sensitive: bool,
filter: Option<Arc<Predicate>>,
}

/// A stream of [`FileScanTask`].
pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;

impl TableScan {
/// Returns a stream of file scan tasks.
pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
// Cache the `ManifestEvaluator`s created as part of this scan, keyed by partition spec ID
let mut manifest_evaluator_cache: HashMap<i32, ManifestEvaluator> = HashMap::new();

// Clone these up front so the stream doesn't borrow from `self`:
// `try_stream!` produces a stream that must be `'static`, so it
// cannot capture references to `self`.
let schema = self.schema.clone();
let snapshot = self.snapshot.clone();
let table_metadata = self.table_metadata.clone();
let file_io = self.file_io.clone();
let case_sensitive = self.case_sensitive;
let filter = self.filter.clone();

Ok(try_stream! {
let manifest_list = snapshot
@@ -158,8 +196,24 @@ impl TableScan {
.await?;

// Generate data file stream
-let mut entries = iter(manifest_list.entries());
-while let Some(entry) = entries.next().await {
+for entry in manifest_list.entries() {
// If this scan has a filter, check the manifest evaluator cache for an existing
// ManifestEvaluator that matches this manifest's partition spec ID.
// Use the cached one if present. If not, create one, put it in
// the cache, and take a reference to it.
#[allow(clippy::map_entry)]
if let Some(filter) = filter.as_ref() {
if !manifest_evaluator_cache.contains_key(&entry.partition_spec_id) {
manifest_evaluator_cache.insert(
    entry.partition_spec_id,
    Self::create_manifest_evaluator(
        entry.partition_spec_id,
        schema.clone(),
        table_metadata.clone(),
        case_sensitive,
        filter,
    )?,
);
}
let manifest_evaluator = &manifest_evaluator_cache[&entry.partition_spec_id];

// reject any manifest files whose partition values don't match the filter.
if !manifest_evaluator.eval(entry)? {
continue;
}
}

let manifest = entry.load_manifest(&file_io).await?;

let mut manifest_entries = iter(manifest.entries().iter().filter(|e| e.is_alive()));
@@ -186,6 +240,28 @@ impl TableScan {
.boxed())
}

/// Builds a `ManifestEvaluator` for the partition spec with the given `id`,
/// binding the scan's filter predicate to the table schema.
fn create_manifest_evaluator(
id: i32,
schema: SchemaRef,
table_metadata: Arc<TableMetadata>,
case_sensitive: bool,
filter: &Predicate,
) -> crate::Result<ManifestEvaluator> {
let bound_predicate = filter.bind(schema.clone(), case_sensitive)?;

let partition_spec = table_metadata.partition_spec_by_id(id).ok_or(Error::new(
ErrorKind::Unexpected,
format!("Could not find partition spec for id {id}"),
))?;

ManifestEvaluator::new(
partition_spec.clone(),
schema.clone(),
bound_predicate,
case_sensitive,
)
}

pub async fn to_arrow(&self) -> crate::Result<ArrowRecordBatchStream> {
let mut arrow_reader_builder =
ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone());
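
For context on how a filtered scan is consumed (again a sketch, not part of this diff): `plan_files` returns a `FileScanTaskStream`, so the tasks that survive manifest filtering can be drained with the usual `futures` combinators. The `table`, column name, and literal below are hypothetical.

use futures::TryStreamExt;

// Hypothetical driver: plan the scan and count the file tasks that survive
// manifest filtering.
let scan = table
    .scan()
    .with_filter(Reference::new("id").less_than(Datum::long(1000)))
    .build()?;

let mut tasks = scan.plan_files().await?;
let mut task_count = 0;
while let Some(_task) = tasks.try_next().await? {
    task_count += 1;
}
println!("{task_count} file scan tasks after manifest filtering");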