diff --git a/Cargo.toml b/Cargo.toml index a48f8c7894..9fe8c44bb3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ debug = true debug = "line-tables-only" [workspace.dependencies] -delta_kernel = { version = "0.4.1", features = ["sync-engine"] } +delta_kernel = { version = "0.4.1", features = ["default-engine"] } #delta_kernel = { path = "../delta-kernel-rs/kernel", features = ["sync-engine"] } # arrow diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 24f4ac8777..57a9496070 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-core" -version = "0.22.2" +version = "0.22.3" authors.workspace = true keywords.workspace = true readme.workspace = true diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index 562ca3da90..f22f88ad23 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -245,23 +245,14 @@ impl LogicalFile<'_> { /// Defines a deletion vector pub fn deletion_vector(&self) -> Option> { - if let Some(arr) = self.deletion_vector.as_ref() { - // With v0.22 and the upgrade to a more recent arrow. Reading nullable structs with - // non-nullable entries back out of parquet is resulting in the DeletionVector having - // an empty string rather than a null. The addition check on the value ensures that a - // [DeletionVectorView] is not created in this scenario - // - // - if arr.storage_type.is_valid(self.index) - && !arr.storage_type.value(self.index).is_empty() - { - return Some(DeletionVectorView { + self.deletion_vector.as_ref().and_then(|arr| { + arr.storage_type + .is_valid(self.index) + .then_some(DeletionVectorView { data: arr, index: self.index, - }); - } - } - None + }) + }) } /// The number of records stored in the data file. @@ -380,18 +371,23 @@ impl<'a> FileStatsAccessor<'a> { ); let deletion_vector = extract_and_cast_opt::(data, "add.deletionVector"); let deletion_vector = deletion_vector.and_then(|dv| { - let storage_type = extract_and_cast::(dv, "storageType").ok()?; - let path_or_inline_dv = extract_and_cast::(dv, "pathOrInlineDv").ok()?; - let size_in_bytes = extract_and_cast::(dv, "sizeInBytes").ok()?; - let cardinality = extract_and_cast::(dv, "cardinality").ok()?; - let offset = extract_and_cast_opt::(dv, "offset"); - Some(DeletionVector { - storage_type, - path_or_inline_dv, - size_in_bytes, - cardinality, - offset, - }) + if dv.null_count() == dv.len() { + None + } else { + let storage_type = extract_and_cast::(dv, "storageType").ok()?; + let path_or_inline_dv = + extract_and_cast::(dv, "pathOrInlineDv").ok()?; + let size_in_bytes = extract_and_cast::(dv, "sizeInBytes").ok()?; + let cardinality = extract_and_cast::(dv, "cardinality").ok()?; + let offset = extract_and_cast_opt::(dv, "offset"); + Some(DeletionVector { + storage_type, + path_or_inline_dv, + size_in_bytes, + cardinality, + offset, + }) + } }); Ok(Self { diff --git a/crates/core/src/kernel/snapshot/parse.rs b/crates/core/src/kernel/snapshot/parse.rs index f75744691e..e8630cbe0c 100644 --- a/crates/core/src/kernel/snapshot/parse.rs +++ b/crates/core/src/kernel/snapshot/parse.rs @@ -78,6 +78,10 @@ pub(super) fn read_adds(array: &dyn ProvidesColumnByName) -> DeltaResult(array, "add") { + // Stop early if all values are null + if arr.null_count() == arr.len() { + return Ok(vec![]); + } let path = ex::extract_and_cast::(arr, "path")?; let pvs = ex::extract_and_cast_opt::(arr, "partitionValues"); let size = ex::extract_and_cast::(arr, "size")?; @@ -94,22 +98,33 @@ pub(super) fn read_adds(array: &dyn ProvidesColumnByName) -> DeltaResult(d, "sizeInBytes")?; let cardinality = ex::extract_and_cast::(d, "cardinality")?; - Box::new(|idx: usize| { - if ex::read_str(storage_type, idx).is_ok() { - Some(DeletionVectorDescriptor { - storage_type: std::str::FromStr::from_str( - ex::read_str(storage_type, idx).ok()?, - ) - .ok()?, - path_or_inline_dv: ex::read_str(path_or_inline_dv, idx).ok()?.to_string(), - offset: ex::read_primitive_opt(offset, idx), - size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?, - cardinality: ex::read_primitive(cardinality, idx).ok()?, - }) - } else { - None - } - }) + // Column might exist but have nullability set for the whole array, so we just return Nones + if d.null_count() == d.len() { + Box::new(|_| None) + } else { + Box::new(|idx: usize| { + d.is_valid(idx) + .then(|| { + if ex::read_str(storage_type, idx).is_ok() { + Some(DeletionVectorDescriptor { + storage_type: std::str::FromStr::from_str( + ex::read_str(storage_type, idx).ok()?, + ) + .ok()?, + path_or_inline_dv: ex::read_str(path_or_inline_dv, idx) + .ok()? + .to_string(), + offset: ex::read_primitive_opt(offset, idx), + size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?, + cardinality: ex::read_primitive(cardinality, idx).ok()?, + }) + } else { + None + } + }) + .flatten() + }) + } } else { Box::new(|_| None) }; @@ -210,22 +225,33 @@ pub(super) fn read_removes(array: &dyn ProvidesColumnByName) -> DeltaResult(d, "sizeInBytes")?; let cardinality = ex::extract_and_cast::(d, "cardinality")?; - Box::new(|idx: usize| { - if ex::read_str(storage_type, idx).is_ok() { - Some(DeletionVectorDescriptor { - storage_type: std::str::FromStr::from_str( - ex::read_str(storage_type, idx).ok()?, - ) - .ok()?, - path_or_inline_dv: ex::read_str(path_or_inline_dv, idx).ok()?.to_string(), - offset: ex::read_primitive_opt(offset, idx), - size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?, - cardinality: ex::read_primitive(cardinality, idx).ok()?, - }) - } else { - None - } - }) + // Column might exist but have nullability set for the whole array, so we just return Nones + if d.null_count() == d.len() { + Box::new(|_| None) + } else { + Box::new(|idx: usize| { + d.is_valid(idx) + .then(|| { + if ex::read_str(storage_type, idx).is_ok() { + Some(DeletionVectorDescriptor { + storage_type: std::str::FromStr::from_str( + ex::read_str(storage_type, idx).ok()?, + ) + .ok()?, + path_or_inline_dv: ex::read_str(path_or_inline_dv, idx) + .ok()? + .to_string(), + offset: ex::read_primitive_opt(offset, idx), + size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?, + cardinality: ex::read_primitive(cardinality, idx).ok()?, + }) + } else { + None + } + }) + .flatten() + }) + } } else { Box::new(|_| None) }; diff --git a/crates/core/src/kernel/snapshot/replay.rs b/crates/core/src/kernel/snapshot/replay.rs index 1b18b61bc7..6267a7f3be 100644 --- a/crates/core/src/kernel/snapshot/replay.rs +++ b/crates/core/src/kernel/snapshot/replay.rs @@ -20,7 +20,7 @@ use hashbrown::HashSet; use itertools::Itertools; use percent_encoding::percent_decode_str; use pin_project_lite::pin_project; -use tracing::debug; +use tracing::log::*; use super::parse::collect_map; use super::ReplayVisitor; @@ -440,6 +440,14 @@ pub(super) struct DVInfo<'a> { fn seen_key(info: &FileInfo<'_>) -> String { let path = percent_decode_str(info.path).decode_utf8_lossy(); if let Some(dv) = &info.dv { + // If storage_type is empty then delta-rs has somehow gotten an empty rather than a null + // deletion vector, oooof + // + // See #3030 + if dv.storage_type.is_empty() { + warn!("An empty but not nullable deletionVector was seen for {info:?}"); + return path.to_string(); + } if let Some(offset) = &dv.offset { format!( "{}::{}{}@{offset}", @@ -551,22 +559,32 @@ fn read_file_info<'a>(arr: &'a dyn ProvidesColumnByName) -> DeltaResult(d, "pathOrInlineDv")?; let offset = ex::extract_and_cast::(d, "offset")?; - Box::new(|idx: usize| { - if ex::read_str(storage_type, idx).is_ok() { - Ok(Some(DVInfo { - storage_type: ex::read_str(storage_type, idx)?, - path_or_inline_dv: ex::read_str(path_or_inline_dv, idx)?, - offset: ex::read_primitive_opt(offset, idx), - })) - } else { - Ok(None) - } - }) + // Column might exist but have nullability set for the whole array, so we just return Nones + if d.null_count() == d.len() { + Box::new(|_| Ok(None)) + } else { + Box::new(|idx: usize| { + if d.is_valid(idx) { + if ex::read_str(storage_type, idx).is_ok() { + Ok(Some(DVInfo { + storage_type: ex::read_str(storage_type, idx)?, + path_or_inline_dv: ex::read_str(path_or_inline_dv, idx)?, + offset: ex::read_primitive_opt(offset, idx), + })) + } else { + Ok(None) + } + } else { + Ok(None) + } + }) + } } else { Box::new(|_| Ok(None)) }; let mut adds = Vec::with_capacity(path.len()); + for idx in 0..path.len() { let value = path .is_valid(idx) @@ -579,6 +597,7 @@ fn read_file_info<'a>(arr: &'a dyn ProvidesColumnByName) -> DeltaResult + #[cfg(feature = "datafusion")] #[tokio::test] async fn test_create_checkpoint_overwrite() -> DeltaResult<()> { use crate::protocol::SaveMode; + use crate::writer::test_utils::datafusion::get_data_sorted; use crate::writer::test_utils::get_arrow_schema; + use datafusion::assert_batches_sorted_eq; + + let tmp_dir = tempfile::tempdir().unwrap(); + let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap(); let batch = RecordBatch::try_new( Arc::clone(&get_arrow_schema(&None)), @@ -1177,13 +1183,15 @@ mod tests { ], ) .unwrap(); - let table = DeltaOps::try_from_uri_with_storage_options("memory://", HashMap::default()) + + let mut table = DeltaOps::try_from_uri(tmp_path.as_os_str().to_str().unwrap()) .await? .write(vec![batch]) .await?; + table.load().await?; assert_eq!(table.version(), 0); - create_checkpoint_for(0, table.snapshot().unwrap(), table.log_store.as_ref()).await?; + create_checkpoint(&table).await?; let batch = RecordBatch::try_new( Arc::clone(&get_arrow_schema(&None)), @@ -1194,11 +1202,23 @@ mod tests { ], ) .unwrap(); - let table = DeltaOps(table) + + let table = DeltaOps::try_from_uri(tmp_path.as_os_str().to_str().unwrap()) + .await? .write(vec![batch]) .with_save_mode(SaveMode::Overwrite) .await?; assert_eq!(table.version(), 1); + + let expected = [ + "+----+-------+------------+", + "| id | value | modified |", + "+----+-------+------------+", + "| A | 0 | 2021-02-02 |", + "+----+-------+------------+", + ]; + let actual = get_data_sorted(&table, "id,value,modified").await; + assert_batches_sorted_eq!(&expected, &actual); Ok(()) } } diff --git a/crates/deltalake/Cargo.toml b/crates/deltalake/Cargo.toml index d7fdb50184..476f0b5d60 100644 --- a/crates/deltalake/Cargo.toml +++ b/crates/deltalake/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake" -version = "0.22.2" +version = "0.22.3" authors.workspace = true keywords.workspace = true readme.workspace = true diff --git a/python/Cargo.toml b/python/Cargo.toml index fba55dcd31..bb6fbba621 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-python" -version = "0.22.2" +version = "0.22.3" authors = ["Qingping Hou ", "Will Jones "] homepage = "https://github.com/delta-io/delta-rs" license = "Apache-2.0" diff --git a/python/tests/test_checkpoint.py b/python/tests/test_checkpoint.py index 5961a57b09..309a1f3663 100644 --- a/python/tests/test_checkpoint.py +++ b/python/tests/test_checkpoint.py @@ -483,18 +483,19 @@ def test_checkpoint_with_multiple_writes(tmp_path: pathlib.Path): } ), ) - DeltaTable(tmp_path).create_checkpoint() + dt = DeltaTable(tmp_path) + dt.create_checkpoint() + assert dt.version() == 0 + df = pd.DataFrame( + { + "a": ["a"], + "b": [100], + } + ) + write_deltalake(tmp_path, df, mode="overwrite") dt = DeltaTable(tmp_path) + assert dt.version() == 1 + new_df = dt.to_pandas() print(dt.to_pandas()) - - write_deltalake( - tmp_path, - pd.DataFrame( - { - "a": ["a"], - "b": [100], - } - ), - mode="overwrite", - ) + assert len(new_df) == 1, "We overwrote! there should only be one row"