From 5d2cf488ff7f69393b1e3c73bf72f1dcdd832768 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Tue, 21 Jan 2025 21:55:18 +0100 Subject: [PATCH] fix: tombstone replay Signed-off-by: Robert Pack --- .gitignore | 1 - crates/core/src/kernel/snapshot_next/lazy.rs | 93 ++++++++++++-------- crates/core/src/kernel/snapshot_next/mod.rs | 13 +-- 3 files changed, 58 insertions(+), 49 deletions(-) diff --git a/.gitignore b/.gitignore index ee7ca99235..c5aca6465b 100644 --- a/.gitignore +++ b/.gitignore @@ -22,7 +22,6 @@ __blobstorage__ .githubchangeloggenerator.cache.log .githubchangeloggenerator.cache/ .githubchangeloggenerator* -data .zed/ # Add all Cargo.lock files except for those in binary crates diff --git a/crates/core/src/kernel/snapshot_next/lazy.rs b/crates/core/src/kernel/snapshot_next/lazy.rs index 20ccfb7031..3a203e1ad1 100644 --- a/crates/core/src/kernel/snapshot_next/lazy.rs +++ b/crates/core/src/kernel/snapshot_next/lazy.rs @@ -9,15 +9,16 @@ use delta_kernel::actions::set_transaction::{SetTransactionMap, SetTransactionSc use delta_kernel::actions::{get_log_schema, REMOVE_NAME}; use delta_kernel::actions::{Metadata, Protocol, SetTransaction}; use delta_kernel::engine::arrow_data::ArrowEngineData; +use delta_kernel::engine::arrow_expression::evaluate_expression; use delta_kernel::engine::default::executor::tokio::{ TokioBackgroundExecutor, TokioMultiThreadExecutor, }; use delta_kernel::engine::default::DefaultEngine; use delta_kernel::log_segment::LogSegment; -use delta_kernel::schema::Schema; +use delta_kernel::schema::{DataType, Schema}; use delta_kernel::snapshot::Snapshot as SnapshotInner; use delta_kernel::table_properties::TableProperties; -use delta_kernel::{Engine, Expression, ExpressionRef, Table, Version}; +use delta_kernel::{Engine, Expression, ExpressionHandler, ExpressionRef, Table, Version}; use itertools::Itertools; use object_store::path::Path; use object_store::ObjectStore; @@ -25,7 +26,7 @@ use url::Url; use super::cache::CommitCacheObjectStore; use super::{replay_file_actions, Snapshot}; -use crate::kernel::{Action, CommitInfo}; +use crate::kernel::{Action, CommitInfo, ARROW_HANDLER}; use crate::{DeltaResult, DeltaTableError}; // TODO: avoid repetitive parsing of json stats @@ -94,11 +95,8 @@ impl Snapshot for LazySnapshot { } fn tombstones(&self) -> DeltaResult>>> { - static META_PREDICATE: LazyLock> = LazyLock::new(|| { - Some(Arc::new( - Expression::column([REMOVE_NAME, "path"]).is_not_null(), - )) - }); + static META_PREDICATE: LazyLock = + LazyLock::new(|| Arc::new(Expression::column([REMOVE_NAME, "path"]).is_not_null())); let read_schema = get_log_schema().project(&[REMOVE_NAME])?; Ok(Box::new( self.inner @@ -107,9 +105,23 @@ impl Snapshot for LazySnapshot { self.engine.as_ref(), read_schema.clone(), read_schema, - META_PREDICATE.clone(), + Some(META_PREDICATE.clone()), )? - .map_ok(|(d, _)| Ok(RecordBatch::from(ArrowEngineData::try_from_engine_data(d)?))) + .map_ok(|(d, _)| { + let batch = RecordBatch::from(ArrowEngineData::try_from_engine_data(d)?); + let selection = evaluate_expression( + META_PREDICATE.as_ref(), + &batch, + Some(&DataType::BOOLEAN), + )?; + let filter = selection + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DeltaTableError::generic("failed to downcast to BooleanArray") + })?; + Ok(filter_record_batch(&batch, filter)?) + }) .flatten(), )) } @@ -247,37 +259,46 @@ impl LazySnapshot { #[cfg(test)] mod tests { - use deltalake_test::acceptance::{read_dat_case, TestCaseInfo}; + use delta_kernel::schema::StructType; + use deltalake_test::utils::*; use deltalake_test::TestResult; - use super::super::tests::get_dat_dir; use super::*; async fn load_snapshot() -> TestResult<()> { - // some comment - let mut dat_dir = get_dat_dir(); - dat_dir.push("multi_partitioned"); - - let dat_info: TestCaseInfo = read_dat_case(dat_dir)?; - let table_info = dat_info.table_summary()?; - - let table = Table::try_from_uri(dat_info.table_root()?)?; - - let snapshot = LazySnapshot::try_new( - table, - Arc::new(object_store::local::LocalFileSystem::default()), - None, - ) - .await?; - - assert_eq!(snapshot.version(), table_info.version); - assert_eq!( - ( - snapshot.protocol().min_reader_version(), - snapshot.protocol().min_writer_version() - ), - (table_info.min_reader_version, table_info.min_writer_version) - ); + let ctx = IntegrationContext::new(Box::::default())?; + ctx.load_table(TestTables::Simple).await?; + + let store = ctx + .table_builder(TestTables::Simple) + .build_storage()? + .object_store(None); + let table = Table::try_from_uri("memory:///")?; + let snapshot = LazySnapshot::try_new(table, store, None).await?; + + let schema_string = r#"{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}}]}"#; + let expected: StructType = serde_json::from_str(schema_string)?; + assert_eq!(snapshot.schema(), &expected); + + let infos = snapshot.commit_infos(None, None)?.collect_vec(); + assert_eq!(infos.len(), 5); + + let tombstones: Vec<_> = snapshot.tombstones()?.try_collect()?; + let num_tombstones = tombstones.iter().map(|b| b.num_rows() as i64).sum::(); + assert_eq!(num_tombstones, 31); + + let expected = vec![ + "part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet", + "part-00000-c1777d7d-89d9-4790-b38a-6ee7e24456b1-c000.snappy.parquet", + "part-00001-7891c33d-cedc-47c3-88a6-abcfb049d3b4-c000.snappy.parquet", + "part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet", + "part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet", + ]; + let file_names: Vec<_> = snapshot + .logical_files_view(None)? + .map_ok(|f| f.path().to_owned()) + .try_collect()?; + assert_eq!(file_names, expected); Ok(()) } diff --git a/crates/core/src/kernel/snapshot_next/mod.rs b/crates/core/src/kernel/snapshot_next/mod.rs index a415f71a99..079ceb0298 100644 --- a/crates/core/src/kernel/snapshot_next/mod.rs +++ b/crates/core/src/kernel/snapshot_next/mod.rs @@ -361,24 +361,13 @@ fn scan_as_log_data( #[cfg(test)] mod tests { - use std::{future::Future, path::PathBuf, pin::Pin}; + use std::{future::Future, pin::Pin}; use delta_kernel::Table; use deltalake_test::utils::*; use super::*; - pub(super) fn get_dat_dir() -> PathBuf { - let d = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - let mut rep_root = d - .parent() - .and_then(|p| p.parent()) - .expect("valid directory") - .to_path_buf(); - rep_root.push("dat/out/reader_tests/generated"); - rep_root - } - fn get_lazy( ctx: &IntegrationContext, table: TestTables,