scan: fix error when reading an empty table
Previously, the TableScan struct required a Snapshot to plan files, so
for empty tables without a snapshot an error was returned instead of an
empty result.

Following the same approach as the Java [0] and Python [1]
implementations, this commit changes the snapshot property to accept
None values; the `plan_files` method now returns an empty stream if the
snapshot is not present in the PlanContext.

[0] https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/SnapshotScan.java#L119
[1] https://github.com/apache/iceberg-python/blob/main/pyiceberg/table/__init__.py#L1979
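
For illustration, a minimal sketch of the resulting caller-facing
behavior (not part of this commit): the `count_file_scan_tasks` helper
is hypothetical, and the exact import path for `Table` may differ by
crate version.

```rust
use futures::TryStreamExt;
use iceberg::table::Table;
use iceberg::Result;

// With this change, scanning a table that has no snapshot yet (e.g. a
// freshly created, empty table) yields an empty stream of file scan
// tasks instead of returning an error.
async fn count_file_scan_tasks(table: &Table) -> Result<usize> {
    let scan = table.scan().build()?; // snapshot may be None here
    let tasks: Vec<_> = scan.plan_files().await?.try_collect().await?;
    Ok(tasks.len()) // 0 for a table with no snapshot
}
```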

Fixes: #580
mattheusv committed Sep 6, 2024
1 parent ae75f96 commit 39b8da9
Showing 2 changed files with 50 additions and 34 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -18,6 +18,7 @@
[workspace]
resolver = "2"
members = [
"crates/cli",
"crates/catalog/*",
"crates/examples",
"crates/iceberg",
83 changes: 49 additions & 34 deletions crates/iceberg/src/scan.rs
@@ -22,7 +22,7 @@ use std::sync::{Arc, RwLock};

use arrow_array::RecordBatch;
use futures::channel::mpsc::{channel, Sender};
use futures::stream::BoxStream;
use futures::stream::{self, BoxStream};
use futures::{SinkExt, StreamExt, TryFutureExt, TryStreamExt};
use serde::{Deserialize, Serialize};

@@ -160,29 +160,33 @@ impl<'a> TableScanBuilder<'a> {
/// Build the table scan.
pub fn build(self) -> Result<TableScan> {
let snapshot = match self.snapshot_id {
Some(snapshot_id) => self
.table
.metadata()
.snapshot_by_id(snapshot_id)
.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("Snapshot with id {} not found", snapshot_id),
)
})?
.clone(),
None => self
.table
.metadata()
.current_snapshot()
.ok_or_else(|| {
Error::new(
ErrorKind::FeatureUnsupported,
"Can't scan table without snapshots",
)
})?
.clone(),
Some(snapshot_id) => Some(
self.table
.metadata()
.snapshot_by_id(snapshot_id)
.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("Snapshot with id {} not found", snapshot_id),
)
})?,
),
None => self.table.metadata().current_snapshot(),
};
if snapshot.is_none() {
return Ok(TableScan {
plan_context: None,
batch_size: None,
file_io: self.table.file_io().clone(),
column_names: self.column_names,
concurrency_limit_data_files: self.concurrency_limit_data_files,
concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
row_group_filtering_enabled: self.row_group_filtering_enabled,
});
}

let snapshot = snapshot.unwrap();

let schema = snapshot.schema(self.table.metadata())?;

@@ -246,7 +250,7 @@ impl<'a> TableScanBuilder<'a> {
};

let plan_context = PlanContext {
snapshot,
snapshot: snapshot.clone(),
table_metadata: self.table.metadata_ref(),
snapshot_schema: schema,
case_sensitive: self.case_sensitive,
@@ -263,7 +267,7 @@ impl<'a> TableScanBuilder<'a> {
batch_size: self.batch_size,
column_names: self.column_names,
file_io: self.table.file_io().clone(),
plan_context,
plan_context: Some(plan_context),
concurrency_limit_data_files: self.concurrency_limit_data_files,
concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
@@ -275,7 +279,7 @@ impl<'a> TableScanBuilder<'a> {
/// Table scan.
#[derive(Debug)]
pub struct TableScan {
plan_context: PlanContext,
plan_context: Option<PlanContext>,
batch_size: Option<usize>,
file_io: FileIO,
column_names: Vec<String>,
@@ -316,6 +320,12 @@ struct PlanContext {
impl TableScan {
/// Returns a stream of [`FileScanTask`]s.
pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
if self.plan_context.is_none() {
return Ok(stream::empty().boxed());
};

let plan_context = &self.plan_context.as_ref().unwrap();

let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;

@@ -325,14 +335,13 @@ impl TableScan {
// used to stream the results back to the caller
let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

let manifest_list = self.plan_context.get_manifest_list().await?;
let manifest_list = plan_context.get_manifest_list().await?;

// get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
// whose content type is not Data or whose partitions cannot match this
// scan's filter
let manifest_file_contexts = self
.plan_context
.build_manifest_file_contexts(manifest_list, manifest_entry_ctx_tx)?;
let manifest_file_contexts =
plan_context.build_manifest_file_contexts(manifest_list, manifest_entry_ctx_tx)?;

let mut channel_for_manifest_error = file_scan_task_tx.clone();

@@ -392,8 +401,11 @@ impl TableScan {
&self.column_names
}
/// Returns a reference to the snapshot of the table scan.
pub fn snapshot(&self) -> &SnapshotRef {
&self.plan_context.snapshot
pub fn snapshot(&self) -> Option<&SnapshotRef> {
match &self.plan_context {
Some(plan_context) => Some(&plan_context.snapshot),
None => None,
}
}

async fn process_manifest_entry(
@@ -1175,7 +1187,7 @@ mod tests {
let table_scan = table.scan().build().unwrap();
assert_eq!(
table.metadata().current_snapshot().unwrap().snapshot_id(),
table_scan.snapshot().snapshot_id()
table_scan.snapshot().unwrap().snapshot_id()
);
}

@@ -1196,7 +1208,10 @@ mod tests {
.snapshot_id(3051729675574597004)
.build()
.unwrap();
assert_eq!(table_scan.snapshot().snapshot_id(), 3051729675574597004);
assert_eq!(
table_scan.snapshot().unwrap().snapshot_id(),
3051729675574597004
);
}

#[tokio::test]