diff --git a/Cargo.toml b/Cargo.toml index 4358d912ff..d0660a23bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ debug = true debug = "line-tables-only" [workspace.dependencies] -delta_kernel = { version = "0.6.0", features = ["default-engine"] } +delta_kernel = { version = "=0.6.0", features = ["default-engine"] } #delta_kernel = { path = "../delta-kernel-rs/kernel", features = ["sync-engine"] } # arrow @@ -59,6 +59,8 @@ datafusion-sql = { version = "44" } # serde serde = { version = "1.0.194", features = ["derive"] } serde_json = "1" +strum = { version = "*"} + # "stdlib" bytes = { version = "1" } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 47be50fdb3..3e159aaf9b 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -49,6 +49,7 @@ datafusion-functions-aggregate = { workspace = true, optional = true } # serde serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } +strum = { workspace = true} # "stdlib" bytes = { workspace = true } diff --git a/crates/core/src/kernel/models/actions.rs b/crates/core/src/kernel/models/actions.rs index 119f561b80..3812dc4838 100644 --- a/crates/core/src/kernel/models/actions.rs +++ b/crates/core/src/kernel/models/actions.rs @@ -10,6 +10,7 @@ use url::Url; use super::schema::StructType; use crate::kernel::{error::Error, DeltaResult}; use crate::TableProperty; +use delta_kernel::table_features::{ReaderFeatures, WriterFeatures}; /// Defines a file format used in table #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] @@ -183,7 +184,14 @@ impl Protocol { mut self, configuration: &HashMap>, ) -> Protocol { + fn parse_bool(value: &Option) -> bool { + value + .as_ref() + .is_some_and(|v| v.to_ascii_lowercase().parse::().is_ok_and(|v| v)) + } + if self.min_writer_version >= 7 { + // TODO: move this is in future to use delta_kernel::table_properties let mut converted_writer_features = configuration .iter() .filter(|(_, value)| { @@ -191,10 +199,22 @@ impl Protocol { .as_ref() .is_some_and(|v| v.to_ascii_lowercase().parse::().is_ok_and(|v| v)) }) - .collect::>>() - .keys() - .map(|key| (*key).clone().into()) - .filter(|v| !matches!(v, WriterFeatures::Other(_))) + .filter_map(|(key, value)| match key.as_str() { + "delta.enableChangeDataFeed" if parse_bool(value) => { + Some(WriterFeatures::ChangeDataFeed) + } + "delta.appendOnly" if parse_bool(value) => Some(WriterFeatures::AppendOnly), + "delta.enableDeletionVectors" if parse_bool(value) => { + Some(WriterFeatures::DeletionVectors) + } + "delta.enableRowTracking" if parse_bool(value) => { + Some(WriterFeatures::RowTracking) + } + "delta.checkpointPolicy" if value.clone().unwrap_or_default() == "v2" => { + Some(WriterFeatures::V2Checkpoint) + } + _ => None, + }) .collect::>(); if configuration @@ -215,13 +235,15 @@ impl Protocol { if self.min_reader_version >= 3 { let converted_reader_features = configuration .iter() - .filter(|(_, value)| { - value - .as_ref() - .is_some_and(|v| v.to_ascii_lowercase().parse::().is_ok_and(|v| v)) + .filter_map(|(key, value)| match key.as_str() { + "delta.enableDeletionVectors" if parse_bool(value) => { + Some(ReaderFeatures::DeletionVectors) + } + "delta.checkpointPolicy" if value.clone().unwrap_or_default() == "v2" => { + Some(ReaderFeatures::V2Checkpoint) + } + _ => None, }) - .map(|(key, _)| (*key).clone().into()) - .filter(|v| !matches!(v, ReaderFeatures::Other(_))) .collect::>(); match self.reader_features { Some(mut features) => { @@ -459,225 +481,28 @@ impl fmt::Display for TableFeatures { } } -impl TableFeatures { - /// Convert table feature to respective reader or/and write feature - pub fn to_reader_writer_features(&self) -> (Option, Option) { - let reader_feature = ReaderFeatures::try_from(self).ok(); - let writer_feature = WriterFeatures::try_from(self).ok(); - (reader_feature, writer_feature) - } -} - -/// Features table readers can support as well as let users know -/// what is supported -#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)] -#[serde(rename_all = "camelCase")] -pub enum ReaderFeatures { - /// Mapping of one column to another - ColumnMapping, - /// Deletion vectors for merge, update, delete - DeletionVectors, - /// timestamps without timezone support - #[serde(rename = "timestampNtz")] - TimestampWithoutTimezone, - /// version 2 of checkpointing - V2Checkpoint, - /// If we do not match any other reader features - #[serde(untagged)] - Other(String), -} - -impl From<&parquet::record::Field> for ReaderFeatures { - fn from(value: &parquet::record::Field) -> Self { - match value { - parquet::record::Field::Str(feature) => match feature.as_str() { - "columnMapping" => ReaderFeatures::ColumnMapping, - "deletionVectors" | "delta.enableDeletionVectors" => { - ReaderFeatures::DeletionVectors - } - "timestampNtz" => ReaderFeatures::TimestampWithoutTimezone, - "v2Checkpoint" => ReaderFeatures::V2Checkpoint, - f => ReaderFeatures::Other(f.to_string()), - }, - f => ReaderFeatures::Other(f.to_string()), - } - } -} - -impl From for ReaderFeatures { - fn from(value: String) -> Self { - value.as_str().into() - } -} - -impl From<&str> for ReaderFeatures { - fn from(value: &str) -> Self { - match value { - "columnMapping" => ReaderFeatures::ColumnMapping, - "deletionVectors" => ReaderFeatures::DeletionVectors, - "timestampNtz" => ReaderFeatures::TimestampWithoutTimezone, - "v2Checkpoint" => ReaderFeatures::V2Checkpoint, - f => ReaderFeatures::Other(f.to_string()), - } - } -} - -impl AsRef for ReaderFeatures { - fn as_ref(&self) -> &str { - match self { - ReaderFeatures::ColumnMapping => "columnMapping", - ReaderFeatures::DeletionVectors => "deletionVectors", - ReaderFeatures::TimestampWithoutTimezone => "timestampNtz", - ReaderFeatures::V2Checkpoint => "v2Checkpoint", - ReaderFeatures::Other(f) => f, - } - } -} - -impl fmt::Display for ReaderFeatures { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.as_ref()) - } -} - impl TryFrom<&TableFeatures> for ReaderFeatures { - type Error = String; + type Error = strum::ParseError; fn try_from(value: &TableFeatures) -> Result { - match ReaderFeatures::from(value.as_ref()) { - ReaderFeatures::Other(_) => { - Err(format!("Table feature {} is not a reader feature", value)) - } - value => Ok(value), - } - } -} - -/// Features table writers can support as well as let users know -/// what is supported -#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)] -#[serde(rename_all = "camelCase")] -pub enum WriterFeatures { - /// Append Only Tables - AppendOnly, - /// Table invariants - Invariants, - /// Check constraints on columns - CheckConstraints, - /// CDF on a table - ChangeDataFeed, - /// Columns with generated values - GeneratedColumns, - /// Mapping of one column to another - ColumnMapping, - /// ID Columns - IdentityColumns, - /// Deletion vectors for merge, update, delete - DeletionVectors, - /// Row tracking on tables - RowTracking, - /// timestamps without timezone support - #[serde(rename = "timestampNtz")] - TimestampWithoutTimezone, - /// domain specific metadata - DomainMetadata, - /// version 2 of checkpointing - V2Checkpoint, - /// Iceberg compatibility support - IcebergCompatV1, - /// If we do not match any other reader features - #[serde(untagged)] - Other(String), -} - -impl From for WriterFeatures { - fn from(value: String) -> Self { - value.as_str().into() - } -} - -impl From<&str> for WriterFeatures { - fn from(value: &str) -> Self { - match value { - "appendOnly" | "delta.appendOnly" => WriterFeatures::AppendOnly, - "invariants" => WriterFeatures::Invariants, - "checkConstraints" => WriterFeatures::CheckConstraints, - "changeDataFeed" | "delta.enableChangeDataFeed" => WriterFeatures::ChangeDataFeed, - "generatedColumns" => WriterFeatures::GeneratedColumns, - "columnMapping" => WriterFeatures::ColumnMapping, - "identityColumns" => WriterFeatures::IdentityColumns, - "deletionVectors" | "delta.enableDeletionVectors" => WriterFeatures::DeletionVectors, - "rowTracking" | "delta.enableRowTracking" => WriterFeatures::RowTracking, - "timestampNtz" => WriterFeatures::TimestampWithoutTimezone, - "domainMetadata" => WriterFeatures::DomainMetadata, - "v2Checkpoint" => WriterFeatures::V2Checkpoint, - "icebergCompatV1" => WriterFeatures::IcebergCompatV1, - f => WriterFeatures::Other(f.to_string()), - } - } -} - -impl AsRef for WriterFeatures { - fn as_ref(&self) -> &str { - match self { - WriterFeatures::AppendOnly => "appendOnly", - WriterFeatures::Invariants => "invariants", - WriterFeatures::CheckConstraints => "checkConstraints", - WriterFeatures::ChangeDataFeed => "changeDataFeed", - WriterFeatures::GeneratedColumns => "generatedColumns", - WriterFeatures::ColumnMapping => "columnMapping", - WriterFeatures::IdentityColumns => "identityColumns", - WriterFeatures::DeletionVectors => "deletionVectors", - WriterFeatures::RowTracking => "rowTracking", - WriterFeatures::TimestampWithoutTimezone => "timestampNtz", - WriterFeatures::DomainMetadata => "domainMetadata", - WriterFeatures::V2Checkpoint => "v2Checkpoint", - WriterFeatures::IcebergCompatV1 => "icebergCompatV1", - WriterFeatures::Other(f) => f, - } - } -} - -impl fmt::Display for WriterFeatures { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.as_ref()) + ReaderFeatures::try_from(value.as_ref()) } } impl TryFrom<&TableFeatures> for WriterFeatures { - type Error = String; + type Error = strum::ParseError; fn try_from(value: &TableFeatures) -> Result { - match WriterFeatures::from(value.as_ref()) { - WriterFeatures::Other(_) => { - Err(format!("Table feature {} is not a writer feature", value)) - } - value => Ok(value), - } + WriterFeatures::try_from(value.as_ref()) } } -impl From<&parquet::record::Field> for WriterFeatures { - fn from(value: &parquet::record::Field) -> Self { - match value { - parquet::record::Field::Str(feature) => match feature.as_str() { - "appendOnly" => WriterFeatures::AppendOnly, - "invariants" => WriterFeatures::Invariants, - "checkConstraints" => WriterFeatures::CheckConstraints, - "changeDataFeed" => WriterFeatures::ChangeDataFeed, - "generatedColumns" => WriterFeatures::GeneratedColumns, - "columnMapping" => WriterFeatures::ColumnMapping, - "identityColumns" => WriterFeatures::IdentityColumns, - "deletionVectors" => WriterFeatures::DeletionVectors, - "rowTracking" => WriterFeatures::RowTracking, - "timestampNtz" => WriterFeatures::TimestampWithoutTimezone, - "domainMetadata" => WriterFeatures::DomainMetadata, - "v2Checkpoint" => WriterFeatures::V2Checkpoint, - "icebergCompatV1" => WriterFeatures::IcebergCompatV1, - f => WriterFeatures::Other(f.to_string()), - }, - f => WriterFeatures::Other(f.to_string()), - } +impl TableFeatures { + /// Convert table feature to respective reader or/and write feature + pub fn to_reader_writer_features(&self) -> (Option, Option) { + let reader_feature = ReaderFeatures::try_from(self).ok(); + let writer_feature = WriterFeatures::try_from(self).ok(); + (reader_feature, writer_feature) } } diff --git a/crates/core/src/kernel/snapshot/log_segment.rs b/crates/core/src/kernel/snapshot/log_segment.rs index 62e4cbb8f7..11bab3e771 100644 --- a/crates/core/src/kernel/snapshot/log_segment.rs +++ b/crates/core/src/kernel/snapshot/log_segment.rs @@ -533,7 +533,9 @@ pub(super) async fn list_log_files( #[cfg(test)] pub(super) mod tests { + use delta_kernel::table_features::{ReaderFeatures, WriterFeatures}; use deltalake_test::utils::*; + use maplit::hashset; use tokio::task::JoinHandle; use crate::{ @@ -637,8 +639,8 @@ pub(super) mod tests { let expected = Protocol { min_reader_version: 3, min_writer_version: 7, - reader_features: Some(vec!["deletionVectors".into()].into_iter().collect()), - writer_features: Some(vec!["deletionVectors".into()].into_iter().collect()), + reader_features: Some(hashset! {ReaderFeatures::DeletionVectors}), + writer_features: Some(hashset! {WriterFeatures::DeletionVectors}), }; assert_eq!(protocol, expected); diff --git a/crates/core/src/kernel/snapshot/parse.rs b/crates/core/src/kernel/snapshot/parse.rs index e8630cbe0c..baef8eba65 100644 --- a/crates/core/src/kernel/snapshot/parse.rs +++ b/crates/core/src/kernel/snapshot/parse.rs @@ -3,6 +3,7 @@ use arrow_array::{ Array, BooleanArray, Int32Array, Int64Array, ListArray, MapArray, StringArray, StructArray, }; +use delta_kernel::table_features::{ReaderFeatures, WriterFeatures}; use percent_encoding::percent_decode_str; use crate::kernel::arrow::extract::{self as ex, ProvidesColumnByName}; @@ -63,10 +64,18 @@ pub(super) fn read_protocol(batch: &dyn ProvidesColumnByName) -> DeltaResult::try_into(v.as_str())) + .filter_map(|v| v.ok()) + .collect() + }), + writer_features: collect_string_list(&maybe_writer_features, idx).map(|v| { + v.into_iter() + .map(|v| TryInto::::try_into(v.as_str())) + .filter_map(|v| v.ok()) + .collect() + }), })); } } diff --git a/crates/core/src/operations/add_feature.rs b/crates/core/src/operations/add_feature.rs index 40ab7fcf42..0e7f88ee7f 100644 --- a/crates/core/src/operations/add_feature.rs +++ b/crates/core/src/operations/add_feature.rs @@ -2,12 +2,13 @@ use std::sync::Arc; +use delta_kernel::table_features::{ReaderFeatures, WriterFeatures}; use futures::future::BoxFuture; use itertools::Itertools; use super::transaction::{CommitBuilder, CommitProperties}; use super::{CustomExecuteHandler, Operation}; -use crate::kernel::{ReaderFeatures, TableFeatures, WriterFeatures}; +use crate::kernel::TableFeatures; use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; use crate::table::state::DeltaTableState; @@ -151,13 +152,13 @@ impl std::future::IntoFuture for AddTableFeatureBuilder { #[cfg(feature = "datafusion")] #[cfg(test)] mod tests { - use delta_kernel::DeltaResult; - use crate::{ kernel::TableFeatures, writer::test_utils::{create_bare_table, get_record_batch}, DeltaOps, }; + use delta_kernel::table_features::{ReaderFeatures, WriterFeatures}; + use delta_kernel::DeltaResult; #[tokio::test] async fn add_feature() -> DeltaResult<()> { @@ -180,7 +181,7 @@ mod tests { .unwrap() .writer_features .unwrap_or_default() - .contains(&crate::kernel::WriterFeatures::ChangeDataFeed)); + .contains(&WriterFeatures::ChangeDataFeed)); let result = DeltaOps(result) .add_feature() @@ -194,12 +195,12 @@ mod tests { .writer_features .clone() .unwrap_or_default() - .contains(&crate::kernel::WriterFeatures::DeletionVectors)); + .contains(&WriterFeatures::DeletionVectors)); assert!(¤t_protocol .reader_features .clone() .unwrap_or_default() - .contains(&crate::kernel::ReaderFeatures::DeletionVectors)); + .contains(&ReaderFeatures::DeletionVectors)); assert_eq!(result.version(), 2); Ok(()) } diff --git a/crates/core/src/operations/cdc.rs b/crates/core/src/operations/cdc.rs index b04c794c61..c9d0ca0665 100644 --- a/crates/core/src/operations/cdc.rs +++ b/crates/core/src/operations/cdc.rs @@ -70,7 +70,7 @@ pub(crate) fn should_write_cdc(snapshot: &DeltaTableState) -> DeltaResult // the Option> can get filled with an empty set, checking for the value // explicitly if snapshot.protocol().min_writer_version == 7 - && !features.contains(&crate::kernel::WriterFeatures::ChangeDataFeed) + && !features.contains(&delta_kernel::table_features::WriterFeatures::ChangeDataFeed) { // If the writer feature has not been set, then the table should not have CDC written // to it. Otherwise fallback to the configured table configuration @@ -95,6 +95,7 @@ mod tests { use arrow_schema::Schema; use datafusion::assert_batches_sorted_eq; use datafusion::datasource::{MemTable, TableProvider}; + use delta_kernel::table_features::WriterFeatures; /// A simple test which validates primitive writer version 1 tables should /// not write Change Data Files @@ -173,8 +174,8 @@ mod tests { /// therefore should write CDC files #[tokio::test] async fn test_should_write_cdc_v7_table_with_writer_feature() { - let protocol = Protocol::new(1, 7) - .with_writer_features(vec![crate::kernel::WriterFeatures::ChangeDataFeed]); + let protocol = + Protocol::new(1, 7).with_writer_features(vec![WriterFeatures::ChangeDataFeed]); let actions = vec![Action::Protocol(protocol)]; let mut table: DeltaTable = DeltaOps::new_in_memory() .create() diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs index 479055ab40..2f9c172646 100644 --- a/crates/core/src/operations/constraints.rs +++ b/crates/core/src/operations/constraints.rs @@ -14,13 +14,14 @@ use crate::delta_datafusion::expr::fmt_expr_to_sql; use crate::delta_datafusion::{ register_store, DeltaDataChecker, DeltaScanBuilder, DeltaSessionContext, }; -use crate::kernel::{Protocol, WriterFeatures}; +use crate::kernel::Protocol; use crate::logstore::LogStoreRef; use crate::operations::datafusion_utils::Expression; use crate::protocol::DeltaOperation; use crate::table::state::DeltaTableState; use crate::table::Constraint; use crate::{DeltaResult, DeltaTable, DeltaTableError}; +use delta_kernel::table_features::WriterFeatures; use super::datafusion_utils::into_expr; use super::transaction::{CommitBuilder, CommitProperties}; diff --git a/crates/core/src/operations/create.rs b/crates/core/src/operations/create.rs index 571ab7dc7c..5f6ef47bc0 100644 --- a/crates/core/src/operations/create.rs +++ b/crates/core/src/operations/create.rs @@ -14,14 +14,13 @@ use uuid::Uuid; use super::transaction::{CommitBuilder, TableReference, PROTOCOL}; use super::{CustomExecuteHandler, Operation}; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::kernel::{ - Action, DataType, Metadata, Protocol, ReaderFeatures, StructField, StructType, WriterFeatures, -}; +use crate::kernel::{Action, DataType, Metadata, Protocol, StructField, StructType}; use crate::logstore::{LogStore, LogStoreRef}; use crate::protocol::{DeltaOperation, SaveMode}; use crate::table::builder::ensure_table_uri; use crate::table::config::TableProperty; use crate::{DeltaTable, DeltaTableBuilder}; +use delta_kernel::table_features::{ReaderFeatures, WriterFeatures}; #[derive(thiserror::Error, Debug)] enum CreateError { diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index d98b707e43..c97a488dd3 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -89,16 +89,14 @@ use uuid::Uuid; use self::conflict_checker::{TransactionInfo, WinningCommitSummary}; use crate::checkpoints::{cleanup_expired_logs_for, create_checkpoint_for}; use crate::errors::DeltaTableError; -use crate::kernel::{ - Action, CommitInfo, EagerSnapshot, Metadata, Protocol, ReaderFeatures, Transaction, - WriterFeatures, -}; +use crate::kernel::{Action, CommitInfo, EagerSnapshot, Metadata, Protocol, Transaction}; use crate::logstore::{CommitOrBytes, LogStoreRef}; use crate::protocol::DeltaOperation; use crate::storage::ObjectStoreRef; use crate::table::config::TableConfig; use crate::table::state::DeltaTableState; use crate::{crate_version, DeltaResult}; +use delta_kernel::table_features::{ReaderFeatures, WriterFeatures}; pub use self::conflict_checker::CommitConflictError; pub use self::protocol::INSTANCE as PROTOCOL; diff --git a/crates/core/src/operations/transaction/protocol.rs b/crates/core/src/operations/transaction/protocol.rs index b9ea7d65aa..ef88fbf8e6 100644 --- a/crates/core/src/operations/transaction/protocol.rs +++ b/crates/core/src/operations/transaction/protocol.rs @@ -5,11 +5,10 @@ use once_cell::sync::Lazy; use tracing::log::*; use super::{TableReference, TransactionError}; -use crate::kernel::{ - Action, DataType, EagerSnapshot, ReaderFeatures, Schema, StructField, WriterFeatures, -}; +use crate::kernel::{Action, DataType, EagerSnapshot, Schema, StructField}; use crate::protocol::DeltaOperation; use crate::table::state::DeltaTableState; +use delta_kernel::table_features::{ReaderFeatures, WriterFeatures}; lazy_static! { static ref READER_V2: HashSet = @@ -585,8 +584,7 @@ mod tests { let checker_5 = ProtocolChecker::new(READER_V2.clone(), WRITER_V4.clone()); let actions = vec![ Action::Protocol( - Protocol::new(2, 4) - .with_writer_features(vec![crate::kernel::WriterFeatures::ChangeDataFeed]), + Protocol::new(2, 4).with_writer_features(vec![WriterFeatures::ChangeDataFeed]), ), metadata_action(None).into(), ]; @@ -603,8 +601,7 @@ mod tests { let checker_5 = ProtocolChecker::new(READER_V2.clone(), WRITER_V4.clone()); let actions = vec![ Action::Protocol( - Protocol::new(2, 4) - .with_writer_features(vec![crate::kernel::WriterFeatures::GeneratedColumns]), + Protocol::new(2, 4).with_writer_features(vec![WriterFeatures::GeneratedColumns]), ), metadata_action(None).into(), ]; diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index 4d73b0d6e5..0be11f05cf 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -984,7 +984,7 @@ impl std::future::IntoFuture for WriteBuilder { .reader_features .clone() .unwrap_or_default() - .contains(&crate::kernel::ReaderFeatures::TimestampWithoutTimezone) + .contains(&delta_kernel::table_features::ReaderFeatures::TimestampWithoutTimezone) // We can check only reader features, as reader and writer timestampNtz // should be always enabled together { @@ -1082,7 +1082,9 @@ impl std::future::IntoFuture for WriteBuilder { .reader_features .clone() .unwrap_or_default() - .contains(&crate::kernel::ReaderFeatures::TimestampWithoutTimezone) + .contains( + &delta_kernel::table_features::ReaderFeatures::TimestampWithoutTimezone, + ) // We can check only reader features, as reader and writer timestampNtz // should be always enabled together { diff --git a/crates/core/src/protocol/parquet_read/mod.rs b/crates/core/src/protocol/parquet_read/mod.rs index 655dcb05f3..54e4a095a5 100644 --- a/crates/core/src/protocol/parquet_read/mod.rs +++ b/crates/core/src/protocol/parquet_read/mod.rs @@ -1,6 +1,7 @@ use std::{collections::HashMap, str::FromStr}; use chrono::{SecondsFormat, TimeZone, Utc}; +use delta_kernel::table_features::{ReaderFeatures, WriterFeatures}; use num_bigint::BigInt; use num_traits::cast::ToPrimitive; use parquet::record::{Field, ListAccessor, MapAccessor, RowAccessor}; @@ -644,13 +645,33 @@ impl Protocol { "readerFeatures" => { re.reader_features = record .get_list(i) - .map(|l| l.elements().iter().map(From::from).collect()) + .map(|l| { + l.elements() + .iter() + .filter_map(|v| match v { + Field::Str(feature) => { + ReaderFeatures::try_from(feature.as_str()).ok() + } + _ => None, + }) + .collect() + }) .ok() } "writerFeatures" => { re.writer_features = record .get_list(i) - .map(|l| l.elements().iter().map(From::from).collect()) + .map(|l| { + l.elements() + .iter() + .filter_map(|v| match v { + Field::Str(feature) => { + WriterFeatures::try_from(feature.as_str()).ok() + } + _ => None, + }) + .collect() + }) .ok() } _ => { diff --git a/crates/core/src/test_utils/factories/actions.rs b/crates/core/src/test_utils/factories/actions.rs index 92778f33bf..a96db5e5d7 100644 --- a/crates/core/src/test_utils/factories/actions.rs +++ b/crates/core/src/test_utils/factories/actions.rs @@ -9,8 +9,9 @@ use object_store::ObjectMeta; use super::{get_parquet_bytes, DataFactory, FileStats}; use crate::kernel::arrow::extract::{self as ex}; use crate::kernel::partitions_schema; -use crate::kernel::{Add, Metadata, Protocol, ReaderFeatures, Remove, StructType, WriterFeatures}; +use crate::kernel::{Add, Metadata, Protocol, Remove, StructType}; use crate::operations::transaction::PROTOCOL; +use delta_kernel::table_features::{ReaderFeatures, WriterFeatures}; pub struct ActionFactory; diff --git a/crates/core/tests/read_delta_log_test.rs b/crates/core/tests/read_delta_log_test.rs index 92915f9162..12cc12c352 100644 --- a/crates/core/tests/read_delta_log_test.rs +++ b/crates/core/tests/read_delta_log_test.rs @@ -158,7 +158,7 @@ async fn test_read_table_features() -> DeltaResult<()> { assert!(rf.is_some()); assert!(wf.is_some()); - assert_eq!(rf.unwrap().len(), 5); + assert_eq!(rf.unwrap().len(), 4); assert_eq!(wf.unwrap().len(), 13); Ok(()) }