From 12a3fd645a17d61a92cc1b144b75f602964cba01 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 20 Dec 2022 20:11:40 -0800 Subject: [PATCH 01/13] feat: expose function to get table of add actions --- python/deltalake/table.py | 3 +++ python/src/lib.rs | 9 +++++++++ rust/src/table_state.rs | 32 ++++++++++++++++++++++++++++++++ 3 files changed, 44 insertions(+) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 0abff85937..c4fbd00e00 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -443,3 +443,6 @@ def __stringify_partition_values( str_value = str(value) out.append((field, op, str_value)) return out + + def get_add_actions_df(self) -> pyarrow.RecordBatch: + return self._table.get_add_actions_df() diff --git a/python/src/lib.rs b/python/src/lib.rs index 5664436050..4a3f9a2871 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -453,6 +453,15 @@ impl RawDeltaTable { Ok(()) } + + pub fn get_add_actions_df(&self) -> PyResult> { + Ok(PyArrowType( + self._table + .get_state() + .add_actions_table() + .map_err(PyDeltaTableError::from_arrow)?, + )) + } } fn convert_partition_filters<'a>( diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs index 373747fe49..555c3cf8ea 100644 --- a/rust/src/table_state.rs +++ b/rust/src/table_state.rs @@ -8,6 +8,8 @@ use crate::{ ApplyLogError, DeltaDataTypeLong, DeltaDataTypeVersion, DeltaTable, DeltaTableError, DeltaTableMetaData, }; +use arrow::array::ArrayRef; +use arrow::error::ArrowError; use chrono::Utc; use object_store::{path::Path, ObjectStore}; use serde::{Deserialize, Serialize}; @@ -16,6 +18,7 @@ use std::collections::HashMap; use std::collections::HashSet; use std::convert::TryFrom; use std::io::{BufRead, BufReader, Cursor}; +use std::sync::Arc; #[cfg(any(feature = "parquet", feature = "parquet2"))] use super::{CheckPoint, DeltaTableConfig}; @@ -378,6 +381,35 @@ impl DeltaTableState { }); Ok(actions) } + + /// Get the add actions as a RecordBatch + pub fn add_actions_table(&self) -> Result { + let mut paths = arrow::array::StringBuilder::with_capacity( + self.files.len(), + self.files.iter().map(|add| add.path.len()).sum(), + ); + let mut size = arrow::array::Int64Builder::with_capacity(self.files.len()); + // let mut partition_keys = arrow::array::StringArray::with_capacity() + // let partition_values = arrow::array::MapBuilder::with_capacity(None, , key_builder, value_builder, capacity) + let mut mod_time = + arrow::array::TimestampMillisecondBuilder::with_capacity(self.files.len()); + let mut data_change = arrow::array::BooleanBuilder::with_capacity(self.files.len()); + + for action in &self.files { + paths.append_value(&action.path); + size.append_value(action.size); + mod_time.append_value(action.modification_time); + data_change.append_value(action.data_change); + } + + let arrays: Vec<(&str, ArrayRef)> = vec![ + ("path", Arc::new(paths.finish())), + ("size_bytes", Arc::new(size.finish())), + ("modification_time", Arc::new(mod_time.finish())), + ("data_change", Arc::new(data_change.finish())), + ]; + arrow::record_batch::RecordBatch::try_from_iter(arrays) + } } #[cfg(test)] From 528f4791236081a8b2f2bff72974fcf2d61a3451 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 20 Dec 2022 21:27:50 -0800 Subject: [PATCH 02/13] feat: add partition values to add actions table --- python/deltalake/table.py | 4 +- python/src/lib.rs | 2 +- rust/src/table_state.rs | 96 ++++++++++++++++++++++++++++++++++++--- 3 files changed, 93 insertions(+), 9 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index c4fbd00e00..a4091e4667 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -444,5 +444,5 @@ def __stringify_partition_values( out.append((field, op, str_value)) return out - def get_add_actions_df(self) -> pyarrow.RecordBatch: - return self._table.get_add_actions_df() + def get_add_actions_df(self, flatten: bool = False) -> pyarrow.RecordBatch: + return self._table.get_add_actions_df(flatten) diff --git a/python/src/lib.rs b/python/src/lib.rs index 4a3f9a2871..7b27af10e0 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -458,7 +458,7 @@ impl RawDeltaTable { Ok(PyArrowType( self._table .get_state() - .add_actions_table() + .add_actions_table(flatten) .map_err(PyDeltaTableError::from_arrow)?, )) } diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs index 555c3cf8ea..807dddbd4c 100644 --- a/rust/src/table_state.rs +++ b/rust/src/table_state.rs @@ -14,6 +14,7 @@ use chrono::Utc; use object_store::{path::Path, ObjectStore}; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; +use std::borrow::Cow; use std::collections::HashMap; use std::collections::HashSet; use std::convert::TryFrom; @@ -383,7 +384,10 @@ impl DeltaTableState { } /// Get the add actions as a RecordBatch - pub fn add_actions_table(&self) -> Result { + pub fn add_actions_table( + &self, + flatten: bool, + ) -> Result { let mut paths = arrow::array::StringBuilder::with_capacity( self.files.len(), self.files.iter().map(|add| add.path.len()).sum(), @@ -394,20 +398,100 @@ impl DeltaTableState { let mut mod_time = arrow::array::TimestampMillisecondBuilder::with_capacity(self.files.len()); let mut data_change = arrow::array::BooleanBuilder::with_capacity(self.files.len()); + let mut stats = arrow::array::StringBuilder::with_capacity( + self.files.len(), + self.files + .iter() + .map(|add| add.stats.as_ref().map(|s| s.len()).unwrap_or(0)) + .sum(), + ); for action in &self.files { paths.append_value(&action.path); size.append_value(action.size); mod_time.append_value(action.modification_time); data_change.append_value(action.data_change); + if let Some(action_stats) = &action.stats { + stats.append_value(action_stats); + } else { + stats.append_null(); + } } - let arrays: Vec<(&str, ArrayRef)> = vec![ - ("path", Arc::new(paths.finish())), - ("size_bytes", Arc::new(size.finish())), - ("modification_time", Arc::new(mod_time.finish())), - ("data_change", Arc::new(data_change.finish())), + let mut arrays: Vec<(Cow, ArrayRef)> = vec![ + (Cow::Borrowed("path"), Arc::new(paths.finish())), + (Cow::Borrowed("size_bytes"), Arc::new(size.finish())), + ( + Cow::Borrowed("modification_time"), + Arc::new(mod_time.finish()), + ), + (Cow::Borrowed("data_change"), Arc::new(data_change.finish())), + (Cow::Borrowed("stats"), Arc::new(stats.finish())), ]; + + let partition_columns: Vec<(Cow, ArrayRef)> = if flatten { + // Figure out partition columns + let names = self + .files + .iter() + .flat_map(|add_action| add_action.partition_values.keys()) + .map(|col| col.as_ref()) + .collect::>(); + + // Create builder for each + let mut builders = names + .into_iter() + .map(|name| { + let builder = arrow::array::StringBuilder::new(); + (name, builder) + }) + .collect::>(); + + // Append values + for action in &self.files { + for (name, maybe_value) in action.partition_values.iter() { + if let Some(value) = maybe_value { + builders.get_mut(name.as_str()).unwrap().append_value(value); + } else { + builders.get_mut(name.as_str()).unwrap().append_null(); + } + } + } + + builders + .into_iter() + .map(|(name, mut builder)| { + let name: Cow = Cow::Owned(format!("partition_{}", name)); + let array: ArrayRef = Arc::new(builder.finish()); + (name, array) + }) + .collect() + } else { + // Create a mapbuilder + let key_builder = arrow::array::StringBuilder::new(); + let value_builder = arrow::array::StringBuilder::new(); + let mut map_builder = arrow::array::MapBuilder::new(None, key_builder, value_builder); + + // Append values + for action in &self.files { + for (name, maybe_value) in action.partition_values.iter() { + map_builder.keys().append_value(name); + if let Some(value) = maybe_value { + map_builder.values().append_value(value); + } else { + map_builder.values().append_null(); + } + } + map_builder.append(true)?; + } + vec![( + Cow::Borrowed("partition_values"), + Arc::new(map_builder.finish()), + )] + }; + + arrays.extend(partition_columns.into_iter()); + arrow::record_batch::RecordBatch::try_from_iter(arrays) } } From eff515573b773c44a94022f3ffe5fd91a4fece04 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 21 Dec 2022 15:32:20 -0800 Subject: [PATCH 03/13] feat: support partition columns flat or as struct --- python/deltalake/table.py | 54 +++++++++++++++ python/src/lib.rs | 2 +- python/tests/test_table_read.py | 39 +++++++++++ rust/src/table_state.rs | 117 +++++++++++++++++--------------- 4 files changed, 158 insertions(+), 54 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index a4091e4667..1791271e96 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -445,4 +445,58 @@ def __stringify_partition_values( return out def get_add_actions_df(self, flatten: bool = False) -> pyarrow.RecordBatch: + """ + Return a dataframe with all current add actions. + + Add actions represent the files that currently make up the table. This + data is a low-level representation parsed from the transaction log. + + :param flatten: whether to use a flattened schema in output (default False). + When False, all partition values are in a single map column, + `partition_values`. When True, each partition column becomes it's own + field, with the prefix `partition_` in the name. + + :returns: a PyArrow Table containing the add action data. + + Examples: + + >>> from deltalake import DeltaTable, write_deltalake + >>> import pyarrow as pa + >>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]}) + >>> write_deltalake("tmp", data, partition_by=["x"]) + >>> dt = DeltaTable("tmp") + >>> dt.get_add_actions_df() + pyarrow.Table + path: string + size_bytes: int64 + modification_time: timestamp[ms] + data_change: bool + stats: string + partition_values: map + child 0, entries: struct not null + child 0, key: string not null + child 1, value: string + ---- + path: [["x=2/0-6e6952dd-ed03-4488-810e-ebe76a1a1200-0.parquet","x=3/0-6e6952dd-ed03-4488-810e-ebe76a1a1200-0.parquet","x=1/0-6e6952dd-ed03-4488-810e-ebe76a1a1200-0.parquet"]] + size_bytes: [[565,565,565]] + modification_time: [[1970-01-20 08:20:49.399,1970-01-20 08:20:49.399,1970-01-20 08:20:49.399]] + data_change: [[true,true,true]] + stats: [["{"numRecords": 1, "minValues": {"y": 5}, "maxValues": {"y": 5}, "nullCount": {"y": 0}}","{"numRecords": 1, "minValues": {"y": 6}, "maxValues": {"y": 6}, "nullCount": {"y": 0}}","{"numRecords": 1, "minValues": {"y": 4}, "maxValues": {"y": 4}, "nullCount": {"y": 0}}"]] + partition_values: [[keys:["x"]values:["2"],keys:["x"]values:["3"],keys:["x"]values:["1"]]] + >>> dt.get_add_actions_df(flatten=True) + pyarrow.Table + path: string + size_bytes: int64 + modification_time: timestamp[ms] + data_change: bool + stats: string + partition_x: string + ---- + path: [["x=2/0-6e6952dd-ed03-4488-810e-ebe76a1a1200-0.parquet","x=3/0-6e6952dd-ed03-4488-810e-ebe76a1a1200-0.parquet","x=1/0-6e6952dd-ed03-4488-810e-ebe76a1a1200-0.parquet"]] + size_bytes: [[565,565,565]] + modification_time: [[1970-01-20 08:20:49.399,1970-01-20 08:20:49.399,1970-01-20 08:20:49.399]] + data_change: [[true,true,true]] + stats: [["{"numRecords": 1, "minValues": {"y": 5}, "maxValues": {"y": 5}, "nullCount": {"y": 0}}","{"numRecords": 1, "minValues": {"y": 6}, "maxValues": {"y": 6}, "nullCount": {"y": 0}}","{"numRecords": 1, "minValues": {"y": 4}, "maxValues": {"y": 4}, "nullCount": {"y": 0}}"]] + partition_x: [["2","3","1"]] + """ return self._table.get_add_actions_df(flatten) diff --git a/python/src/lib.rs b/python/src/lib.rs index 7b27af10e0..6e41a5157d 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -459,7 +459,7 @@ impl RawDeltaTable { self._table .get_state() .add_actions_table(flatten) - .map_err(PyDeltaTableError::from_arrow)?, + .map_err(PyDeltaTableError::from_raw)?, )) } } diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index bbeca64a7d..3266c29849 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -269,6 +269,45 @@ def test_history_partitioned_table_metadata(): } +@pytest.mark.parametrize("flatten", [True, False]) +def test_add_actions_table(flatten: bool): + table_path = "../rust/tests/data/delta-0.8.0-partitioned" + dt = DeltaTable(table_path) + actions_df = dt.get_add_actions_df(flatten) + # RecordBatch doesn't have a sort_by method yet + actions_df = pa.Table.from_batches([actions_df]).sort_by("path").to_batches()[0] + + assert actions_df.num_rows == 6 + assert actions_df["path"] == pa.array( + [ + "year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet", + "year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet", + "year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet", + "year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet", + "year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet", + "year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet", + ] + ) + assert actions_df["size_bytes"] == pa.array([414, 414, 414, 407, 414, 414]) + assert actions_df["data_change"] == pa.array([True] * 6) + assert actions_df["modification_time"] == pa.array( + [1615555646000] * 6, type=pa.timestamp("ms") + ) + + if flatten: + partition_year = actions_df["partition_year"] + partition_month = actions_df["partition_month"] + partition_day = actions_df["partition_day"] + else: + partition_year = actions_df["partition_values"].field("year") + partition_month = actions_df["partition_values"].field("month") + partition_day = actions_df["partition_values"].field("day") + + assert partition_year == pa.array(["2020"] * 3 + ["2021"] * 3) + assert partition_month == pa.array(["1", "2", "2", "12", "12", "4"]) + assert partition_day == pa.array(["1", "3", "5", "20", "4", "5"]) + + def assert_correct_files(dt: DeltaTable, partition_filters, expected_paths): assert dt.files(partition_filters) == expected_paths absolute_paths = [os.path.join(dt.table_uri, path) for path in expected_paths] diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs index 807dddbd4c..fbc7bceec6 100644 --- a/rust/src/table_state.rs +++ b/rust/src/table_state.rs @@ -9,7 +9,7 @@ use crate::{ DeltaTableMetaData, }; use arrow::array::ArrayRef; -use arrow::error::ArrowError; +use arrow::compute::cast; use chrono::Utc; use object_store::{path::Path, ObjectStore}; use serde::{Deserialize, Serialize}; @@ -387,7 +387,8 @@ impl DeltaTableState { pub fn add_actions_table( &self, flatten: bool, - ) -> Result { + ) -> Result { + // TODO: test this thoroughly in Rust let mut paths = arrow::array::StringBuilder::with_capacity( self.files.len(), self.files.iter().map(|add| add.path.len()).sum(), @@ -428,71 +429,81 @@ impl DeltaTableState { (Cow::Borrowed("data_change"), Arc::new(data_change.finish())), (Cow::Borrowed("stats"), Arc::new(stats.finish())), ]; + let metadata = self + .current_metadata + .as_ref() + .ok_or(DeltaTableError::NoMetadata)?; + let partition_column_types: Vec = metadata + .partition_columns + .iter() + .map( + |name| -> Result { + let field = metadata.schema.get_field_with_name(name)?; + Ok(field.get_type().try_into()?) + }, + ) + .collect::>()?; + + // Create builder for each + let mut builders = metadata + .partition_columns + .iter() + .map(|name| { + let builder = arrow::array::StringBuilder::new(); + (name.as_str(), builder) + }) + .collect::>(); - let partition_columns: Vec<(Cow, ArrayRef)> = if flatten { - // Figure out partition columns - let names = self - .files - .iter() - .flat_map(|add_action| add_action.partition_values.keys()) - .map(|col| col.as_ref()) - .collect::>(); - - // Create builder for each - let mut builders = names - .into_iter() - .map(|name| { - let builder = arrow::array::StringBuilder::new(); - (name, builder) - }) - .collect::>(); - - // Append values - for action in &self.files { - for (name, maybe_value) in action.partition_values.iter() { - if let Some(value) = maybe_value { - builders.get_mut(name.as_str()).unwrap().append_value(value); - } else { - builders.get_mut(name.as_str()).unwrap().append_null(); - } + // Append values + for action in &self.files { + for (name, maybe_value) in action.partition_values.iter() { + if let Some(value) = maybe_value { + builders.get_mut(name.as_str()).unwrap().append_value(value); + } else { + builders.get_mut(name.as_str()).unwrap().append_null(); } } + } - builders + // Cast them to their appropriate types + let partition_columns: Vec = metadata + .partition_columns + .iter() + // Get the builders in their original order + .map(|name| builders.remove(name.as_str()).unwrap()) + .zip(partition_column_types.iter()) + .map(|(mut builder, datatype)| { + let string_arr: ArrayRef = Arc::new(builder.finish()); + Ok(cast(&string_arr, datatype)?) + }) + .collect::>()?; + + // if flatten, append columns, otherwise combine into a struct column + let partition_columns: Vec<(Cow, ArrayRef)> = if flatten { + partition_columns .into_iter() - .map(|(name, mut builder)| { + .zip(metadata.partition_columns.iter()) + .map(|(array, name)| { let name: Cow = Cow::Owned(format!("partition_{}", name)); - let array: ArrayRef = Arc::new(builder.finish()); (name, array) }) .collect() } else { - // Create a mapbuilder - let key_builder = arrow::array::StringBuilder::new(); - let value_builder = arrow::array::StringBuilder::new(); - let mut map_builder = arrow::array::MapBuilder::new(None, key_builder, value_builder); - - // Append values - for action in &self.files { - for (name, maybe_value) in action.partition_values.iter() { - map_builder.keys().append_value(name); - if let Some(value) = maybe_value { - map_builder.values().append_value(value); - } else { - map_builder.values().append_null(); - } - } - map_builder.append(true)?; - } - vec![( - Cow::Borrowed("partition_values"), - Arc::new(map_builder.finish()), - )] + let fields = partition_column_types + .into_iter() + .zip(metadata.partition_columns.iter()) + .map(|(datatype, name)| arrow::datatypes::Field::new(name, datatype, true)); + let arr = arrow::array::StructArray::from( + fields + .zip(partition_columns.into_iter()) + .collect::>(), + ); + vec![(Cow::Borrowed("partition_values"), Arc::new(arr))] }; arrays.extend(partition_columns.into_iter()); - arrow::record_batch::RecordBatch::try_from_iter(arrays) + Ok(arrow::record_batch::RecordBatch::try_from_iter(arrays)?) } } From 966a29b4ecb3bb4470100a9e5443c7df3ac268fd Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 25 Dec 2022 16:54:32 -0800 Subject: [PATCH 04/13] test: add tests in Rust --- rust/src/table_state.rs | 15 ++-- rust/tests/add_actions_test.rs | 132 +++++++++++++++++++++++++++++++++ 2 files changed, 141 insertions(+), 6 deletions(-) create mode 100644 rust/tests/add_actions_test.rs diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs index fbc7bceec6..7667671b7d 100644 --- a/rust/src/table_state.rs +++ b/rust/src/table_state.rs @@ -493,12 +493,15 @@ impl DeltaTableState { .into_iter() .zip(metadata.partition_columns.iter()) .map(|(datatype, name)| arrow::datatypes::Field::new(name, datatype, true)); - let arr = arrow::array::StructArray::from( - fields - .zip(partition_columns.into_iter()) - .collect::>(), - ); - vec![(Cow::Borrowed("partition_values"), Arc::new(arr))] + let field_arrays = fields + .zip(partition_columns.into_iter()) + .collect::>(); + if field_arrays.is_empty() { + vec![] + } else { + let arr = Arc::new(arrow::array::StructArray::from(field_arrays)); + vec![(Cow::Borrowed("partition_values"), arr)] + } }; arrays.extend(partition_columns.into_iter()); diff --git a/rust/tests/add_actions_test.rs b/rust/tests/add_actions_test.rs new file mode 100644 index 0000000000..f131cd0ed8 --- /dev/null +++ b/rust/tests/add_actions_test.rs @@ -0,0 +1,132 @@ +#![cfg(feature = "arrow")] + +use arrow::array::{self, ArrayRef}; +use arrow::compute::sort_to_indices; +use arrow::datatypes::{DataType, Field}; +use arrow::record_batch::RecordBatch; +use std::sync::Arc; + +fn sort_batch_by(batch: &RecordBatch, column: &str) -> arrow::error::Result { + let sort_column = batch.column(batch.schema().column_with_name(column).unwrap().0); + let sort_indices = sort_to_indices(sort_column, None, None)?; + let schema = batch.schema(); + let sorted_columns: Vec<(&String, ArrayRef)> = schema + .fields() + .iter() + .zip(batch.columns().iter()) + .map(|(field, column)| { + Ok(( + field.name(), + arrow::compute::take(column, &sort_indices, None)?, + )) + }) + .collect::>()?; + RecordBatch::try_from_iter(sorted_columns) +} + +#[tokio::test] +async fn test_add_action_table() { + // test table with partitions + let path = "./tests/data/delta-0.8.0-null-partition"; + let table = deltalake::open_table(path).await.unwrap(); + let actions = table.get_state().add_actions_table(true).unwrap(); + let actions = sort_batch_by(&actions, "path").unwrap(); + + let mut expected_columns: Vec<(&str, ArrayRef)> = vec![ + ("path", Arc::new(array::StringArray::from(vec![ + "k=A/part-00000-b1f1dbbb-70bc-4970-893f-9bb772bf246e.c000.snappy.parquet", + "k=__HIVE_DEFAULT_PARTITION__/part-00001-8474ac85-360b-4f58-b3ea-23990c71b932.c000.snappy.parquet" + ]))), + ("size_bytes", Arc::new(array::Int64Array::from(vec![460, 460]))), + ("modification_time", Arc::new(arrow::array::TimestampMillisecondArray::from(vec![ + 1627990384000, 1627990384000 + ]))), + ("data_change", Arc::new(array::BooleanArray::from(vec![true, true]))), + ("stats", Arc::new(array::StringArray::from(vec![None, None]))), + ("partition_k", Arc::new(array::StringArray::from(vec![Some("A"), None]))), + ]; + let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap(); + + assert_eq!(expected, actions); + + let actions = table.get_state().add_actions_table(false).unwrap(); + let actions = sort_batch_by(&actions, "path").unwrap(); + + expected_columns[5] = ( + "partition_values", + Arc::new(array::StructArray::from(vec![( + Field::new("k", DataType::Utf8, true), + Arc::new(array::StringArray::from(vec![Some("A"), None])) as ArrayRef, + )])), + ); + let expected = RecordBatch::try_from_iter(expected_columns).unwrap(); + + assert_eq!(expected, actions); + + // test table without partitions + let path = "./tests/data/simple_table"; + let table = deltalake::open_table(path).await.unwrap(); + + let actions = table.get_state().add_actions_table(true).unwrap(); + let actions = sort_batch_by(&actions, "path").unwrap(); + + let expected_columns: Vec<(&str, ArrayRef)> = vec![ + ( + "path", + Arc::new(array::StringArray::from(vec![ + "part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet", + "part-00000-c1777d7d-89d9-4790-b38a-6ee7e24456b1-c000.snappy.parquet", + "part-00001-7891c33d-cedc-47c3-88a6-abcfb049d3b4-c000.snappy.parquet", + "part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet", + "part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet", + ])), + ), + ( + "size_bytes", + Arc::new(array::Int64Array::from(vec![262, 262, 429, 429, 429])), + ), + ( + "modification_time", + Arc::new(arrow::array::TimestampMillisecondArray::from(vec![ + 1587968626000, + 1587968602000, + 1587968602000, + 1587968602000, + 1587968602000, + ])), + ), + ( + "data_change", + Arc::new(array::BooleanArray::from(vec![ + true, true, true, true, true, + ])), + ), + ( + "stats", + Arc::new(array::StringArray::from(vec![None, None, None, None, None])), + ), + ]; + let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap(); + + assert_eq!(expected, actions); + + let actions = table.get_state().add_actions_table(false).unwrap(); + let actions = sort_batch_by(&actions, "path").unwrap(); + + // For now, this column is ignored. + // expected_columns.push(( + // "partition_values", + // new_null_array(&DataType::Struct(vec![]), 5), + // )); + let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap(); + + assert_eq!(expected, actions); + + // test table with stats + // let path = "./tests/data/delta-0.8.0"; + // let mut table = deltalake::open_table(path).await.unwrap(); + + // test table with no json stats + // let path = "./tests/data/delta-1.2.1-only-struct-stats"; + // let mut table = deltalake::open_table(path).await.unwrap(); +} From 1c43139fa75c977d813a507d291964122adb72c9 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 28 Dec 2022 20:15:46 -0800 Subject: [PATCH 05/13] fix: handle checkpoint stats --- rust/src/table_state.rs | 8 +-- rust/tests/add_actions_test.rs | 124 +++++++++++++++++++++++++++++++-- 2 files changed, 122 insertions(+), 10 deletions(-) diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs index 7667671b7d..5efdce9d2e 100644 --- a/rust/src/table_state.rs +++ b/rust/src/table_state.rs @@ -388,14 +388,11 @@ impl DeltaTableState { &self, flatten: bool, ) -> Result { - // TODO: test this thoroughly in Rust let mut paths = arrow::array::StringBuilder::with_capacity( self.files.len(), self.files.iter().map(|add| add.path.len()).sum(), ); let mut size = arrow::array::Int64Builder::with_capacity(self.files.len()); - // let mut partition_keys = arrow::array::StringArray::with_capacity() - // let partition_values = arrow::array::MapBuilder::with_capacity(None, , key_builder, value_builder, capacity) let mut mod_time = arrow::array::TimestampMillisecondBuilder::with_capacity(self.files.len()); let mut data_change = arrow::array::BooleanBuilder::with_capacity(self.files.len()); @@ -412,8 +409,9 @@ impl DeltaTableState { size.append_value(action.size); mod_time.append_value(action.modification_time); data_change.append_value(action.data_change); - if let Some(action_stats) = &action.stats { - stats.append_value(action_stats); + + if let Some(action_stats) = &action.get_stats()? { + stats.append_value(serde_json::to_string(action_stats)?); } else { stats.append_null(); } diff --git a/rust/tests/add_actions_test.rs b/rust/tests/add_actions_test.rs index f131cd0ed8..377d796caf 100644 --- a/rust/tests/add_actions_test.rs +++ b/rust/tests/add_actions_test.rs @@ -1,6 +1,6 @@ #![cfg(feature = "arrow")] -use arrow::array::{self, ArrayRef}; +use arrow::array::{self, ArrayRef, StringArray}; use arrow::compute::sort_to_indices; use arrow::datatypes::{DataType, Field}; use arrow::record_batch::RecordBatch; @@ -123,10 +123,124 @@ async fn test_add_action_table() { assert_eq!(expected, actions); // test table with stats - // let path = "./tests/data/delta-0.8.0"; - // let mut table = deltalake::open_table(path).await.unwrap(); + let path = "./tests/data/delta-0.8.0"; + let table = deltalake::open_table(path).await.unwrap(); + let actions = table.get_state().add_actions_table(true).unwrap(); + let actions = sort_batch_by(&actions, "path").unwrap(); + + let expected_columns: Vec<(&str, ArrayRef)> = vec![ + ("path", Arc::new(array::StringArray::from(vec![ + "part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet", + "part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet", + ]))), + ("size_bytes", Arc::new(array::Int64Array::from(vec![440, 440]))), + ("modification_time", Arc::new(arrow::array::TimestampMillisecondArray::from(vec![ + 1615043776000, 1615043767000 + ]))), + ("data_change", Arc::new(array::BooleanArray::from(vec![true, true]))), + ("stats", Arc::new(array::StringArray::from(vec![ + "{\"numRecords\":2,\"minValues\":{\"value\":2},\"maxValues\":{\"value\":4},\"nullCount\":{\"value\":0}}", + "{\"numRecords\":2,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":2},\"nullCount\":{\"value\":0}}", + ]))), + ]; + let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap(); + + assert_eq!(expected, actions); // test table with no json stats - // let path = "./tests/data/delta-1.2.1-only-struct-stats"; - // let mut table = deltalake::open_table(path).await.unwrap(); + let path = "./tests/data/delta-1.2.1-only-struct-stats"; + let table = deltalake::open_table(path).await.unwrap(); + + let actions = table.get_state().add_actions_table(true).unwrap(); + let actions = sort_batch_by(&actions, "path").unwrap(); + + let stats_col_index = actions.schema().column_with_name("stats").unwrap().0; + let result_stats: &StringArray = actions + .column(stats_col_index) + .as_any() + .downcast_ref() + .unwrap(); + let actions = actions + .project( + &(0..actions.num_columns()) + .into_iter() + .filter(|i| i != &stats_col_index) + .collect::>(), + ) + .unwrap(); + + let expected_columns: Vec<(&str, ArrayRef)> = vec![ + ( + "path", + Arc::new(array::StringArray::from(vec![ + "part-00000-1c2d1a32-02dc-484f-87ff-4328ea56045d-c000.snappy.parquet", + "part-00000-28925d3a-bdf2-411e-bca9-b067444cbcb0-c000.snappy.parquet", + "part-00000-6630b7c4-0aca-405b-be86-68a812f2e4c8-c000.snappy.parquet", + "part-00000-74151571-7ec6-4bd6-9293-b5daab2ce667-c000.snappy.parquet", + "part-00000-7a509247-4f58-4453-9202-51d75dee59af-c000.snappy.parquet", + "part-00000-8e0aefe1-6645-4601-ac29-68cba64023b5-c000.snappy.parquet", + "part-00000-b26ba634-874c-45b0-a7ff-2f0395a53966-c000.snappy.parquet", + "part-00000-c4c8caec-299d-42a4-b50c-5a4bf724c037-c000.snappy.parquet", + "part-00000-ce300400-58ff-4b8f-8ba9-49422fdf9f2e-c000.snappy.parquet", + "part-00000-e1262b3e-2959-4910-aea9-4eaf92f0c68c-c000.snappy.parquet", + "part-00000-e8e3753f-e2f6-4c9f-98f9-8f3d346727ba-c000.snappy.parquet", + "part-00000-f73ff835-0571-4d67-ac43-4fbf948bfb9b-c000.snappy.parquet", + ])), + ), + ( + "size_bytes", + Arc::new(array::Int64Array::from(vec![ + 5488, 5489, 5489, 5489, 5489, 5489, 5489, 5489, 5489, 5489, 5489, 5731, + ])), + ), + ( + "modification_time", + Arc::new(arrow::array::TimestampMillisecondArray::from(vec![ + 1666652376000, + 1666652374000, + 1666652378000, + 1666652377000, + 1666652373000, + 1666652385000, + 1666652375000, + 1666652379000, + 1666652382000, + 1666652386000, + 1666652380000, + 1666652383000, + ])), + ), + ( + "data_change", + Arc::new(array::BooleanArray::from(vec![ + false, false, false, false, false, true, false, false, false, true, false, false, + ])), + ), + ]; + let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap(); + + assert_eq!(expected, actions); + + // For stats in checkpoints, the serialization order isn't deterministic, so we can't compare with strings + let expected_stats = vec![ + "{\"numRecords\":1,\"minValues\":{\"struct\":{\"struct_element\":\"struct_value\"},\"double\":1.234,\"decimal\":-5.678,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"timestamp\":\"2022-10-24T22:59:36.177Z\",\"string\":\"string\",\"integer\":3,\"date\":\"2022-10-24\"},\"maxValues\":{\"timestamp\":\"2022-10-24T22:59:36.177Z\",\"integer\":3,\"struct\":{\"struct_element\":\"struct_value\"},\"double\":1.234,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"string\":\"string\",\"date\":\"2022-10-24\",\"decimal\":-5.678},\"nullCount\":{\"struct\":{\"struct_element\":0},\"double\":0,\"array\":0,\"integer\":0,\"date\":0,\"map\":0,\"struct_of_array_of_map\":{\"struct_element\":0},\"boolean\":0,\"null\":1,\"decimal\":0,\"binary\":0,\"timestamp\":0,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"string\":0}}", + "{\"numRecords\":1,\"minValues\":{\"integer\":1,\"timestamp\":\"2022-10-24T22:59:34.067Z\",\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"struct\":{\"struct_element\":\"struct_value\"},\"string\":\"string\",\"double\":1.234,\"date\":\"2022-10-24\",\"decimal\":-5.678},\"maxValues\":{\"decimal\":-5.678,\"timestamp\":\"2022-10-24T22:59:34.067Z\",\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"integer\":1,\"double\":1.234,\"string\":\"string\",\"struct\":{\"struct_element\":\"struct_value\"},\"date\":\"2022-10-24\"},\"nullCount\":{\"boolean\":0,\"string\":0,\"struct\":{\"struct_element\":0},\"map\":0,\"double\":0,\"array\":0,\"null\":1,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"binary\":0,\"decimal\":0,\"date\":0,\"timestamp\":0,\"struct_of_array_of_map\":{\"struct_element\":0},\"integer\":0}}", + "{\"numRecords\":1,\"minValues\":{\"integer\":5,\"decimal\":-5.678,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"string\":\"string\",\"timestamp\":\"2022-10-24T22:59:38.358Z\",\"double\":1.234,\"date\":\"2022-10-24\",\"struct\":{\"struct_element\":\"struct_value\"}},\"maxValues\":{\"timestamp\":\"2022-10-24T22:59:38.358Z\",\"integer\":5,\"decimal\":-5.678,\"date\":\"2022-10-24\",\"string\":\"string\",\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"double\":1.234,\"struct\":{\"struct_element\":\"struct_value\"}},\"nullCount\":{\"null\":1,\"struct_of_array_of_map\":{\"struct_element\":0},\"binary\":0,\"string\":0,\"double\":0,\"integer\":0,\"date\":0,\"timestamp\":0,\"map\":0,\"array\":0,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"decimal\":0,\"struct\":{\"struct_element\":0},\"boolean\":0}}", + "{\"numRecords\":1,\"minValues\":{\"string\":\"string\",\"double\":1.234,\"integer\":4,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"timestamp\":\"2022-10-24T22:59:37.235Z\",\"date\":\"2022-10-24\",\"decimal\":-5.678,\"struct\":{\"struct_element\":\"struct_value\"}},\"maxValues\":{\"timestamp\":\"2022-10-24T22:59:37.235Z\",\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"date\":\"2022-10-24\",\"string\":\"string\",\"struct\":{\"struct_element\":\"struct_value\"},\"double\":1.234,\"integer\":4,\"decimal\":-5.678},\"nullCount\":{\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"struct_of_array_of_map\":{\"struct_element\":0},\"binary\":0,\"map\":0,\"string\":0,\"double\":0,\"date\":0,\"null\":1,\"integer\":0,\"boolean\":0,\"decimal\":0,\"timestamp\":0,\"struct\":{\"struct_element\":0},\"array\":0}}", + "{\"numRecords\":1,\"minValues\":{\"struct\":{\"struct_element\":\"struct_value\"},\"decimal\":-5.678,\"timestamp\":\"2022-10-24T22:59:32.846Z\",\"string\":\"string\",\"integer\":0,\"double\":1.234,\"date\":\"2022-10-24\",\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}}},\"maxValues\":{\"string\":\"string\",\"struct\":{\"struct_element\":\"struct_value\"},\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"date\":\"2022-10-24\",\"decimal\":-5.678,\"integer\":0,\"double\":1.234,\"timestamp\":\"2022-10-24T22:59:32.846Z\"},\"nullCount\":{\"timestamp\":0,\"struct\":{\"struct_element\":0},\"double\":0,\"binary\":0,\"string\":0,\"boolean\":0,\"decimal\":0,\"null\":1,\"integer\":0,\"array\":0,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"struct_of_array_of_map\":{\"struct_element\":0},\"date\":0,\"map\":0}}", + "{\"numRecords\":1,\"minValues\":{\"struct\":{\"struct_element\":\"struct_value\"},\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"date\":\"2022-10-24\",\"timestamp\":\"2022-10-24T22:59:44.639Z\",\"double\":1.234,\"integer\":10,\"string\":\"string\",\"decimal\":-5.678},\"maxValues\":{\"struct\":{\"struct_element\":\"struct_value\"},\"date\":\"2022-10-24\",\"integer\":10,\"decimal\":-5.678,\"string\":\"string\",\"double\":1.234,\"timestamp\":\"2022-10-24T22:59:44.639Z\",\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}}},\"nullCount\":{\"binary\":0,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"timestamp\":0,\"boolean\":0,\"string\":0,\"struct_of_array_of_map\":{\"struct_element\":0},\"map\":0,\"null\":1,\"decimal\":0,\"array\":0,\"struct\":{\"struct_element\":0},\"double\":0,\"integer\":0,\"date\":0}}", + "{\"numRecords\":1,\"minValues\":{\"date\":\"2022-10-24\",\"timestamp\":\"2022-10-24T22:59:35.117Z\",\"integer\":2,\"struct\":{\"struct_element\":\"struct_value\"},\"decimal\":-5.678,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"string\":\"string\",\"double\":1.234},\"maxValues\":{\"integer\":2,\"timestamp\":\"2022-10-24T22:59:35.117Z\",\"struct\":{\"struct_element\":\"struct_value\"},\"decimal\":-5.678,\"double\":1.234,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"date\":\"2022-10-24\",\"string\":\"string\"},\"nullCount\":{\"struct\":{\"struct_element\":0},\"double\":0,\"map\":0,\"array\":0,\"boolean\":0,\"integer\":0,\"date\":0,\"binary\":0,\"decimal\":0,\"string\":0,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"struct_of_array_of_map\":{\"struct_element\":0},\"null\":1,\"timestamp\":0}}", + "{\"numRecords\":1,\"minValues\":{\"timestamp\":\"2022-10-24T22:59:39.489Z\",\"string\":\"string\",\"struct\":{\"struct_element\":\"struct_value\"},\"date\":\"2022-10-24\",\"integer\":6,\"double\":1.234,\"decimal\":-5.678,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}}},\"maxValues\":{\"timestamp\":\"2022-10-24T22:59:39.489Z\",\"decimal\":-5.678,\"integer\":6,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"string\":\"string\",\"struct\":{\"struct_element\":\"struct_value\"},\"date\":\"2022-10-24\",\"double\":1.234},\"nullCount\":{\"double\":0,\"decimal\":0,\"boolean\":0,\"struct\":{\"struct_element\":0},\"date\":0,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"timestamp\":0,\"array\":0,\"binary\":0,\"map\":0,\"null\":1,\"struct_of_array_of_map\":{\"struct_element\":0},\"string\":0,\"integer\":0}}", + "{\"numRecords\":1,\"minValues\":{\"timestamp\":\"2022-10-24T22:59:41.637Z\",\"decimal\":-5.678,\"date\":\"2022-10-24\",\"integer\":8,\"string\":\"string\",\"double\":1.234,\"struct\":{\"struct_element\":\"struct_value\"},\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}}},\"maxValues\":{\"date\":\"2022-10-24\",\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"decimal\":-5.678,\"double\":1.234,\"integer\":8,\"timestamp\":\"2022-10-24T22:59:41.637Z\",\"string\":\"string\",\"struct\":{\"struct_element\":\"struct_value\"}},\"nullCount\":{\"array\":0,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"timestamp\":0,\"string\":0,\"struct_of_array_of_map\":{\"struct_element\":0},\"map\":0,\"integer\":0,\"binary\":0,\"double\":0,\"null\":1,\"date\":0,\"decimal\":0,\"boolean\":0,\"struct\":{\"struct_element\":0}}}", + "{\"numRecords\":1,\"minValues\":{\"timestamp\":\"2022-10-24T22:59:46.083Z\",\"date\":\"2022-10-24\",\"struct\":{\"struct_element\":\"struct_value\"},\"integer\":11,\"string\":\"string\",\"double\":1.234,\"decimal\":-5.678,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}}},\"maxValues\":{\"decimal\":-5.678,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"date\":\"2022-10-24\",\"timestamp\":\"2022-10-24T22:59:46.083Z\",\"string\":\"string\",\"struct\":{\"struct_element\":\"struct_value\"},\"double\":1.234,\"integer\":11},\"nullCount\":{\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"binary\":0,\"date\":0,\"double\":0,\"map\":0,\"null\":1,\"struct_of_array_of_map\":{\"struct_element\":0},\"string\":0,\"boolean\":0,\"integer\":0,\"struct\":{\"struct_element\":0},\"decimal\":0,\"timestamp\":0,\"array\":0}}", + "{\"numRecords\":1,\"minValues\":{\"date\":\"2022-10-24\",\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"decimal\":-5.678,\"timestamp\":\"2022-10-24T22:59:40.572Z\",\"struct\":{\"struct_element\":\"struct_value\"},\"string\":\"string\",\"integer\":7,\"double\":1.234},\"maxValues\":{\"integer\":7,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"struct\":{\"struct_element\":\"struct_value\"},\"double\":1.234,\"decimal\":-5.678,\"string\":\"string\",\"timestamp\":\"2022-10-24T22:59:40.572Z\",\"date\":\"2022-10-24\"},\"nullCount\":{\"double\":0,\"binary\":0,\"boolean\":0,\"timestamp\":0,\"array\":0,\"null\":1,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"struct\":{\"struct_element\":0},\"struct_of_array_of_map\":{\"struct_element\":0},\"map\":0,\"integer\":0,\"date\":0,\"string\":0,\"decimal\":0}}", + "{\"numRecords\":1,\"minValues\":{\"double\":1.234,\"string\":\"string\",\"date\":\"2022-10-24\",\"struct\":{\"struct_element\":\"struct_value\"},\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"timestamp\":\"2022-10-24T22:59:42.908Z\",\"integer\":9,\"decimal\":-5.678,\"new_column\":0},\"maxValues\":{\"date\":\"2022-10-24\",\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"decimal\":-5.678,\"integer\":9,\"double\":1.234,\"string\":\"string\",\"struct\":{\"struct_element\":\"struct_value\"},\"timestamp\":\"2022-10-24T22:59:42.908Z\",\"new_column\":0},\"nullCount\":{\"double\":0,\"new_column\":0,\"struct_of_array_of_map\":{\"struct_element\":0},\"date\":0,\"map\":0,\"decimal\":0,\"null\":1,\"timestamp\":0,\"struct\":{\"struct_element\":0},\"boolean\":0,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"binary\":0,\"integer\":0,\"array\":0,\"string\":0}}", + ]; + + for (maybe_result_stat, expected_stat) in result_stats.iter().zip(expected_stats.into_iter()) { + let result_value: serde_json::Value = + serde_json::from_str(maybe_result_stat.unwrap()).unwrap(); + let expected_value: serde_json::Value = serde_json::from_str(expected_stat).unwrap(); + assert_eq!(result_value, expected_value); + } } From ab4546ece3602e72df5436749dd2a3c36f9d37e9 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 30 Dec 2022 13:42:45 -0800 Subject: [PATCH 06/13] feat: get flattened version of stats working --- rust/Cargo.toml | 1 + rust/src/lib.rs | 3 + rust/src/table_state.rs | 131 ---------- rust/src/table_state_arrow.rs | 424 +++++++++++++++++++++++++++++++++ rust/tests/add_actions_test.rs | 273 +++++++++++++++++++-- 5 files changed, 676 insertions(+), 156 deletions(-) create mode 100644 rust/src/table_state_arrow.rs diff --git a/rust/Cargo.toml b/rust/Cargo.toml index f04bd04e4d..c18c8d35a9 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -35,6 +35,7 @@ tokio = { version = "1", features = ["macros", "rt", "parking_lot"] } regex = "1" uuid = { version = "1", features = ["serde", "v4"] } url = "2.3" +num = { version = "0.4", default-features = false, features = ["std"] } # S3 lock client rusoto_core = { version = "0.48", default-features = false, optional = true } diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 280ce33710..529f98cbd2 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -92,6 +92,9 @@ pub mod table_properties; pub mod table_state; pub mod time_utils; +#[cfg(all(feature = "arrow"))] +pub mod table_state_arrow; + #[cfg(all(feature = "arrow", feature = "parquet"))] pub mod checkpoints; #[cfg(all(feature = "arrow", feature = "parquet"))] diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs index 5efdce9d2e..23f7812395 100644 --- a/rust/src/table_state.rs +++ b/rust/src/table_state.rs @@ -8,18 +8,14 @@ use crate::{ ApplyLogError, DeltaDataTypeLong, DeltaDataTypeVersion, DeltaTable, DeltaTableError, DeltaTableMetaData, }; -use arrow::array::ArrayRef; -use arrow::compute::cast; use chrono::Utc; use object_store::{path::Path, ObjectStore}; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; -use std::borrow::Cow; use std::collections::HashMap; use std::collections::HashSet; use std::convert::TryFrom; use std::io::{BufRead, BufReader, Cursor}; -use std::sync::Arc; #[cfg(any(feature = "parquet", feature = "parquet2"))] use super::{CheckPoint, DeltaTableConfig}; @@ -382,137 +378,10 @@ impl DeltaTableState { }); Ok(actions) } - - /// Get the add actions as a RecordBatch - pub fn add_actions_table( - &self, - flatten: bool, - ) -> Result { - let mut paths = arrow::array::StringBuilder::with_capacity( - self.files.len(), - self.files.iter().map(|add| add.path.len()).sum(), - ); - let mut size = arrow::array::Int64Builder::with_capacity(self.files.len()); - let mut mod_time = - arrow::array::TimestampMillisecondBuilder::with_capacity(self.files.len()); - let mut data_change = arrow::array::BooleanBuilder::with_capacity(self.files.len()); - let mut stats = arrow::array::StringBuilder::with_capacity( - self.files.len(), - self.files - .iter() - .map(|add| add.stats.as_ref().map(|s| s.len()).unwrap_or(0)) - .sum(), - ); - - for action in &self.files { - paths.append_value(&action.path); - size.append_value(action.size); - mod_time.append_value(action.modification_time); - data_change.append_value(action.data_change); - - if let Some(action_stats) = &action.get_stats()? { - stats.append_value(serde_json::to_string(action_stats)?); - } else { - stats.append_null(); - } - } - - let mut arrays: Vec<(Cow, ArrayRef)> = vec![ - (Cow::Borrowed("path"), Arc::new(paths.finish())), - (Cow::Borrowed("size_bytes"), Arc::new(size.finish())), - ( - Cow::Borrowed("modification_time"), - Arc::new(mod_time.finish()), - ), - (Cow::Borrowed("data_change"), Arc::new(data_change.finish())), - (Cow::Borrowed("stats"), Arc::new(stats.finish())), - ]; - let metadata = self - .current_metadata - .as_ref() - .ok_or(DeltaTableError::NoMetadata)?; - let partition_column_types: Vec = metadata - .partition_columns - .iter() - .map( - |name| -> Result { - let field = metadata.schema.get_field_with_name(name)?; - Ok(field.get_type().try_into()?) - }, - ) - .collect::>()?; - - // Create builder for each - let mut builders = metadata - .partition_columns - .iter() - .map(|name| { - let builder = arrow::array::StringBuilder::new(); - (name.as_str(), builder) - }) - .collect::>(); - - // Append values - for action in &self.files { - for (name, maybe_value) in action.partition_values.iter() { - if let Some(value) = maybe_value { - builders.get_mut(name.as_str()).unwrap().append_value(value); - } else { - builders.get_mut(name.as_str()).unwrap().append_null(); - } - } - } - - // Cast them to their appropriate types - let partition_columns: Vec = metadata - .partition_columns - .iter() - // Get the builders in their original order - .map(|name| builders.remove(name.as_str()).unwrap()) - .zip(partition_column_types.iter()) - .map(|(mut builder, datatype)| { - let string_arr: ArrayRef = Arc::new(builder.finish()); - Ok(cast(&string_arr, datatype)?) - }) - .collect::>()?; - - // if flatten, append columns, otherwise combine into a struct column - let partition_columns: Vec<(Cow, ArrayRef)> = if flatten { - partition_columns - .into_iter() - .zip(metadata.partition_columns.iter()) - .map(|(array, name)| { - let name: Cow = Cow::Owned(format!("partition_{}", name)); - (name, array) - }) - .collect() - } else { - let fields = partition_column_types - .into_iter() - .zip(metadata.partition_columns.iter()) - .map(|(datatype, name)| arrow::datatypes::Field::new(name, datatype, true)); - let field_arrays = fields - .zip(partition_columns.into_iter()) - .collect::>(); - if field_arrays.is_empty() { - vec![] - } else { - let arr = Arc::new(arrow::array::StructArray::from(field_arrays)); - vec![(Cow::Borrowed("partition_values"), arr)] - } - }; - - arrays.extend(partition_columns.into_iter()); - - Ok(arrow::record_batch::RecordBatch::try_from_iter(arrays)?) - } } - -#[cfg(test)] mod tests { use super::*; use pretty_assertions::assert_eq; - use std::collections::HashMap; #[test] fn state_round_trip() { diff --git a/rust/src/table_state_arrow.rs b/rust/src/table_state_arrow.rs new file mode 100644 index 0000000000..3c8d87230a --- /dev/null +++ b/rust/src/table_state_arrow.rs @@ -0,0 +1,424 @@ +//! Methods to get Delta Table state in Arrow structures + +use crate::action::ColumnCountStat; +use crate::action::ColumnValueStat; +use crate::action::Stats; +use crate::arrow::array::TimestampMicrosecondArray; +use crate::table_state::DeltaTableState; +use crate::DeltaDataTypeLong; +use crate::DeltaTableError; +use crate::SchemaDataType; +use crate::SchemaTypeStruct; +use arrow::array::ArrayRef; +use arrow::array::BinaryArray; +use arrow::array::BooleanArray; +use arrow::array::Date32Array; +use arrow::array::Int64Array; +use arrow::array::PrimitiveArray; +use arrow::array::StringArray; +use arrow::array::TimestampMillisecondArray; +use arrow::compute::cast; +use arrow::compute::kernels::cast_utils::Parser; +use arrow::datatypes::ArrowPrimitiveType; +use arrow::datatypes::DataType; +use arrow::datatypes::Date32Type; +use arrow::datatypes::TimeUnit; +use arrow::datatypes::TimestampMicrosecondType; +use std::borrow::Cow; +use std::collections::HashMap; +use std::collections::VecDeque; +use std::sync::Arc; + +impl DeltaTableState { + /// Get the add actions as a RecordBatch + pub fn add_actions_table( + &self, + flatten: bool, + parse_stats: bool, + ) -> Result { + let mut paths = arrow::array::StringBuilder::with_capacity( + self.files().len(), + self.files().iter().map(|add| add.path.len()).sum(), + ); + for action in self.files() { + paths.append_value(&action.path); + } + + let size = self + .files() + .iter() + .map(|file| file.size) + .collect::(); + let mod_time: TimestampMillisecondArray = self + .files() + .iter() + .map(|file| file.modification_time) + .collect::>() + .into(); + let data_change = self + .files() + .iter() + .map(|file| Some(file.data_change)) + .collect::(); + + let mut arrays: Vec<(Cow, ArrayRef)> = vec![ + (Cow::Borrowed("path"), Arc::new(paths.finish())), + (Cow::Borrowed("size_bytes"), Arc::new(size)), + (Cow::Borrowed("modification_time"), Arc::new(mod_time)), + (Cow::Borrowed("data_change"), Arc::new(data_change)), + ]; + let metadata = self.current_metadata().ok_or(DeltaTableError::NoMetadata)?; + let partition_column_types: Vec = metadata + .partition_columns + .iter() + .map( + |name| -> Result { + let field = metadata.schema.get_field_with_name(name)?; + Ok(field.get_type().try_into()?) + }, + ) + .collect::>()?; + + // Create builder for each + let mut builders = metadata + .partition_columns + .iter() + .map(|name| { + let builder = arrow::array::StringBuilder::new(); + (name.as_str(), builder) + }) + .collect::>(); + + // Append values + for action in self.files() { + for (name, maybe_value) in action.partition_values.iter() { + if let Some(value) = maybe_value { + builders.get_mut(name.as_str()).unwrap().append_value(value); + } else { + builders.get_mut(name.as_str()).unwrap().append_null(); + } + } + } + + // Cast them to their appropriate types + let partition_columns: Vec = metadata + .partition_columns + .iter() + // Get the builders in their original order + .map(|name| builders.remove(name.as_str()).unwrap()) + .zip(partition_column_types.iter()) + .map(|(mut builder, datatype)| { + let string_arr: ArrayRef = Arc::new(builder.finish()); + Ok(cast(&string_arr, datatype)?) + }) + .collect::>()?; + + // if flatten, append columns, otherwise combine into a struct column + let partition_columns: Vec<(Cow, ArrayRef)> = if flatten { + partition_columns + .into_iter() + .zip(metadata.partition_columns.iter()) + .map(|(array, name)| { + let name: Cow = Cow::Owned(format!("partition_{}", name)); + (name, array) + }) + .collect() + } else { + let fields = partition_column_types + .into_iter() + .zip(metadata.partition_columns.iter()) + .map(|(datatype, name)| arrow::datatypes::Field::new(name, datatype, true)); + let field_arrays = fields + .zip(partition_columns.into_iter()) + .collect::>(); + if field_arrays.is_empty() { + vec![] + } else { + let arr = Arc::new(arrow::array::StructArray::from(field_arrays)); + vec![(Cow::Borrowed("partition_values"), arr)] + } + }; + + if parse_stats { + let stats = self.stats_as_batch(flatten)?; + arrays.extend( + stats + .schema() + .fields + .iter() + .map(|field| Cow::Owned(field.name().clone())) + .zip(stats.columns().iter().map(Arc::clone)), + ); + } else { + let mut stats_builder = arrow::array::StringBuilder::with_capacity( + self.files().len(), + self.files() + .iter() + .map(|add| add.stats.as_ref().map(|s| s.len()).unwrap_or(0)) + .sum(), + ); + + for action in self.files() { + if let Some(stats) = &action.get_stats()? { + stats_builder.append_value(&serde_json::to_string(stats)?); + } else { + stats_builder.append_null(); + } + } + arrays.push((Cow::Borrowed("stats"), Arc::new(stats_builder.finish()))); + } + + arrays.extend(partition_columns.into_iter()); + + Ok(arrow::record_batch::RecordBatch::try_from_iter(arrays)?) + } + + fn stats_as_batch( + &self, + flatten: bool, + ) -> Result { + let stats: Vec> = self + .files() + .iter() + .map(|f| { + f.get_stats() + .map_err(|err| DeltaTableError::InvalidJson { source: err }) + }) + .collect::>()?; + + let num_records = arrow::array::Int64Array::from( + stats + .iter() + .map(|maybe_stat| maybe_stat.as_ref().map(|stat| stat.num_records)) + .collect::>>(), + ); + let metadata = self.current_metadata().ok_or(DeltaTableError::NoMetadata)?; + let schema = &metadata.schema; + + struct ColStats<'a> { + path: Vec<&'a str>, + null_count: Option, + min_values: Option, + max_values: Option, + } + let columnar_stats: Vec = SchemaLeafIterator::new(schema) + .filter(|(_path, datatype)| !matches!(datatype, SchemaDataType::r#struct(_))) + .map(|(path, datatype)| -> Result { + let null_count: Option = stats + .iter() + .flat_map(|maybe_stat| { + maybe_stat + .as_ref() + .map(|stat| resolve_column_count_stat(&stat.null_count, &path)) + }) + .collect::>>() + .map(arrow::array::Int64Array::from) + .map(|arr| -> ArrayRef { Arc::new(arr) }); + + let arrow_type: arrow::datatypes::DataType = datatype.try_into()?; + + // Min and max are collected for primitive values, not list or maps + let min_values = if matches!(datatype, SchemaDataType::primitive(_)) { + stats + .iter() + .flat_map(|maybe_stat| { + maybe_stat + .as_ref() + .map(|stat| resolve_column_value_stat(&stat.min_values, &path)) + }) + .collect::>>() + .map(|min_values| { + json_value_to_array_general(&arrow_type, min_values.into_iter()) + }) + .transpose()? + } else { + None + }; + + let max_values = if matches!(datatype, SchemaDataType::primitive(_)) { + stats + .iter() + .flat_map(|maybe_stat| { + maybe_stat + .as_ref() + .map(|stat| resolve_column_value_stat(&stat.max_values, &path)) + }) + .collect::>>() + .map(|max_values| { + json_value_to_array_general(&arrow_type, max_values.into_iter()) + }) + .transpose()? + } else { + None + }; + + Ok(ColStats { + path, + null_count, + min_values, + max_values, + }) + }) + .collect::>()?; + + let mut out_columns: Vec<(Cow, ArrayRef)> = + vec![(Cow::Borrowed("num_records"), Arc::new(num_records))]; + if flatten { + for col_stats in columnar_stats { + if let Some(null_count) = col_stats.null_count { + out_columns.push(( + Cow::Owned(format!("null_count.{}", col_stats.path.join("."))), + null_count, + )); + } + if let Some(min_values) = col_stats.min_values { + out_columns.push(( + Cow::Owned(format!("min.{}", col_stats.path.join("."))), + min_values, + )); + } + if let Some(max_values) = col_stats.max_values { + out_columns.push(( + Cow::Owned(format!("max.{}", col_stats.path.join("."))), + max_values, + )); + } + } + } else { + todo!() + } + + Ok(arrow::record_batch::RecordBatch::try_from_iter( + out_columns, + )?) + } +} + +fn resolve_column_value_stat<'a>( + values: &'a HashMap, + path: &[&'a str], +) -> Option<&'a serde_json::Value> { + let mut current = values; + let (&name, path) = path.split_last()?; + for &segment in path { + current = current.get(segment)?.as_column()?; + } + let current = current.get(name)?; + current.as_value() +} + +fn resolve_column_count_stat( + values: &HashMap, + path: &[&str], +) -> Option { + let mut current = values; + let (&name, path) = path.split_last()?; + for &segment in path { + current = current.get(segment)?.as_column()?; + } + let current = current.get(name)?; + current.as_value() +} + +struct SchemaLeafIterator<'a> { + fields_remaining: VecDeque<(Vec<&'a str>, &'a SchemaDataType)>, +} + +impl<'a> SchemaLeafIterator<'a> { + fn new(schema: &'a SchemaTypeStruct) -> Self { + SchemaLeafIterator { + fields_remaining: schema + .get_fields() + .iter() + .map(|field| (vec![field.get_name()], field.get_type())) + .collect(), + } + } +} + +impl<'a> std::iter::Iterator for SchemaLeafIterator<'a> { + type Item = (Vec<&'a str>, &'a SchemaDataType); + + fn next(&mut self) -> Option { + if let Some((path, datatype)) = self.fields_remaining.pop_front() { + if let SchemaDataType::r#struct(struct_type) = datatype { + // push child fields to front + for field in struct_type.get_fields() { + let mut new_path = path.clone(); + new_path.push(field.get_name()); + self.fields_remaining + .push_front((new_path, field.get_type())); + } + }; + + Some((path, datatype)) + } else { + None + } + } +} + +fn json_value_to_array_general<'a>( + datatype: &arrow::datatypes::DataType, + values: impl Iterator, +) -> Result { + match datatype { + DataType::Boolean => Ok(Arc::new( + values + .map(|value| value.as_bool()) + .collect::(), + )), + DataType::Int64 => json_value_to_array::(values), + DataType::Int32 => json_value_to_array::(values), + DataType::Int16 => json_value_to_array::(values), + DataType::Int8 => json_value_to_array::(values), + DataType::Float32 => json_value_to_array::(values), + DataType::Float64 => json_value_to_array::(values), + DataType::Utf8 => Ok(Arc::new( + values.map(|value| value.as_str()).collect::(), + )), + DataType::Binary => Ok(Arc::new( + values.map(|value| value.as_str()).collect::(), + )), + DataType::Timestamp(TimeUnit::Microsecond, None) => { + Ok(Arc::new(TimestampMicrosecondArray::from( + values + .map(|value| value.as_str().and_then(TimestampMicrosecondType::parse)) + .collect::>>(), + ))) + } + DataType::Date32 => Ok(Arc::new(Date32Array::from( + values + .map(|value| value.as_str().and_then(Date32Type::parse)) + .collect::>>(), + ))), + DataType::Decimal128(_, _) => { + let float_arr = json_value_to_array::(values)?; + Ok(arrow::compute::cast(&float_arr, datatype)?) + } + _ => Err(DeltaTableError::Generic("Invalid datatype".to_string())), + } +} + +fn json_value_to_array<'a, T>( + values: impl Iterator, +) -> Result +where + T: ArrowPrimitiveType, + T::Native: num::NumCast, +{ + // Adapted from + Ok(Arc::new( + values + .map(|value| -> Option { + if value.is_i64() { + value.as_i64().and_then(num::cast::cast) + } else if value.is_u64() { + value.as_u64().and_then(num::cast::cast) + } else { + value.as_f64().and_then(num::cast::cast) + } + }) + .collect::>(), + )) +} diff --git a/rust/tests/add_actions_test.rs b/rust/tests/add_actions_test.rs index 377d796caf..bf957160a2 100644 --- a/rust/tests/add_actions_test.rs +++ b/rust/tests/add_actions_test.rs @@ -1,8 +1,9 @@ #![cfg(feature = "arrow")] use arrow::array::{self, ArrayRef, StringArray}; +use arrow::compute::kernels::cast_utils::Parser; use arrow::compute::sort_to_indices; -use arrow::datatypes::{DataType, Field}; +use arrow::datatypes::{DataType, Date32Type, Field, TimestampMicrosecondType}; use arrow::record_batch::RecordBatch; use std::sync::Arc; @@ -24,12 +25,14 @@ fn sort_batch_by(batch: &RecordBatch, column: &str) -> arrow::error::Result = vec![ @@ -49,7 +52,7 @@ async fn test_add_action_table() { assert_eq!(expected, actions); - let actions = table.get_state().add_actions_table(false).unwrap(); + let actions = table.get_state().add_actions_table(false, false).unwrap(); let actions = sort_batch_by(&actions, "path").unwrap(); expected_columns[5] = ( @@ -62,12 +65,15 @@ async fn test_add_action_table() { let expected = RecordBatch::try_from_iter(expected_columns).unwrap(); assert_eq!(expected, actions); +} +#[tokio::test] +async fn test_without_partitions() { // test table without partitions let path = "./tests/data/simple_table"; let table = deltalake::open_table(path).await.unwrap(); - let actions = table.get_state().add_actions_table(true).unwrap(); + let actions = table.get_state().add_actions_table(true, false).unwrap(); let actions = sort_batch_by(&actions, "path").unwrap(); let expected_columns: Vec<(&str, ArrayRef)> = vec![ @@ -110,7 +116,7 @@ async fn test_add_action_table() { assert_eq!(expected, actions); - let actions = table.get_state().add_actions_table(false).unwrap(); + let actions = table.get_state().add_actions_table(false, false).unwrap(); let actions = sort_batch_by(&actions, "path").unwrap(); // For now, this column is ignored. @@ -121,37 +127,76 @@ async fn test_add_action_table() { let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap(); assert_eq!(expected, actions); +} +#[tokio::test] +async fn test_with_stats() { // test table with stats let path = "./tests/data/delta-0.8.0"; let table = deltalake::open_table(path).await.unwrap(); - let actions = table.get_state().add_actions_table(true).unwrap(); + let actions = table.get_state().add_actions_table(true, false).unwrap(); let actions = sort_batch_by(&actions, "path").unwrap(); + let stats_col_index = actions.schema().column_with_name("stats").unwrap().0; + let result_stats: &StringArray = actions + .column(stats_col_index) + .as_any() + .downcast_ref() + .unwrap(); + let actions = actions + .project( + &(0..actions.num_columns()) + .into_iter() + .filter(|i| i != &stats_col_index) + .collect::>(), + ) + .unwrap(); + let expected_columns: Vec<(&str, ArrayRef)> = vec![ - ("path", Arc::new(array::StringArray::from(vec![ - "part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet", - "part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet", - ]))), - ("size_bytes", Arc::new(array::Int64Array::from(vec![440, 440]))), - ("modification_time", Arc::new(arrow::array::TimestampMillisecondArray::from(vec![ - 1615043776000, 1615043767000 - ]))), - ("data_change", Arc::new(array::BooleanArray::from(vec![true, true]))), - ("stats", Arc::new(array::StringArray::from(vec![ - "{\"numRecords\":2,\"minValues\":{\"value\":2},\"maxValues\":{\"value\":4},\"nullCount\":{\"value\":0}}", - "{\"numRecords\":2,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":2},\"nullCount\":{\"value\":0}}", - ]))), + ( + "path", + Arc::new(array::StringArray::from(vec![ + "part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet", + "part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet", + ])), + ), + ( + "size_bytes", + Arc::new(array::Int64Array::from(vec![440, 440])), + ), + ( + "modification_time", + Arc::new(arrow::array::TimestampMillisecondArray::from(vec![ + 1615043776000, + 1615043767000, + ])), + ), + ( + "data_change", + Arc::new(array::BooleanArray::from(vec![true, true])), + ), ]; let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap(); assert_eq!(expected, actions); + let expected_stats = vec![ + "{\"numRecords\":2,\"minValues\":{\"value\":2},\"maxValues\":{\"value\":4},\"nullCount\":{\"value\":0}}", + "{\"numRecords\":2,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":2},\"nullCount\":{\"value\":0}}", + ]; + + for (maybe_result_stat, expected_stat) in result_stats.iter().zip(expected_stats.into_iter()) { + assert_json_equal(maybe_result_stat.unwrap(), expected_stat); + } +} + +#[tokio::test] +async fn test_only_struct_stats() { // test table with no json stats let path = "./tests/data/delta-1.2.1-only-struct-stats"; let table = deltalake::open_table(path).await.unwrap(); - let actions = table.get_state().add_actions_table(true).unwrap(); + let actions = table.get_state().add_actions_table(true, false).unwrap(); let actions = sort_batch_by(&actions, "path").unwrap(); let stats_col_index = actions.schema().column_with_name("stats").unwrap().0; @@ -238,9 +283,187 @@ async fn test_add_action_table() { ]; for (maybe_result_stat, expected_stat) in result_stats.iter().zip(expected_stats.into_iter()) { - let result_value: serde_json::Value = - serde_json::from_str(maybe_result_stat.unwrap()).unwrap(); - let expected_value: serde_json::Value = serde_json::from_str(expected_stat).unwrap(); - assert_eq!(result_value, expected_value); + assert_json_equal(maybe_result_stat.unwrap(), expected_stat); } } + +#[tokio::test] +async fn test_parsed_stats() { + // test table with no json stats + let path = "./tests/data/delta-1.2.1-only-struct-stats"; + let mut table = deltalake::open_table(path).await.unwrap(); + table.load_version(1).await.unwrap(); + + let actions = table.get_state().add_actions_table(true, true).unwrap(); + + let expected_columns: Vec<(&str, ArrayRef)> = vec![ + ( + "path", + Arc::new(array::StringArray::from(vec![ + "part-00000-7a509247-4f58-4453-9202-51d75dee59af-c000.snappy.parquet", + ])), + ), + ("size_bytes", Arc::new(array::Int64Array::from(vec![5489]))), + ( + "modification_time", + Arc::new(arrow::array::TimestampMillisecondArray::from(vec![ + 1666652373000, + ])), + ), + ( + "data_change", + Arc::new(array::BooleanArray::from(vec![true])), + ), + ("num_records", Arc::new(array::Int64Array::from(vec![1]))), + ( + "null_count.integer", + Arc::new(array::Int64Array::from(vec![0])), + ), + ("min.integer", Arc::new(array::Int32Array::from(vec![0]))), + ("max.integer", Arc::new(array::Int32Array::from(vec![0]))), + ( + "null_count.null", + Arc::new(array::Int64Array::from(vec![1])), + ), + ( + "null_count.boolean", + Arc::new(array::Int64Array::from(vec![0])), + ), + ( + "null_count.double", + Arc::new(array::Int64Array::from(vec![0])), + ), + ( + "min.double", + Arc::new(array::Float64Array::from(vec![1.234])), + ), + ( + "max.double", + Arc::new(array::Float64Array::from(vec![1.234])), + ), + ( + "null_count.decimal", + Arc::new(array::Int64Array::from(vec![0])), + ), + ( + "min.decimal", + Arc::new( + array::Decimal128Array::from_iter_values([-567800]) + .with_precision_and_scale(8, 5) + .unwrap(), + ), + ), + ( + "max.decimal", + Arc::new( + array::Decimal128Array::from_iter_values([-567800]) + .with_precision_and_scale(8, 5) + .unwrap(), + ), + ), + ( + "null_count.string", + Arc::new(array::Int64Array::from(vec![0])), + ), + ( + "min.string", + Arc::new(array::StringArray::from(vec!["string"])), + ), + ( + "max.string", + Arc::new(array::StringArray::from(vec!["string"])), + ), + ( + "null_count.binary", + Arc::new(array::Int64Array::from(vec![0])), + ), + ( + "null_count.date", + Arc::new(array::Int64Array::from(vec![0])), + ), + ( + "min.date", + Arc::new(array::Date32Array::from(vec![Date32Type::parse( + "2022-10-24", + )])), + ), + ( + "max.date", + Arc::new(array::Date32Array::from(vec![Date32Type::parse( + "2022-10-24", + )])), + ), + ( + "null_count.timestamp", + Arc::new(array::Int64Array::from(vec![0])), + ), + ( + "min.timestamp", + Arc::new(array::TimestampMicrosecondArray::from(vec![ + TimestampMicrosecondType::parse("2022-10-24T22:59:32.846Z"), + ])), + ), + ( + "max.timestamp", + Arc::new(array::TimestampMicrosecondArray::from(vec![ + TimestampMicrosecondType::parse("2022-10-24T22:59:32.846Z"), + ])), + ), + ( + "null_count.struct.struct_element", + Arc::new(array::Int64Array::from(vec![0])), + ), + ( + "min.struct.struct_element", + Arc::new(array::StringArray::from(vec!["struct_value"])), + ), + ( + "max.struct.struct_element", + Arc::new(array::StringArray::from(vec!["struct_value"])), + ), + ("null_count.map", Arc::new(array::Int64Array::from(vec![0]))), + ( + "null_count.array", + Arc::new(array::Int64Array::from(vec![0])), + ), + ( + "null_count.nested_struct.struct_element.nested_struct_element", + Arc::new(array::Int64Array::from(vec![0])), + ), + ( + "min.nested_struct.struct_element.nested_struct_element", + Arc::new(array::StringArray::from(vec!["nested_struct_value"])), + ), + ( + "max.nested_struct.struct_element.nested_struct_element", + Arc::new(array::StringArray::from(vec!["nested_struct_value"])), + ), + ( + "null_count.struct_of_array_of_map.struct_element", + Arc::new(array::Int64Array::from(vec![0])), + ), + ]; + let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap(); + + assert_eq!( + expected + .schema() + .fields() + .iter() + .map(|field| field.name().as_str()) + .collect::>(), + actions + .schema() + .fields() + .iter() + .map(|field| field.name().as_str()) + .collect::>() + ); + assert_eq!(expected, actions); +} + +fn assert_json_equal(left: &str, right: &str) { + let left_value: serde_json::Value = serde_json::from_str(left).unwrap(); + let right_value: serde_json::Value = serde_json::from_str(right).unwrap(); + assert_eq!(left_value, right_value); +} From 290c53ced0796d7f36f475df1af7569e5e364da3 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 3 Jan 2023 18:32:05 -0800 Subject: [PATCH 07/13] test nested stats --- python/deltalake/table.py | 2 +- rust/Cargo.toml | 1 + rust/src/table_state_arrow.rs | 138 +++++++++++++------ rust/tests/add_actions_test.rs | 239 ++++++++++++++------------------- 4 files changed, 200 insertions(+), 180 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 1791271e96..d7191e90f9 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -499,4 +499,4 @@ def get_add_actions_df(self, flatten: bool = False) -> pyarrow.RecordBatch: stats: [["{"numRecords": 1, "minValues": {"y": 5}, "maxValues": {"y": 5}, "nullCount": {"y": 0}}","{"numRecords": 1, "minValues": {"y": 6}, "maxValues": {"y": 6}, "nullCount": {"y": 0}}","{"numRecords": 1, "minValues": {"y": 4}, "maxValues": {"y": 4}, "nullCount": {"y": 0}}"]] partition_x: [["2","3","1"]] """ - return self._table.get_add_actions_df(flatten) + return self._table.get_add_actions_df(flatten, parse_stats) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index c18c8d35a9..d10afbff62 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -17,6 +17,7 @@ chrono = { version = "0.4.22", default-features = false, features = ["clock"] } cfg-if = "1" errno = "0.2" futures = "0.3" +itertools = "0.10" lazy_static = "1" log = "0" libc = ">=0.2.90, <1" diff --git a/rust/src/table_state_arrow.rs b/rust/src/table_state_arrow.rs index 3c8d87230a..d4248543f3 100644 --- a/rust/src/table_state_arrow.rs +++ b/rust/src/table_state_arrow.rs @@ -1,32 +1,23 @@ //! Methods to get Delta Table state in Arrow structures -use crate::action::ColumnCountStat; -use crate::action::ColumnValueStat; -use crate::action::Stats; -use crate::arrow::array::TimestampMicrosecondArray; +use crate::action::{ColumnCountStat, ColumnValueStat, Stats}; use crate::table_state::DeltaTableState; use crate::DeltaDataTypeLong; use crate::DeltaTableError; use crate::SchemaDataType; use crate::SchemaTypeStruct; -use arrow::array::ArrayRef; -use arrow::array::BinaryArray; -use arrow::array::BooleanArray; -use arrow::array::Date32Array; -use arrow::array::Int64Array; -use arrow::array::PrimitiveArray; -use arrow::array::StringArray; -use arrow::array::TimestampMillisecondArray; +use arrow::array::{ + ArrayRef, BinaryArray, BooleanArray, Date32Array, Int64Array, PrimitiveArray, StringArray, + StructArray, TimestampMicrosecondArray, TimestampMillisecondArray, +}; use arrow::compute::cast; use arrow::compute::kernels::cast_utils::Parser; -use arrow::datatypes::ArrowPrimitiveType; -use arrow::datatypes::DataType; -use arrow::datatypes::Date32Type; -use arrow::datatypes::TimeUnit; -use arrow::datatypes::TimestampMicrosecondType; +use arrow::datatypes::{ + ArrowPrimitiveType, DataType, Date32Type, Field, TimeUnit, TimestampMicrosecondType, +}; +use itertools::Itertools; use std::borrow::Cow; -use std::collections::HashMap; -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; use std::sync::Arc; impl DeltaTableState { @@ -34,7 +25,6 @@ impl DeltaTableState { pub fn add_actions_table( &self, flatten: bool, - parse_stats: bool, ) -> Result { let mut paths = arrow::array::StringBuilder::with_capacity( self.files().len(), @@ -139,7 +129,7 @@ impl DeltaTableState { } }; - if parse_stats { + if self.files().iter().any(|add| add.stats.is_some()) { let stats = self.stats_as_batch(flatten)?; arrays.extend( stats @@ -149,23 +139,6 @@ impl DeltaTableState { .map(|field| Cow::Owned(field.name().clone())) .zip(stats.columns().iter().map(Arc::clone)), ); - } else { - let mut stats_builder = arrow::array::StringBuilder::with_capacity( - self.files().len(), - self.files() - .iter() - .map(|add| add.stats.as_ref().map(|s| s.len()).unwrap_or(0)) - .sum(), - ); - - for action in self.files() { - if let Some(stats) = &action.get_stats()? { - stats_builder.append_value(&serde_json::to_string(stats)?); - } else { - stats_builder.append_null(); - } - } - arrays.push((Cow::Borrowed("stats"), Arc::new(stats_builder.finish()))); } arrays.extend(partition_columns.into_iter()); @@ -195,13 +168,15 @@ impl DeltaTableState { let metadata = self.current_metadata().ok_or(DeltaTableError::NoMetadata)?; let schema = &metadata.schema; + #[derive(Debug)] struct ColStats<'a> { path: Vec<&'a str>, null_count: Option, min_values: Option, max_values: Option, } - let columnar_stats: Vec = SchemaLeafIterator::new(schema) + + let mut columnar_stats: Vec = SchemaLeafIterator::new(schema) .filter(|(_path, datatype)| !matches!(datatype, SchemaDataType::r#struct(_))) .map(|(path, datatype)| -> Result { let null_count: Option = stats @@ -285,7 +260,90 @@ impl DeltaTableState { } } } else { - todo!() + let mut level = columnar_stats + .iter() + .map(|col_stat| col_stat.path.len()) + .max() + .unwrap_or(0); + + let combine_arrays = |sub_fields: &Vec, + getter: for<'a> fn(&'a ColStats) -> &'a Option| + -> Option { + let fields = sub_fields + .iter() + .flat_map(|sub_field| { + if let Some(values) = getter(sub_field) { + let field = Field::new( + sub_field + .path + .last() + .expect("paths must have at least one element"), + values.data_type().clone(), + false, + ); + Some((field, Arc::clone(values))) + } else { + None + } + }) + .collect::>(); + if fields.is_empty() { + None + } else { + Some(Arc::new(StructArray::from(fields))) + } + }; + + while level > 0 { + // Starting with most nested level, iteratively group null_count, min_values, max_values + // into StructArrays, until it is consolidated into a single array. + columnar_stats = columnar_stats + .into_iter() + .group_by(|col_stat| { + if col_stat.path.len() < level { + col_stat.path.clone() + } else { + col_stat.path[0..(level - 1)].to_vec() + } + }) + .into_iter() + .map(|(prefix, group)| { + let current_fields: Vec = group.into_iter().collect(); + if current_fields[0].path.len() < level { + debug_assert_eq!(current_fields.len(), 1); + current_fields.into_iter().next().unwrap() + } else { + ColStats { + path: prefix.to_vec(), + null_count: combine_arrays(¤t_fields, |sub_field| { + &sub_field.null_count + }), + min_values: combine_arrays(¤t_fields, |sub_field| { + &sub_field.min_values + }), + max_values: combine_arrays(¤t_fields, |sub_field| { + &sub_field.max_values + }), + } + } + }) + .collect(); + level -= 1; + } + debug_assert!(columnar_stats.len() == 1); + debug_assert!(columnar_stats + .iter() + .all(|col_stat| col_stat.path.is_empty())); + + if let Some(null_count) = columnar_stats[0].null_count.take() { + out_columns.push((Cow::Borrowed("null_count"), null_count)); + } + if let Some(min_values) = columnar_stats[0].min_values.take() { + out_columns.push((Cow::Borrowed("min"), min_values)); + } + if let Some(max_values) = columnar_stats[0].max_values.take() { + out_columns.push((Cow::Borrowed("max"), max_values)); + } } Ok(arrow::record_batch::RecordBatch::try_from_iter( diff --git a/rust/tests/add_actions_test.rs b/rust/tests/add_actions_test.rs index bf957160a2..b61accbc53 100644 --- a/rust/tests/add_actions_test.rs +++ b/rust/tests/add_actions_test.rs @@ -1,6 +1,6 @@ #![cfg(feature = "arrow")] -use arrow::array::{self, ArrayRef, StringArray}; +use arrow::array::{self, ArrayRef, StructArray}; use arrow::compute::kernels::cast_utils::Parser; use arrow::compute::sort_to_indices; use arrow::datatypes::{DataType, Date32Type, Field, TimestampMicrosecondType}; @@ -25,14 +25,12 @@ fn sort_batch_by(batch: &RecordBatch, column: &str) -> arrow::error::Result = vec![ @@ -45,17 +43,16 @@ async fn test_with_partitions() { 1627990384000, 1627990384000 ]))), ("data_change", Arc::new(array::BooleanArray::from(vec![true, true]))), - ("stats", Arc::new(array::StringArray::from(vec![None, None]))), ("partition_k", Arc::new(array::StringArray::from(vec![Some("A"), None]))), ]; let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap(); assert_eq!(expected, actions); - let actions = table.get_state().add_actions_table(false, false).unwrap(); + let actions = table.get_state().add_actions_table(false).unwrap(); let actions = sort_batch_by(&actions, "path").unwrap(); - expected_columns[5] = ( + expected_columns[4] = ( "partition_values", Arc::new(array::StructArray::from(vec![( Field::new("k", DataType::Utf8, true), @@ -73,7 +70,7 @@ async fn test_without_partitions() { let path = "./tests/data/simple_table"; let table = deltalake::open_table(path).await.unwrap(); - let actions = table.get_state().add_actions_table(true, false).unwrap(); + let actions = table.get_state().add_actions_table(true).unwrap(); let actions = sort_batch_by(&actions, "path").unwrap(); let expected_columns: Vec<(&str, ArrayRef)> = vec![ @@ -107,16 +104,12 @@ async fn test_without_partitions() { true, true, true, true, true, ])), ), - ( - "stats", - Arc::new(array::StringArray::from(vec![None, None, None, None, None])), - ), ]; let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap(); assert_eq!(expected, actions); - let actions = table.get_state().add_actions_table(false, false).unwrap(); + let actions = table.get_state().add_actions_table(false).unwrap(); let actions = sort_batch_by(&actions, "path").unwrap(); // For now, this column is ignored. @@ -134,24 +127,9 @@ async fn test_with_stats() { // test table with stats let path = "./tests/data/delta-0.8.0"; let table = deltalake::open_table(path).await.unwrap(); - let actions = table.get_state().add_actions_table(true, false).unwrap(); + let actions = table.get_state().add_actions_table(true).unwrap(); let actions = sort_batch_by(&actions, "path").unwrap(); - let stats_col_index = actions.schema().column_with_name("stats").unwrap().0; - let result_stats: &StringArray = actions - .column(stats_col_index) - .as_any() - .downcast_ref() - .unwrap(); - let actions = actions - .project( - &(0..actions.num_columns()) - .into_iter() - .filter(|i| i != &stats_col_index) - .collect::>(), - ) - .unwrap(); - let expected_columns: Vec<(&str, ArrayRef)> = vec![ ( "path", @@ -175,126 +153,27 @@ async fn test_with_stats() { "data_change", Arc::new(array::BooleanArray::from(vec![true, true])), ), - ]; - let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap(); - - assert_eq!(expected, actions); - - let expected_stats = vec![ - "{\"numRecords\":2,\"minValues\":{\"value\":2},\"maxValues\":{\"value\":4},\"nullCount\":{\"value\":0}}", - "{\"numRecords\":2,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":2},\"nullCount\":{\"value\":0}}", - ]; - - for (maybe_result_stat, expected_stat) in result_stats.iter().zip(expected_stats.into_iter()) { - assert_json_equal(maybe_result_stat.unwrap(), expected_stat); - } -} - -#[tokio::test] -async fn test_only_struct_stats() { - // test table with no json stats - let path = "./tests/data/delta-1.2.1-only-struct-stats"; - let table = deltalake::open_table(path).await.unwrap(); - - let actions = table.get_state().add_actions_table(true, false).unwrap(); - let actions = sort_batch_by(&actions, "path").unwrap(); - - let stats_col_index = actions.schema().column_with_name("stats").unwrap().0; - let result_stats: &StringArray = actions - .column(stats_col_index) - .as_any() - .downcast_ref() - .unwrap(); - let actions = actions - .project( - &(0..actions.num_columns()) - .into_iter() - .filter(|i| i != &stats_col_index) - .collect::>(), - ) - .unwrap(); - - let expected_columns: Vec<(&str, ArrayRef)> = vec![ - ( - "path", - Arc::new(array::StringArray::from(vec![ - "part-00000-1c2d1a32-02dc-484f-87ff-4328ea56045d-c000.snappy.parquet", - "part-00000-28925d3a-bdf2-411e-bca9-b067444cbcb0-c000.snappy.parquet", - "part-00000-6630b7c4-0aca-405b-be86-68a812f2e4c8-c000.snappy.parquet", - "part-00000-74151571-7ec6-4bd6-9293-b5daab2ce667-c000.snappy.parquet", - "part-00000-7a509247-4f58-4453-9202-51d75dee59af-c000.snappy.parquet", - "part-00000-8e0aefe1-6645-4601-ac29-68cba64023b5-c000.snappy.parquet", - "part-00000-b26ba634-874c-45b0-a7ff-2f0395a53966-c000.snappy.parquet", - "part-00000-c4c8caec-299d-42a4-b50c-5a4bf724c037-c000.snappy.parquet", - "part-00000-ce300400-58ff-4b8f-8ba9-49422fdf9f2e-c000.snappy.parquet", - "part-00000-e1262b3e-2959-4910-aea9-4eaf92f0c68c-c000.snappy.parquet", - "part-00000-e8e3753f-e2f6-4c9f-98f9-8f3d346727ba-c000.snappy.parquet", - "part-00000-f73ff835-0571-4d67-ac43-4fbf948bfb9b-c000.snappy.parquet", - ])), - ), + ("num_records", Arc::new(array::Int64Array::from(vec![2, 2]))), ( - "size_bytes", - Arc::new(array::Int64Array::from(vec![ - 5488, 5489, 5489, 5489, 5489, 5489, 5489, 5489, 5489, 5489, 5489, 5731, - ])), - ), - ( - "modification_time", - Arc::new(arrow::array::TimestampMillisecondArray::from(vec![ - 1666652376000, - 1666652374000, - 1666652378000, - 1666652377000, - 1666652373000, - 1666652385000, - 1666652375000, - 1666652379000, - 1666652382000, - 1666652386000, - 1666652380000, - 1666652383000, - ])), - ), - ( - "data_change", - Arc::new(array::BooleanArray::from(vec![ - false, false, false, false, false, true, false, false, false, true, false, false, - ])), + "null_count.value", + Arc::new(array::Int64Array::from(vec![0, 0])), ), + ("min.value", Arc::new(array::Int32Array::from(vec![2, 0]))), + ("max.value", Arc::new(array::Int32Array::from(vec![4, 2]))), ]; let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap(); assert_eq!(expected, actions); - - // For stats in checkpoints, the serialization order isn't deterministic, so we can't compare with strings - let expected_stats = vec![ - "{\"numRecords\":1,\"minValues\":{\"struct\":{\"struct_element\":\"struct_value\"},\"double\":1.234,\"decimal\":-5.678,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"timestamp\":\"2022-10-24T22:59:36.177Z\",\"string\":\"string\",\"integer\":3,\"date\":\"2022-10-24\"},\"maxValues\":{\"timestamp\":\"2022-10-24T22:59:36.177Z\",\"integer\":3,\"struct\":{\"struct_element\":\"struct_value\"},\"double\":1.234,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"string\":\"string\",\"date\":\"2022-10-24\",\"decimal\":-5.678},\"nullCount\":{\"struct\":{\"struct_element\":0},\"double\":0,\"array\":0,\"integer\":0,\"date\":0,\"map\":0,\"struct_of_array_of_map\":{\"struct_element\":0},\"boolean\":0,\"null\":1,\"decimal\":0,\"binary\":0,\"timestamp\":0,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"string\":0}}", - "{\"numRecords\":1,\"minValues\":{\"integer\":1,\"timestamp\":\"2022-10-24T22:59:34.067Z\",\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"struct\":{\"struct_element\":\"struct_value\"},\"string\":\"string\",\"double\":1.234,\"date\":\"2022-10-24\",\"decimal\":-5.678},\"maxValues\":{\"decimal\":-5.678,\"timestamp\":\"2022-10-24T22:59:34.067Z\",\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"integer\":1,\"double\":1.234,\"string\":\"string\",\"struct\":{\"struct_element\":\"struct_value\"},\"date\":\"2022-10-24\"},\"nullCount\":{\"boolean\":0,\"string\":0,\"struct\":{\"struct_element\":0},\"map\":0,\"double\":0,\"array\":0,\"null\":1,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"binary\":0,\"decimal\":0,\"date\":0,\"timestamp\":0,\"struct_of_array_of_map\":{\"struct_element\":0},\"integer\":0}}", - "{\"numRecords\":1,\"minValues\":{\"integer\":5,\"decimal\":-5.678,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"string\":\"string\",\"timestamp\":\"2022-10-24T22:59:38.358Z\",\"double\":1.234,\"date\":\"2022-10-24\",\"struct\":{\"struct_element\":\"struct_value\"}},\"maxValues\":{\"timestamp\":\"2022-10-24T22:59:38.358Z\",\"integer\":5,\"decimal\":-5.678,\"date\":\"2022-10-24\",\"string\":\"string\",\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"double\":1.234,\"struct\":{\"struct_element\":\"struct_value\"}},\"nullCount\":{\"null\":1,\"struct_of_array_of_map\":{\"struct_element\":0},\"binary\":0,\"string\":0,\"double\":0,\"integer\":0,\"date\":0,\"timestamp\":0,\"map\":0,\"array\":0,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"decimal\":0,\"struct\":{\"struct_element\":0},\"boolean\":0}}", - "{\"numRecords\":1,\"minValues\":{\"string\":\"string\",\"double\":1.234,\"integer\":4,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"timestamp\":\"2022-10-24T22:59:37.235Z\",\"date\":\"2022-10-24\",\"decimal\":-5.678,\"struct\":{\"struct_element\":\"struct_value\"}},\"maxValues\":{\"timestamp\":\"2022-10-24T22:59:37.235Z\",\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"date\":\"2022-10-24\",\"string\":\"string\",\"struct\":{\"struct_element\":\"struct_value\"},\"double\":1.234,\"integer\":4,\"decimal\":-5.678},\"nullCount\":{\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"struct_of_array_of_map\":{\"struct_element\":0},\"binary\":0,\"map\":0,\"string\":0,\"double\":0,\"date\":0,\"null\":1,\"integer\":0,\"boolean\":0,\"decimal\":0,\"timestamp\":0,\"struct\":{\"struct_element\":0},\"array\":0}}", - "{\"numRecords\":1,\"minValues\":{\"struct\":{\"struct_element\":\"struct_value\"},\"decimal\":-5.678,\"timestamp\":\"2022-10-24T22:59:32.846Z\",\"string\":\"string\",\"integer\":0,\"double\":1.234,\"date\":\"2022-10-24\",\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}}},\"maxValues\":{\"string\":\"string\",\"struct\":{\"struct_element\":\"struct_value\"},\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"date\":\"2022-10-24\",\"decimal\":-5.678,\"integer\":0,\"double\":1.234,\"timestamp\":\"2022-10-24T22:59:32.846Z\"},\"nullCount\":{\"timestamp\":0,\"struct\":{\"struct_element\":0},\"double\":0,\"binary\":0,\"string\":0,\"boolean\":0,\"decimal\":0,\"null\":1,\"integer\":0,\"array\":0,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"struct_of_array_of_map\":{\"struct_element\":0},\"date\":0,\"map\":0}}", - "{\"numRecords\":1,\"minValues\":{\"struct\":{\"struct_element\":\"struct_value\"},\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"date\":\"2022-10-24\",\"timestamp\":\"2022-10-24T22:59:44.639Z\",\"double\":1.234,\"integer\":10,\"string\":\"string\",\"decimal\":-5.678},\"maxValues\":{\"struct\":{\"struct_element\":\"struct_value\"},\"date\":\"2022-10-24\",\"integer\":10,\"decimal\":-5.678,\"string\":\"string\",\"double\":1.234,\"timestamp\":\"2022-10-24T22:59:44.639Z\",\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}}},\"nullCount\":{\"binary\":0,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"timestamp\":0,\"boolean\":0,\"string\":0,\"struct_of_array_of_map\":{\"struct_element\":0},\"map\":0,\"null\":1,\"decimal\":0,\"array\":0,\"struct\":{\"struct_element\":0},\"double\":0,\"integer\":0,\"date\":0}}", - "{\"numRecords\":1,\"minValues\":{\"date\":\"2022-10-24\",\"timestamp\":\"2022-10-24T22:59:35.117Z\",\"integer\":2,\"struct\":{\"struct_element\":\"struct_value\"},\"decimal\":-5.678,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"string\":\"string\",\"double\":1.234},\"maxValues\":{\"integer\":2,\"timestamp\":\"2022-10-24T22:59:35.117Z\",\"struct\":{\"struct_element\":\"struct_value\"},\"decimal\":-5.678,\"double\":1.234,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"date\":\"2022-10-24\",\"string\":\"string\"},\"nullCount\":{\"struct\":{\"struct_element\":0},\"double\":0,\"map\":0,\"array\":0,\"boolean\":0,\"integer\":0,\"date\":0,\"binary\":0,\"decimal\":0,\"string\":0,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"struct_of_array_of_map\":{\"struct_element\":0},\"null\":1,\"timestamp\":0}}", - "{\"numRecords\":1,\"minValues\":{\"timestamp\":\"2022-10-24T22:59:39.489Z\",\"string\":\"string\",\"struct\":{\"struct_element\":\"struct_value\"},\"date\":\"2022-10-24\",\"integer\":6,\"double\":1.234,\"decimal\":-5.678,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}}},\"maxValues\":{\"timestamp\":\"2022-10-24T22:59:39.489Z\",\"decimal\":-5.678,\"integer\":6,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"string\":\"string\",\"struct\":{\"struct_element\":\"struct_value\"},\"date\":\"2022-10-24\",\"double\":1.234},\"nullCount\":{\"double\":0,\"decimal\":0,\"boolean\":0,\"struct\":{\"struct_element\":0},\"date\":0,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"timestamp\":0,\"array\":0,\"binary\":0,\"map\":0,\"null\":1,\"struct_of_array_of_map\":{\"struct_element\":0},\"string\":0,\"integer\":0}}", - "{\"numRecords\":1,\"minValues\":{\"timestamp\":\"2022-10-24T22:59:41.637Z\",\"decimal\":-5.678,\"date\":\"2022-10-24\",\"integer\":8,\"string\":\"string\",\"double\":1.234,\"struct\":{\"struct_element\":\"struct_value\"},\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}}},\"maxValues\":{\"date\":\"2022-10-24\",\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"decimal\":-5.678,\"double\":1.234,\"integer\":8,\"timestamp\":\"2022-10-24T22:59:41.637Z\",\"string\":\"string\",\"struct\":{\"struct_element\":\"struct_value\"}},\"nullCount\":{\"array\":0,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"timestamp\":0,\"string\":0,\"struct_of_array_of_map\":{\"struct_element\":0},\"map\":0,\"integer\":0,\"binary\":0,\"double\":0,\"null\":1,\"date\":0,\"decimal\":0,\"boolean\":0,\"struct\":{\"struct_element\":0}}}", - "{\"numRecords\":1,\"minValues\":{\"timestamp\":\"2022-10-24T22:59:46.083Z\",\"date\":\"2022-10-24\",\"struct\":{\"struct_element\":\"struct_value\"},\"integer\":11,\"string\":\"string\",\"double\":1.234,\"decimal\":-5.678,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}}},\"maxValues\":{\"decimal\":-5.678,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"date\":\"2022-10-24\",\"timestamp\":\"2022-10-24T22:59:46.083Z\",\"string\":\"string\",\"struct\":{\"struct_element\":\"struct_value\"},\"double\":1.234,\"integer\":11},\"nullCount\":{\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"binary\":0,\"date\":0,\"double\":0,\"map\":0,\"null\":1,\"struct_of_array_of_map\":{\"struct_element\":0},\"string\":0,\"boolean\":0,\"integer\":0,\"struct\":{\"struct_element\":0},\"decimal\":0,\"timestamp\":0,\"array\":0}}", - "{\"numRecords\":1,\"minValues\":{\"date\":\"2022-10-24\",\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"decimal\":-5.678,\"timestamp\":\"2022-10-24T22:59:40.572Z\",\"struct\":{\"struct_element\":\"struct_value\"},\"string\":\"string\",\"integer\":7,\"double\":1.234},\"maxValues\":{\"integer\":7,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"struct\":{\"struct_element\":\"struct_value\"},\"double\":1.234,\"decimal\":-5.678,\"string\":\"string\",\"timestamp\":\"2022-10-24T22:59:40.572Z\",\"date\":\"2022-10-24\"},\"nullCount\":{\"double\":0,\"binary\":0,\"boolean\":0,\"timestamp\":0,\"array\":0,\"null\":1,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"struct\":{\"struct_element\":0},\"struct_of_array_of_map\":{\"struct_element\":0},\"map\":0,\"integer\":0,\"date\":0,\"string\":0,\"decimal\":0}}", - "{\"numRecords\":1,\"minValues\":{\"double\":1.234,\"string\":\"string\",\"date\":\"2022-10-24\",\"struct\":{\"struct_element\":\"struct_value\"},\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"timestamp\":\"2022-10-24T22:59:42.908Z\",\"integer\":9,\"decimal\":-5.678,\"new_column\":0},\"maxValues\":{\"date\":\"2022-10-24\",\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}},\"decimal\":-5.678,\"integer\":9,\"double\":1.234,\"string\":\"string\",\"struct\":{\"struct_element\":\"struct_value\"},\"timestamp\":\"2022-10-24T22:59:42.908Z\",\"new_column\":0},\"nullCount\":{\"double\":0,\"new_column\":0,\"struct_of_array_of_map\":{\"struct_element\":0},\"date\":0,\"map\":0,\"decimal\":0,\"null\":1,\"timestamp\":0,\"struct\":{\"struct_element\":0},\"boolean\":0,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"binary\":0,\"integer\":0,\"array\":0,\"string\":0}}", - ]; - - for (maybe_result_stat, expected_stat) in result_stats.iter().zip(expected_stats.into_iter()) { - assert_json_equal(maybe_result_stat.unwrap(), expected_stat); - } } #[tokio::test] -async fn test_parsed_stats() { +async fn test_only_struct_stats() { // test table with no json stats let path = "./tests/data/delta-1.2.1-only-struct-stats"; let mut table = deltalake::open_table(path).await.unwrap(); table.load_version(1).await.unwrap(); - let actions = table.get_state().add_actions_table(true, true).unwrap(); + let actions = table.get_state().add_actions_table(true).unwrap(); let expected_columns: Vec<(&str, ArrayRef)> = vec![ ( @@ -460,10 +339,92 @@ async fn test_parsed_stats() { .collect::>() ); assert_eq!(expected, actions); + + let actions = table.get_state().add_actions_table(false).unwrap(); + // For brevity, just checking a few nested columns in stats + + assert_eq!( + actions + .get_field_at_path(&vec![ + "null_count", + "nested_struct", + "struct_element", + "nested_struct_element" + ]) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(), + &array::Int64Array::from(vec![0]), + ); + + assert_eq!( + actions + .get_field_at_path(&vec![ + "min", + "nested_struct", + "struct_element", + "nested_struct_element" + ]) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(), + &array::StringArray::from(vec!["nested_struct_value"]), + ); + + assert_eq!( + actions + .get_field_at_path(&vec![ + "max", + "nested_struct", + "struct_element", + "nested_struct_element" + ]) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(), + &array::StringArray::from(vec!["nested_struct_value"]), + ); + + assert_eq!( + actions + .get_field_at_path(&vec![ + "null_count", + "struct_of_array_of_map", + "struct_element" + ]) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(), + &array::Int64Array::from(vec![0]) + ); +} + +/// Trait to make it easier to access nested fields +trait NestedTabular { + fn get_field_at_path(&self, path: &[&str]) -> Option; } -fn assert_json_equal(left: &str, right: &str) { - let left_value: serde_json::Value = serde_json::from_str(left).unwrap(); - let right_value: serde_json::Value = serde_json::from_str(right).unwrap(); - assert_eq!(left_value, right_value); +impl NestedTabular for RecordBatch { + fn get_field_at_path(&self, path: &[&str]) -> Option { + // First, get array in the batch + let (first_key, remainder) = path.split_at(1); + let mut col = self.column(self.schema().column_with_name(first_key[0])?.0); + + if remainder.is_empty() { + return Some(Arc::clone(col)); + } + + for segment in remainder { + col = col + .as_any() + .downcast_ref::()? + .column_by_name(segment)?; + } + + Some(Arc::clone(col)) + } } From 33bfdac11b80c52fb0b862b302792a27f43210a1 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 3 Jan 2023 20:26:57 -0800 Subject: [PATCH 08/13] feat: also handle tags --- rust/src/table_state_arrow.rs | 130 +++++++++++++++++++++++++++++---- rust/tests/add_actions_test.rs | 18 +++++ 2 files changed, 134 insertions(+), 14 deletions(-) diff --git a/rust/src/table_state_arrow.rs b/rust/src/table_state_arrow.rs index d4248543f3..5276c166d1 100644 --- a/rust/src/table_state_arrow.rs +++ b/rust/src/table_state_arrow.rs @@ -17,7 +17,7 @@ use arrow::datatypes::{ }; use itertools::Itertools; use std::borrow::Cow; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Arc; impl DeltaTableState { @@ -57,7 +57,58 @@ impl DeltaTableState { (Cow::Borrowed("modification_time"), Arc::new(mod_time)), (Cow::Borrowed("data_change"), Arc::new(data_change)), ]; + + let metadata = self.current_metadata().ok_or(DeltaTableError::NoMetadata)?; + + if !metadata.partition_columns.is_empty() { + let partition_cols_batch = self.partition_columns_as_batch(flatten)?; + arrays.extend( + partition_cols_batch + .schema() + .fields + .iter() + .map(|field| Cow::Owned(field.name().clone())) + .zip(partition_cols_batch.columns().iter().map(Arc::clone)), + ) + } + + if self.files().iter().any(|add| add.stats.is_some()) { + let stats = self.stats_as_batch(flatten)?; + arrays.extend( + stats + .schema() + .fields + .iter() + .map(|field| Cow::Owned(field.name().clone())) + .zip(stats.columns().iter().map(Arc::clone)), + ); + } + + if self.files().iter().any(|add| { + add.tags + .as_ref() + .map(|tags| !tags.is_empty()) + .unwrap_or(false) + }) { + let tags = self.tags_as_batch(flatten)?; + arrays.extend( + tags.schema() + .fields + .iter() + .map(|field| Cow::Owned(field.name().clone())) + .zip(tags.columns().iter().map(Arc::clone)), + ); + } + + Ok(arrow::record_batch::RecordBatch::try_from_iter(arrays)?) + } + + fn partition_columns_as_batch( + &self, + flatten: bool, + ) -> Result { let metadata = self.current_metadata().ok_or(DeltaTableError::NoMetadata)?; + let partition_column_types: Vec = metadata .partition_columns .iter() @@ -129,21 +180,72 @@ impl DeltaTableState { } }; - if self.files().iter().any(|add| add.stats.is_some()) { - let stats = self.stats_as_batch(flatten)?; - arrays.extend( - stats - .schema() - .fields - .iter() - .map(|field| Cow::Owned(field.name().clone())) - .zip(stats.columns().iter().map(Arc::clone)), - ); - } + Ok(arrow::record_batch::RecordBatch::try_from_iter( + partition_columns, + )?) + } - arrays.extend(partition_columns.into_iter()); + fn tags_as_batch( + &self, + flatten: bool, + ) -> Result { + let tag_keys: HashSet<&str> = self + .files() + .iter() + .flat_map(|add| add.tags.as_ref().map(|tags| tags.keys())) + .flatten() + .map(|key| key.as_str()) + .collect(); + let mut builder_map: HashMap<&str, arrow::array::StringBuilder> = tag_keys + .iter() + .map(|&key| { + ( + key, + arrow::array::StringBuilder::with_capacity(self.files().len(), 64), + ) + }) + .collect(); + + for add in self.files() { + for &key in &tag_keys { + if let Some(value) = add + .tags + .as_ref() + .and_then(|tags| tags.get(key)) + .and_then(|val| val.as_deref()) + { + builder_map.get_mut(key).unwrap().append_value(value); + } else { + builder_map.get_mut(key).unwrap().append_null(); + } + } + } - Ok(arrow::record_batch::RecordBatch::try_from_iter(arrays)?) + let mut arrays: Vec<(&str, ArrayRef)> = builder_map + .into_iter() + .map(|(key, mut builder)| (key, Arc::new(builder.finish()) as ArrayRef)) + .collect(); + // Sorted for consistent order + arrays.sort_by(|(key1, _), (key2, _)| key1.cmp(key2)); + if flatten { + Ok(arrow::record_batch::RecordBatch::try_from_iter( + arrays + .into_iter() + .map(|(key, array)| (format!("tags.{}", key), array)), + )?) + } else { + Ok(arrow::record_batch::RecordBatch::try_from_iter(vec![( + "tags", + Arc::new(StructArray::from( + arrays + .into_iter() + .map(|(key, array)| { + (Field::new(key, array.data_type().clone(), true), array) + }) + .collect_vec(), + )) as ArrayRef, + )])?) + } } fn stats_as_batch( diff --git a/rust/tests/add_actions_test.rs b/rust/tests/add_actions_test.rs index b61accbc53..c7c7462c2b 100644 --- a/rust/tests/add_actions_test.rs +++ b/rust/tests/add_actions_test.rs @@ -321,6 +321,14 @@ async fn test_only_struct_stats() { "null_count.struct_of_array_of_map.struct_element", Arc::new(array::Int64Array::from(vec![0])), ), + ( + "tags.INSERTION_TIME", + Arc::new(array::StringArray::from(vec!["1666652373000000"])), + ), + ( + "tags.OPTIMIZE_TARGET_SIZE", + Arc::new(array::StringArray::from(vec!["268435456"])), + ), ]; let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap(); @@ -401,6 +409,16 @@ async fn test_only_struct_stats() { .unwrap(), &array::Int64Array::from(vec![0]) ); + + assert_eq!( + actions + .get_field_at_path(&vec!["tags", "OPTIMIZE_TARGET_SIZE"]) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(), + &array::StringArray::from(vec!["268435456"]) + ); } /// Trait to make it easier to access nested fields From ac8fb8af4894197851da11903a6307b3a817159a Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 3 Jan 2023 21:03:50 -0800 Subject: [PATCH 09/13] doc: document new method --- python/deltalake/table.py | 57 ++++++++++------------------------ rust/src/lib.rs | 1 + rust/src/table_state_arrow.rs | 32 +++++++++++++++++-- rust/tests/add_actions_test.rs | 2 +- 4 files changed, 49 insertions(+), 43 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index d7191e90f9..f0f27798b7 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -451,12 +451,12 @@ def get_add_actions_df(self, flatten: bool = False) -> pyarrow.RecordBatch: Add actions represent the files that currently make up the table. This data is a low-level representation parsed from the transaction log. - :param flatten: whether to use a flattened schema in output (default False). - When False, all partition values are in a single map column, - `partition_values`. When True, each partition column becomes it's own - field, with the prefix `partition_` in the name. + :param flatten: whether to flatten the schema. Partition values columns are + given the prefix `partition.`, statistics (null_count, min, and max) are + given the prefix `null_count.`, `min.`, and `max.`, and tags the + prefix `tags.`. Nested field names are concatenated with `.`. - :returns: a PyArrow Table containing the add action data. + :returns: a PyArrow RecordBatch containing the add action data. Examples: @@ -465,38 +465,15 @@ def get_add_actions_df(self, flatten: bool = False) -> pyarrow.RecordBatch: >>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]}) >>> write_deltalake("tmp", data, partition_by=["x"]) >>> dt = DeltaTable("tmp") - >>> dt.get_add_actions_df() - pyarrow.Table - path: string - size_bytes: int64 - modification_time: timestamp[ms] - data_change: bool - stats: string - partition_values: map - child 0, entries: struct not null - child 0, key: string not null - child 1, value: string - ---- - path: [["x=2/0-6e6952dd-ed03-4488-810e-ebe76a1a1200-0.parquet","x=3/0-6e6952dd-ed03-4488-810e-ebe76a1a1200-0.parquet","x=1/0-6e6952dd-ed03-4488-810e-ebe76a1a1200-0.parquet"]] - size_bytes: [[565,565,565]] - modification_time: [[1970-01-20 08:20:49.399,1970-01-20 08:20:49.399,1970-01-20 08:20:49.399]] - data_change: [[true,true,true]] - stats: [["{"numRecords": 1, "minValues": {"y": 5}, "maxValues": {"y": 5}, "nullCount": {"y": 0}}","{"numRecords": 1, "minValues": {"y": 6}, "maxValues": {"y": 6}, "nullCount": {"y": 0}}","{"numRecords": 1, "minValues": {"y": 4}, "maxValues": {"y": 4}, "nullCount": {"y": 0}}"]] - partition_values: [[keys:["x"]values:["2"],keys:["x"]values:["3"],keys:["x"]values:["1"]]] - >>> dt.get_add_actions_df(flatten=True) - pyarrow.Table - path: string - size_bytes: int64 - modification_time: timestamp[ms] - data_change: bool - stats: string - partition_x: string - ---- - path: [["x=2/0-6e6952dd-ed03-4488-810e-ebe76a1a1200-0.parquet","x=3/0-6e6952dd-ed03-4488-810e-ebe76a1a1200-0.parquet","x=1/0-6e6952dd-ed03-4488-810e-ebe76a1a1200-0.parquet"]] - size_bytes: [[565,565,565]] - modification_time: [[1970-01-20 08:20:49.399,1970-01-20 08:20:49.399,1970-01-20 08:20:49.399]] - data_change: [[true,true,true]] - stats: [["{"numRecords": 1, "minValues": {"y": 5}, "maxValues": {"y": 5}, "nullCount": {"y": 0}}","{"numRecords": 1, "minValues": {"y": 6}, "maxValues": {"y": 6}, "nullCount": {"y": 0}}","{"numRecords": 1, "minValues": {"y": 4}, "maxValues": {"y": 4}, "nullCount": {"y": 0}}"]] - partition_x: [["2","3","1"]] - """ - return self._table.get_add_actions_df(flatten, parse_stats) + >>> dt.get_add_actions_df().to_pandas() + path size_bytes modification_time data_change partition_values num_records null_count min max + 0 x=2/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True {'x': 2} 1 {'y': 0} {'y': 5} {'y': 5} + 1 x=3/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True {'x': 3} 1 {'y': 0} {'y': 6} {'y': 6} + 2 x=1/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True {'x': 1} 1 {'y': 0} {'y': 4} {'y': 4} + >>> dt.get_add_actions_df(flatten=True).to_pandas() + path size_bytes modification_time data_change partition.x num_records null_count.y min.y max.y + 0 x=2/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True 2 1 0 5 5 + 1 x=3/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True 3 1 0 6 6 + 2 x=1/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True 1 1 0 4 4 + """ + return self._table.get_add_actions_df(flatten) diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 529f98cbd2..d90692470f 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -70,6 +70,7 @@ #![deny(warnings)] #![deny(missing_docs)] +#![allow(rustdoc::invalid_html_tags)] #[cfg(all(feature = "parquet", feature = "parquet2"))] compile_error!( diff --git a/rust/src/table_state_arrow.rs b/rust/src/table_state_arrow.rs index 5276c166d1..56defaaf2b 100644 --- a/rust/src/table_state_arrow.rs +++ b/rust/src/table_state_arrow.rs @@ -1,4 +1,6 @@ //! Methods to get Delta Table state in Arrow structures +//! +//! See [crate::table_state::DeltaTableState]. use crate::action::{ColumnCountStat, ColumnValueStat, Stats}; use crate::table_state::DeltaTableState; @@ -21,7 +23,33 @@ use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Arc; impl DeltaTableState { - /// Get the add actions as a RecordBatch + /// Get an [arrow::record_batch::RecordBatch] containing add action data. + /// + /// # Arguments + /// + /// * `flatten` - whether to flatten the schema. Partition values columns are + /// given the prefix `partition.`, statistics (null_count, min, and max) are + /// given the prefix `null_count.`, `min.`, and `max.`, and tags the + /// prefix `tags.`. Nested field names are concatenated with `.`. + /// + /// # Data schema + /// + /// Each row represents a file that is a part of the selected tables state. + /// + /// * `path` (String): relative or absolute to a file. + /// * `size_bytes` (Int64): size of file in bytes. + /// * `modification_time` (Millisecond Timestamp): time the file was created. + /// * `data_change` (Boolean): false if data represents data moved from other files + /// in the same transaction. + /// * `partition.{partition column name}` (matches column type): value of + /// partition the file corresponds to. + /// * `null_count.{col_name}` (Int64): number of null values for column in + /// this file. + /// * `min.{col_name}` (matches column type): minimum value of column in file + /// (if available). + /// * `max.{col_name}` (matches column type): maximum value of column in file + /// (if available). + /// * `tag.{tag_key}` (String): value of a metadata tag for the file. pub fn add_actions_table( &self, flatten: bool, @@ -160,7 +188,7 @@ impl DeltaTableState { .into_iter() .zip(metadata.partition_columns.iter()) .map(|(array, name)| { - let name: Cow = Cow::Owned(format!("partition_{}", name)); + let name: Cow = Cow::Owned(format!("partition.{}", name)); (name, array) }) .collect() diff --git a/rust/tests/add_actions_test.rs b/rust/tests/add_actions_test.rs index c7c7462c2b..0ba51b6bbb 100644 --- a/rust/tests/add_actions_test.rs +++ b/rust/tests/add_actions_test.rs @@ -43,7 +43,7 @@ async fn test_with_partitions() { 1627990384000, 1627990384000 ]))), ("data_change", Arc::new(array::BooleanArray::from(vec![true, true]))), - ("partition_k", Arc::new(array::StringArray::from(vec![Some("A"), None]))), + ("partition.k", Arc::new(array::StringArray::from(vec![Some("A"), None]))), ]; let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap(); From 1a29210cec1a5145f839caa22a1a0e42fd9fda18 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 3 Jan 2023 21:17:37 -0800 Subject: [PATCH 10/13] format --- python/deltalake/table.py | 2 +- rust/src/table_state_arrow.rs | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index f0f27798b7..3549bdc048 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -453,7 +453,7 @@ def get_add_actions_df(self, flatten: bool = False) -> pyarrow.RecordBatch: :param flatten: whether to flatten the schema. Partition values columns are given the prefix `partition.`, statistics (null_count, min, and max) are - given the prefix `null_count.`, `min.`, and `max.`, and tags the + given the prefix `null_count.`, `min.`, and `max.`, and tags the prefix `tags.`. Nested field names are concatenated with `.`. :returns: a PyArrow RecordBatch containing the add action data. diff --git a/rust/src/table_state_arrow.rs b/rust/src/table_state_arrow.rs index 56defaaf2b..9bebed511f 100644 --- a/rust/src/table_state_arrow.rs +++ b/rust/src/table_state_arrow.rs @@ -1,5 +1,5 @@ //! Methods to get Delta Table state in Arrow structures -//! +//! //! See [crate::table_state::DeltaTableState]. use crate::action::{ColumnCountStat, ColumnValueStat, Stats}; @@ -24,24 +24,24 @@ use std::sync::Arc; impl DeltaTableState { /// Get an [arrow::record_batch::RecordBatch] containing add action data. - /// + /// /// # Arguments - /// + /// /// * `flatten` - whether to flatten the schema. Partition values columns are /// given the prefix `partition.`, statistics (null_count, min, and max) are - /// given the prefix `null_count.`, `min.`, and `max.`, and tags the + /// given the prefix `null_count.`, `min.`, and `max.`, and tags the /// prefix `tags.`. Nested field names are concatenated with `.`. - /// + /// /// # Data schema - /// + /// /// Each row represents a file that is a part of the selected tables state. - /// + /// /// * `path` (String): relative or absolute to a file. /// * `size_bytes` (Int64): size of file in bytes. /// * `modification_time` (Millisecond Timestamp): time the file was created. /// * `data_change` (Boolean): false if data represents data moved from other files /// in the same transaction. - /// * `partition.{partition column name}` (matches column type): value of + /// * `partition.{partition column name}` (matches column type): value of /// partition the file corresponds to. /// * `null_count.{col_name}` (Int64): number of null values for column in /// this file. From 53b939530330ca598be1f2db3e92ee06910db7a6 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 4 Jan 2023 19:07:32 -0800 Subject: [PATCH 11/13] eliminate one new dependency --- rust/Cargo.toml | 1 - rust/src/table_state_arrow.rs | 49 +++++++++-------------------------- 2 files changed, 12 insertions(+), 38 deletions(-) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index d10afbff62..7f823bb96c 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -36,7 +36,6 @@ tokio = { version = "1", features = ["macros", "rt", "parking_lot"] } regex = "1" uuid = { version = "1", features = ["serde", "v4"] } url = "2.3" -num = { version = "0.4", default-features = false, features = ["std"] } # S3 lock client rusoto_core = { version = "0.48", default-features = false, optional = true } diff --git a/rust/src/table_state_arrow.rs b/rust/src/table_state_arrow.rs index 9bebed511f..21a3f9a7d2 100644 --- a/rust/src/table_state_arrow.rs +++ b/rust/src/table_state_arrow.rs @@ -9,14 +9,12 @@ use crate::DeltaTableError; use crate::SchemaDataType; use crate::SchemaTypeStruct; use arrow::array::{ - ArrayRef, BinaryArray, BooleanArray, Date32Array, Int64Array, PrimitiveArray, StringArray, + ArrayRef, BinaryArray, BooleanArray, Date32Array, Float64Array, Int64Array, StringArray, StructArray, TimestampMicrosecondArray, TimestampMillisecondArray, }; use arrow::compute::cast; use arrow::compute::kernels::cast_utils::Parser; -use arrow::datatypes::{ - ArrowPrimitiveType, DataType, Date32Type, Field, TimeUnit, TimestampMicrosecondType, -}; +use arrow::datatypes::{DataType, Date32Type, Field, TimeUnit, TimestampMicrosecondType}; use itertools::Itertools; use std::borrow::Cow; use std::collections::{HashMap, HashSet, VecDeque}; @@ -556,12 +554,16 @@ fn json_value_to_array_general<'a>( .map(|value| value.as_bool()) .collect::(), )), - DataType::Int64 => json_value_to_array::(values), - DataType::Int32 => json_value_to_array::(values), - DataType::Int16 => json_value_to_array::(values), - DataType::Int8 => json_value_to_array::(values), - DataType::Float32 => json_value_to_array::(values), - DataType::Float64 => json_value_to_array::(values), + DataType::Int64 | DataType::Int32 | DataType::Int16 | DataType::Int8 => { + let i64_arr: ArrayRef = + Arc::new(values.map(|value| value.as_i64()).collect::()); + Ok(arrow::compute::cast(&i64_arr, datatype)?) + } + DataType::Float32 | DataType::Float64 | DataType::Decimal128(_, _) => { + let f64_arr: ArrayRef = + Arc::new(values.map(|value| value.as_f64()).collect::()); + Ok(arrow::compute::cast(&f64_arr, datatype)?) + } DataType::Utf8 => Ok(Arc::new( values.map(|value| value.as_str()).collect::(), )), @@ -580,33 +582,6 @@ fn json_value_to_array_general<'a>( .map(|value| value.as_str().and_then(Date32Type::parse)) .collect::>>(), ))), - DataType::Decimal128(_, _) => { - let float_arr = json_value_to_array::(values)?; - Ok(arrow::compute::cast(&float_arr, datatype)?) - } _ => Err(DeltaTableError::Generic("Invalid datatype".to_string())), } } - -fn json_value_to_array<'a, T>( - values: impl Iterator, -) -> Result -where - T: ArrowPrimitiveType, - T::Native: num::NumCast, -{ - // Adapted from - Ok(Arc::new( - values - .map(|value| -> Option { - if value.is_i64() { - value.as_i64().and_then(num::cast::cast) - } else if value.is_u64() { - value.as_u64().and_then(num::cast::cast) - } else { - value.as_f64().and_then(num::cast::cast) - } - }) - .collect::>(), - )) -} From a9b0b2b1cae44c2fe69a8c37b65a95d455121b7d Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 9 Jan 2023 18:07:06 -0800 Subject: [PATCH 12/13] fix test --- python/tests/test_table_read.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index 3266c29849..277bd84d39 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -295,9 +295,9 @@ def test_add_actions_table(flatten: bool): ) if flatten: - partition_year = actions_df["partition_year"] - partition_month = actions_df["partition_month"] - partition_day = actions_df["partition_day"] + partition_year = actions_df["partition.year"] + partition_month = actions_df["partition.month"] + partition_day = actions_df["partition.day"] else: partition_year = actions_df["partition_values"].field("year") partition_month = actions_df["partition_values"].field("month") From 8b5bab99f86e88adcb595203c5203e4b5a74ecb2 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 10 Jan 2023 20:18:09 -0800 Subject: [PATCH 13/13] drop the _df --- python/deltalake/table.py | 4 ++-- python/src/lib.rs | 2 +- python/tests/test_table_read.py | 2 +- rust/src/table_state.rs | 2 ++ 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 3549bdc048..b93c27f6cb 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -444,7 +444,7 @@ def __stringify_partition_values( out.append((field, op, str_value)) return out - def get_add_actions_df(self, flatten: bool = False) -> pyarrow.RecordBatch: + def get_add_actions(self, flatten: bool = False) -> pyarrow.RecordBatch: """ Return a dataframe with all current add actions. @@ -476,4 +476,4 @@ def get_add_actions_df(self, flatten: bool = False) -> pyarrow.RecordBatch: 1 x=3/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True 3 1 0 6 6 2 x=1/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True 1 1 0 4 4 """ - return self._table.get_add_actions_df(flatten) + return self._table.get_add_actions(flatten) diff --git a/python/src/lib.rs b/python/src/lib.rs index 6e41a5157d..6fc786be05 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -454,7 +454,7 @@ impl RawDeltaTable { Ok(()) } - pub fn get_add_actions_df(&self) -> PyResult> { + pub fn get_add_actions(&self, flatten: bool) -> PyResult> { Ok(PyArrowType( self._table .get_state() diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index 277bd84d39..164a32c008 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -273,7 +273,7 @@ def test_history_partitioned_table_metadata(): def test_add_actions_table(flatten: bool): table_path = "../rust/tests/data/delta-0.8.0-partitioned" dt = DeltaTable(table_path) - actions_df = dt.get_add_actions_df(flatten) + actions_df = dt.get_add_actions(flatten) # RecordBatch doesn't have a sort_by method yet actions_df = pa.Table.from_batches([actions_df]).sort_by("path").to_batches()[0] diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs index 23f7812395..9c8a3da089 100644 --- a/rust/src/table_state.rs +++ b/rust/src/table_state.rs @@ -379,6 +379,8 @@ impl DeltaTableState { Ok(actions) } } + +#[cfg(test)] mod tests { use super::*; use pretty_assertions::assert_eq;