From e25aed70a02ce817d109e60880c878829c724b84 Mon Sep 17 00:00:00 2001
From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
Date: Mon, 6 May 2024 18:39:13 +0200
Subject: [PATCH] fix(python, rust): use new schema for stats parsing instead
 of old (#2480)

# Description
In some edge cases where the schema has evolved, we would still parse the stats with the old schema, resulting in errors like:

`Exception: Json error: whilst decoding field 'minValues': whilst decoding field 'foo': failed to parse 1000000000000 as Int8`

```python
import polars as pl
from deltalake import write_deltalake

pl.DataFrame({
    "foo": [1]
}, schema={"foo": pl.Int8}).write_delta("TEST_TABLE_BUG")

write_deltalake(
    "TEST_TABLE_BUG",
    data=pl.DataFrame({
        "foo": [1000000000000]
    }, schema={"foo": pl.Int64}).to_arrow(),
    mode="overwrite",
    overwrite_schema=True,
    engine="rust",
)
```

Instead of taking the old schema, I added an optional table schema that can be passed to the `LogMapper`.
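With this change, replaying the log parses the stats against the evolved `Int64` schema. A quick sanity check (a minimal sketch, reusing the `TEST_TABLE_BUG` table from the repro above; `get_add_actions` exposes the parsed per-file statistics):

```python
from deltalake import DeltaTable

# Loading the table replays the log; before this fix it failed here while
# decoding 'minValues' for 'foo' against the stale Int8 schema.
dt = DeltaTable("TEST_TABLE_BUG")

# The flattened add actions include the parsed min/max stats; 'foo' should
# now come back as an Int64 value instead of raising a Json error.
print(dt.get_add_actions(flatten=True).to_pydict())
```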
---
 crates/core/src/kernel/snapshot/mod.rs    | 20 ++++++++++++++------
 crates/core/src/kernel/snapshot/replay.rs | 10 +++++++---
 python/tests/test_writer.py               | 19 +++++++++++++++++++
 3 files changed, 40 insertions(+), 9 deletions(-)

diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs
index aa40599fad..a6fe9e9a62 100644
--- a/crates/core/src/kernel/snapshot/mod.rs
+++ b/crates/core/src/kernel/snapshot/mod.rs
@@ -287,11 +287,13 @@ impl Snapshot {
     }
 
     /// Get the statistics schema of the snapshot
-    pub fn stats_schema(&self) -> DeltaResult<StructType> {
+    pub fn stats_schema(&self, table_schema: Option<&StructType>) -> DeltaResult<StructType> {
+        let schema = table_schema.unwrap_or_else(|| self.schema());
+
         let stats_fields = if let Some(stats_cols) = self.table_config().stats_columns() {
             stats_cols
                 .iter()
-                .map(|col| match self.schema().field_with_name(col) {
+                .map(|col| match schema.field_with_name(col) {
                     Ok(field) => match field.data_type() {
                         DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => {
                             Err(DeltaTableError::Generic(format!(
@@ -314,7 +316,7 @@ impl Snapshot {
                 .collect::<Result<Vec<_>, _>>()?
         } else {
             let num_indexed_cols = self.table_config().num_indexed_cols();
-            self.schema()
+            schema
                 .fields
                 .iter()
                 .enumerate()
@@ -362,7 +364,7 @@ impl EagerSnapshot {
         let mut files = Vec::new();
         let mut scanner = LogReplayScanner::new();
         files.push(scanner.process_files_batch(&batch, true)?);
-        let mapper = LogMapper::try_new(&snapshot)?;
+        let mapper = LogMapper::try_new(&snapshot, None)?;
         files = files
             .into_iter()
             .map(|b| mapper.map_batch(b))
             .try_collect()
@@ -401,7 +403,7 @@ impl EagerSnapshot {
             )
             .boxed()
         };
-        let mapper = LogMapper::try_new(&self.snapshot)?;
+        let mapper = LogMapper::try_new(&self.snapshot, None)?;
         let files = ReplayStream::try_new(log_stream, checkpoint_stream, &self.snapshot)?
             .map(|batch| batch.and_then(|b| mapper.map_batch(b)))
             .try_collect()
             .await
@@ -517,7 +519,13 @@ impl EagerSnapshot {
             files.push(scanner.process_files_batch(&batch?, true)?);
         }
 
-        let mapper = LogMapper::try_new(&self.snapshot)?;
+        let mapper = if let Some(metadata) = &metadata {
+            let new_schema: StructType = serde_json::from_str(&metadata.schema_string)?;
+            LogMapper::try_new(&self.snapshot, Some(&new_schema))?
+        } else {
+            LogMapper::try_new(&self.snapshot, None)?
+        };
+
         self.files = files
             .into_iter()
             .chain(
diff --git a/crates/core/src/kernel/snapshot/replay.rs b/crates/core/src/kernel/snapshot/replay.rs
index 61cdab4c09..43cb6ae999 100644
--- a/crates/core/src/kernel/snapshot/replay.rs
+++ b/crates/core/src/kernel/snapshot/replay.rs
@@ -21,6 +21,7 @@ use tracing::debug;
 
 use crate::kernel::arrow::extract::{self as ex, ProvidesColumnByName};
 use crate::kernel::arrow::json;
+use crate::kernel::StructType;
 use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};
 
 use super::Snapshot;
@@ -41,7 +42,7 @@ pin_project! {
 
 impl<S> ReplayStream<S> {
     pub(super) fn try_new(commits: S, checkpoint: S, snapshot: &Snapshot) -> DeltaResult<Self> {
-        let stats_schema = Arc::new((&snapshot.stats_schema()?).try_into()?);
+        let stats_schema = Arc::new((&snapshot.stats_schema(None)?).try_into()?);
         let mapper = Arc::new(LogMapper {
             stats_schema,
             config: snapshot.config.clone(),
@@ -61,9 +62,12 @@ pub(super) struct LogMapper {
 }
 
 impl LogMapper {
-    pub(super) fn try_new(snapshot: &Snapshot) -> DeltaResult<Self> {
+    pub(super) fn try_new(
+        snapshot: &Snapshot,
+        table_schema: Option<&StructType>,
+    ) -> DeltaResult<Self> {
         Ok(Self {
-            stats_schema: Arc::new((&snapshot.stats_schema()?).try_into()?),
+            stats_schema: Arc::new((&snapshot.stats_schema(table_schema)?).try_into()?),
             config: snapshot.config.clone(),
         })
     }
diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py
index a4fe9c75a4..41fb876c12 100644
--- a/python/tests/test_writer.py
+++ b/python/tests/test_writer.py
@@ -1626,3 +1626,22 @@ def test_write_timestamp_ntz_on_table_with_features_not_enabled(tmp_path: pathli
     write_deltalake(
         tmp_path, data, mode="overwrite", engine="pyarrow", schema_mode="overwrite"
     )
+
+
+@pytest.mark.parametrize("engine", ["pyarrow", "rust"])
+def test_parse_stats_with_new_schema(tmp_path, engine):
+    sample_data = pa.table(
+        {
+            "val": pa.array([1, 1], pa.int8()),
+        }
+    )
+    write_deltalake(tmp_path, sample_data)
+
+    sample_data = pa.table(
+        {
+            "val": pa.array([1000000000000, 1000000000000], pa.int64()),
+        }
+    )
+    write_deltalake(
+        tmp_path, sample_data, mode="overwrite", schema_mode="overwrite", engine=engine
+    )