diff --git a/Cargo.toml b/Cargo.toml index 8d04f6799..b96b97cf8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ [workspace] resolver = "2" members = [ + "crates/cli", "crates/catalog/*", "crates/examples", "crates/iceberg", diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 45d7d4fd1..fa1ee3af6 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -22,7 +22,7 @@ use std::sync::{Arc, RwLock}; use arrow_array::RecordBatch; use futures::channel::mpsc::{channel, Sender}; -use futures::stream::BoxStream; +use futures::stream::{self, BoxStream}; use futures::{SinkExt, StreamExt, TryFutureExt, TryStreamExt}; use serde::{Deserialize, Serialize}; @@ -160,29 +160,33 @@ impl<'a> TableScanBuilder<'a> { /// Build the table scan. pub fn build(self) -> Result { let snapshot = match self.snapshot_id { - Some(snapshot_id) => self - .table - .metadata() - .snapshot_by_id(snapshot_id) - .ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!("Snapshot with id {} not found", snapshot_id), - ) - })? - .clone(), - None => self - .table - .metadata() - .current_snapshot() - .ok_or_else(|| { - Error::new( - ErrorKind::FeatureUnsupported, - "Can't scan table without snapshots", - ) - })? - .clone(), + Some(snapshot_id) => Some( + self.table + .metadata() + .snapshot_by_id(snapshot_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Snapshot with id {} not found", snapshot_id), + ) + })?, + ), + None => self.table.metadata().current_snapshot(), }; + if snapshot.is_none() { + return Ok(TableScan { + plan_context: None, + batch_size: None, + file_io: self.table.file_io().clone(), + column_names: self.column_names, + concurrency_limit_data_files: self.concurrency_limit_data_files, + concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries, + concurrency_limit_manifest_files: self.concurrency_limit_manifest_files, + row_group_filtering_enabled: self.row_group_filtering_enabled, + }); + } + + let snapshot = snapshot.unwrap(); let schema = snapshot.schema(self.table.metadata())?; @@ -246,7 +250,7 @@ impl<'a> TableScanBuilder<'a> { }; let plan_context = PlanContext { - snapshot, + snapshot: snapshot.clone(), table_metadata: self.table.metadata_ref(), snapshot_schema: schema, case_sensitive: self.case_sensitive, @@ -263,7 +267,7 @@ impl<'a> TableScanBuilder<'a> { batch_size: self.batch_size, column_names: self.column_names, file_io: self.table.file_io().clone(), - plan_context, + plan_context: Some(plan_context), concurrency_limit_data_files: self.concurrency_limit_data_files, concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries, concurrency_limit_manifest_files: self.concurrency_limit_manifest_files, @@ -275,7 +279,7 @@ impl<'a> TableScanBuilder<'a> { /// Table scan. #[derive(Debug)] pub struct TableScan { - plan_context: PlanContext, + plan_context: Option, batch_size: Option, file_io: FileIO, column_names: Vec, @@ -316,6 +320,12 @@ struct PlanContext { impl TableScan { /// Returns a stream of [`FileScanTask`]s. pub async fn plan_files(&self) -> Result { + if self.plan_context.is_none() { + return Ok(stream::empty().boxed()); + }; + + let plan_context = &self.plan_context.as_ref().unwrap(); + let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files; let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries; @@ -325,14 +335,13 @@ impl TableScan { // used to stream the results back to the caller let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries); - let manifest_list = self.plan_context.get_manifest_list().await?; + let manifest_list = plan_context.get_manifest_list().await?; // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any // whose content type is not Data or whose partitions cannot match this // scan's filter - let manifest_file_contexts = self - .plan_context - .build_manifest_file_contexts(manifest_list, manifest_entry_ctx_tx)?; + let manifest_file_contexts = + plan_context.build_manifest_file_contexts(manifest_list, manifest_entry_ctx_tx)?; let mut channel_for_manifest_error = file_scan_task_tx.clone(); @@ -392,8 +401,11 @@ impl TableScan { &self.column_names } /// Returns a reference to the snapshot of the table scan. - pub fn snapshot(&self) -> &SnapshotRef { - &self.plan_context.snapshot + pub fn snapshot(&self) -> Option<&SnapshotRef> { + match &self.plan_context { + Some(plan_context) => Some(&plan_context.snapshot), + None => None, + } } async fn process_manifest_entry( @@ -1175,7 +1187,7 @@ mod tests { let table_scan = table.scan().build().unwrap(); assert_eq!( table.metadata().current_snapshot().unwrap().snapshot_id(), - table_scan.snapshot().snapshot_id() + table_scan.snapshot().unwrap().snapshot_id() ); } @@ -1196,7 +1208,10 @@ mod tests { .snapshot_id(3051729675574597004) .build() .unwrap(); - assert_eq!(table_scan.snapshot().snapshot_id(), 3051729675574597004); + assert_eq!( + table_scan.snapshot().unwrap().snapshot_id(), + 3051729675574597004 + ); } #[tokio::test]