Skip to content

Commit

Permalink
test: add test for FileScanTask::execute() for parquet data files
Browse files Browse the repository at this point in the history
  • Loading branch information
sdd committed Feb 20, 2024
1 parent 3733c0a commit 4f99eaf
Showing 1 changed file with 123 additions and 90 deletions.
213 changes: 123 additions & 90 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,97 @@ mod tests {
))
.unwrap()
}

async fn setup_data_files(&mut self) {
let current_snapshot = self.table.metadata().current_snapshot().unwrap();
let parent_snapshot = current_snapshot
.parent_snapshot(self.table.metadata())
.unwrap();
let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
let current_partition_spec = self.table.metadata().default_partition_spec().unwrap();

// Write data files
let data_file_manifest = ManifestWriter::new(
self.next_manifest_file(),
current_snapshot.snapshot_id(),
vec![],
)
.write(Manifest::new(
ManifestMetadata::builder()
.schema((*current_schema).clone())
.content(ManifestContentType::Data)
.format_version(FormatVersion::V2)
.partition_spec((**current_partition_spec).clone())
.schema_id(current_schema.schema_id())
.build(),
vec![
ManifestEntry::builder()
.status(ManifestStatus::Added)
.data_file(
DataFile::builder()
.content(DataContentType::Data)
.file_path(format!("{}/1.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.build(),
)
.build(),
ManifestEntry::builder()
.status(ManifestStatus::Deleted)
.snapshot_id(parent_snapshot.snapshot_id())
.sequence_number(parent_snapshot.sequence_number())
.file_sequence_number(parent_snapshot.sequence_number())
.data_file(
DataFile::builder()
.content(DataContentType::Data)
.file_path(format!("{}/2.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(200))]))
.build(),
)
.build(),
ManifestEntry::builder()
.status(ManifestStatus::Existing)
.snapshot_id(parent_snapshot.snapshot_id())
.sequence_number(parent_snapshot.sequence_number())
.file_sequence_number(parent_snapshot.sequence_number())
.data_file(
DataFile::builder()
.content(DataContentType::Data)
.file_path(format!("{}/3.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(300))]))
.build(),
)
.build(),
],
))
.await
.unwrap();

// Write to manifest list
let mut manifest_list_write = ManifestListWriter::v2(
self.table
.file_io()
.new_output(current_snapshot.manifest_list())
.unwrap(),
current_snapshot.snapshot_id(),
current_snapshot
.parent_snapshot_id()
.unwrap_or(EMPTY_SNAPSHOT_ID),
current_snapshot.sequence_number(),
);
manifest_list_write
.add_manifest_entries(vec![data_file_manifest].into_iter())
.unwrap();
manifest_list_write.close().await.unwrap();
}
}

#[test]
Expand Down Expand Up @@ -364,97 +455,9 @@ mod tests {

#[tokio::test]
async fn test_plan_files_no_deletions() {
let fixture = TableTestFixture::new();
let mut fixture = TableTestFixture::new();

let current_snapshot = fixture.table.metadata().current_snapshot().unwrap();
let parent_snapshot = current_snapshot
.parent_snapshot(fixture.table.metadata())
.unwrap();
let current_schema = current_snapshot.schema(fixture.table.metadata()).unwrap();
let current_partition_spec = fixture.table.metadata().default_partition_spec().unwrap();

// Write data files
let data_file_manifest = ManifestWriter::new(
fixture.next_manifest_file(),
current_snapshot.snapshot_id(),
vec![],
)
.write(Manifest::new(
ManifestMetadata::builder()
.schema((*current_schema).clone())
.content(ManifestContentType::Data)
.format_version(FormatVersion::V2)
.partition_spec((**current_partition_spec).clone())
.schema_id(current_schema.schema_id())
.build(),
vec![
ManifestEntry::builder()
.status(ManifestStatus::Added)
.data_file(
DataFile::builder()
.content(DataContentType::Data)
.file_path(format!("{}/1.parquet", &fixture.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.build(),
)
.build(),
ManifestEntry::builder()
.status(ManifestStatus::Deleted)
.snapshot_id(parent_snapshot.snapshot_id())
.sequence_number(parent_snapshot.sequence_number())
.file_sequence_number(parent_snapshot.sequence_number())
.data_file(
DataFile::builder()
.content(DataContentType::Data)
.file_path(format!("{}/2.parquet", &fixture.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(200))]))
.build(),
)
.build(),
ManifestEntry::builder()
.status(ManifestStatus::Existing)
.snapshot_id(parent_snapshot.snapshot_id())
.sequence_number(parent_snapshot.sequence_number())
.file_sequence_number(parent_snapshot.sequence_number())
.data_file(
DataFile::builder()
.content(DataContentType::Data)
.file_path(format!("{}/3.parquet", &fixture.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(300))]))
.build(),
)
.build(),
],
))
.await
.unwrap();

// Write to manifest list
let mut manifest_list_write = ManifestListWriter::v2(
fixture
.table
.file_io()
.new_output(current_snapshot.manifest_list())
.unwrap(),
current_snapshot.snapshot_id(),
current_snapshot
.parent_snapshot_id()
.unwrap_or(EMPTY_SNAPSHOT_ID),
current_snapshot.sequence_number(),
);
manifest_list_write
.add_manifest_entries(vec![data_file_manifest].into_iter())
.unwrap();
manifest_list_write.close().await.unwrap();
fixture.setup_data_files().await;

// Create table scan for current snapshot and plan files
let table_scan = fixture.table.scan().build().unwrap();
Expand Down Expand Up @@ -485,4 +488,34 @@ mod tests {
format!("{}/3.parquet", &fixture.table_location)
);
}

#[tokio::test]
async fn test_execute_parquet_no_deletions() {
let mut fixture = TableTestFixture::new();

fixture.setup_data_files().await;

// Create table scan for current snapshot and plan files
let table_scan = fixture.table.scan().build().unwrap();
let mut tasks = table_scan
.plan_files()
.await
.unwrap()
.try_fold(vec![], |mut acc, task| async move {
acc.push(task);
Ok(acc)
})
.await
.unwrap();

assert_eq!(tasks.len(), 2);

tasks.sort_by_key(|t| t.data_file.file_path().to_string());

let stream = tasks[0].execute().await.unwrap();

let batches: Vec<_> = stream.try_collect().await.unwrap();

assert!(!batches.is_empty());
}
}

0 comments on commit 4f99eaf

Please sign in to comment.