From 6e78f47f2a05a95dbd7ced8ac48c0655344579a1 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Sat, 14 Sep 2024 11:22:02 +0200 Subject: [PATCH] feat: add feature operation --- crates/core/src/kernel/models/actions.rs | 117 +++++++++++++ crates/core/src/operations/add_feature.rs | 196 ++++++++++++++++++++++ crates/core/src/operations/mod.rs | 8 + crates/core/src/protocol/mod.rs | 10 +- python/deltalake/__init__.py | 1 + python/deltalake/_internal.pyi | 36 ++++ python/deltalake/table.py | 46 ++++- python/src/features.rs | 56 +++++++ python/src/lib.rs | 31 ++++ python/tests/conftest.py | 7 + python/tests/test_alter.py | 84 +++++++++- 11 files changed, 585 insertions(+), 7 deletions(-) create mode 100644 crates/core/src/operations/add_feature.rs create mode 100644 python/src/features.rs diff --git a/crates/core/src/kernel/models/actions.rs b/crates/core/src/kernel/models/actions.rs index f5c1129dfc..739a585509 100644 --- a/crates/core/src/kernel/models/actions.rs +++ b/crates/core/src/kernel/models/actions.rs @@ -377,6 +377,97 @@ impl Protocol { } } +/// High level table features +#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)] +#[serde(rename_all = "camelCase")] +pub enum TableFeatures { + /// Mapping of one column to another + ColumnMapping, + /// Deletion vectors for merge, update, delete + DeletionVectors, + /// timestamps without timezone support + #[serde(rename = "timestampNtz")] + TimestampWithoutTimezone, + /// version 2 of checkpointing + V2Checkpoint, + /// Append Only Tables + AppendOnly, + /// Table invariants + Invariants, + /// Check constraints on columns + CheckConstraints, + /// CDF on a table + ChangeDataFeed, + /// Columns with generated values + GeneratedColumns, + /// ID Columns + IdentityColumns, + /// Row tracking on tables + RowTracking, + /// domain specific metadata + DomainMetadata, + /// Iceberg compatibility support + IcebergCompatV1, +} + +impl FromStr for 
TableFeatures { + type Err = (); + + fn from_str(value: &str) -> Result { + match value { + "columnMapping" => Ok(TableFeatures::ColumnMapping), + "deletionVectors" => Ok(TableFeatures::DeletionVectors), + "timestampNtz" => Ok(TableFeatures::TimestampWithoutTimezone), + "v2Checkpoint" => Ok(TableFeatures::V2Checkpoint), + "appendOnly" => Ok(TableFeatures::AppendOnly), + "invariants" => Ok(TableFeatures::Invariants), + "checkConstraints" => Ok(TableFeatures::CheckConstraints), + "changeDataFeed" => Ok(TableFeatures::ChangeDataFeed), + "generatedColumns" => Ok(TableFeatures::GeneratedColumns), + "identityColumns" => Ok(TableFeatures::IdentityColumns), + "rowTracking" => Ok(TableFeatures::RowTracking), + "domainMetadata" => Ok(TableFeatures::DomainMetadata), + "icebergCompatV1" => Ok(TableFeatures::IcebergCompatV1), + _ => Err(()), + } + } +} + +impl AsRef for TableFeatures { + fn as_ref(&self) -> &str { + match self { + TableFeatures::ColumnMapping => "columnMapping", + TableFeatures::DeletionVectors => "deletionVectors", + TableFeatures::TimestampWithoutTimezone => "timestampNtz", + TableFeatures::V2Checkpoint => "v2Checkpoint", + TableFeatures::AppendOnly => "appendOnly", + TableFeatures::Invariants => "invariants", + TableFeatures::CheckConstraints => "checkConstraints", + TableFeatures::ChangeDataFeed => "changeDataFeed", + TableFeatures::GeneratedColumns => "generatedColumns", + TableFeatures::IdentityColumns => "identityColumns", + TableFeatures::RowTracking => "rowTracking", + TableFeatures::DomainMetadata => "domainMetadata", + TableFeatures::IcebergCompatV1 => "icebergCompatV1", + } + } +} + +impl fmt::Display for TableFeatures { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_ref()) + } +} + +impl TableFeatures { + /// Convert table feature to respective reader or/and write feature + pub fn to_reader_writer_features(&self) -> (Option, Option) { + let reader_feature = ReaderFeatures::try_from(self).ok(); + let 
writer_feature = WriterFeatures::try_from(self).ok(); + (reader_feature, writer_feature) + } +} + /// Features table readers can support as well as let users know /// what is supported #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)] @@ -449,6 +540,19 @@ impl fmt::Display for ReaderFeatures { } } +impl TryFrom<&TableFeatures> for ReaderFeatures { + type Error = String; + + fn try_from(value: &TableFeatures) -> Result { + match ReaderFeatures::from(value.as_ref()) { + ReaderFeatures::Other(_) => { + Err(format!("Table feature {} is not a reader feature", value)) + } + value => Ok(value), + } + } +} + /// Features table writers can support as well as let users know /// what is supported #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)] @@ -540,6 +644,19 @@ impl fmt::Display for WriterFeatures { } } +impl TryFrom<&TableFeatures> for WriterFeatures { + type Error = String; + + fn try_from(value: &TableFeatures) -> Result { + match WriterFeatures::from(value.as_ref()) { + WriterFeatures::Other(_) => { + Err(format!("Table feature {} is not a writer feature", value)) + } + value => Ok(value), + } + } +} + impl From<&parquet::record::Field> for WriterFeatures { fn from(value: &parquet::record::Field) -> Self { match value { diff --git a/crates/core/src/operations/add_feature.rs b/crates/core/src/operations/add_feature.rs new file mode 100644 index 0000000000..7200c37d03 --- /dev/null +++ b/crates/core/src/operations/add_feature.rs @@ -0,0 +1,196 @@ +//! 
Enable table features + +use futures::future::BoxFuture; +use itertools::Itertools; + +use super::transaction::{CommitBuilder, CommitProperties}; +use crate::kernel::{ReaderFeatures, TableFeatures, WriterFeatures}; +use crate::logstore::LogStoreRef; +use crate::protocol::DeltaOperation; +use crate::table::state::DeltaTableState; +use crate::DeltaTable; +use crate::{DeltaResult, DeltaTableError}; + +/// Enable table features for a table +pub struct AddTableFeatureBuilder { + /// A snapshot of the table's state + snapshot: DeltaTableState, + /// Name of the feature + name: Vec, + /// Allow protocol versions to be increased by setting features + allow_protocol_versions_increase: bool, + /// Delta object store for handling data files + log_store: LogStoreRef, + /// Additional information to add to the commit + commit_properties: CommitProperties, +} + +impl super::Operation<()> for AddTableFeatureBuilder {} + +impl AddTableFeatureBuilder { + /// Create a new builder + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { + Self { + name: vec![], + allow_protocol_versions_increase: false, + snapshot, + log_store, + commit_properties: CommitProperties::default(), + } + } + + /// Specify the features to be added + pub fn with_feature>(mut self, name: S) -> Self { + self.name.push(name.into()); + self + } + + /// Specify the features to be added + pub fn with_features>(mut self, name: Vec) -> Self { + self.name + .extend(name.into_iter().map(Into::into).collect_vec()); + self + } + + /// Specify if you want to allow protocol version to be increased + pub fn with_allow_protocol_versions_increase(mut self, allow: bool) -> Self { + self.allow_protocol_versions_increase = allow; + self + } + + /// Additional metadata to be added to commit info + pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self { + self.commit_properties = commit_properties; + self + } +} + +impl std::future::IntoFuture for AddTableFeatureBuilder { + type 
Output = DeltaResult; + + type IntoFuture = BoxFuture<'static, Self::Output>; + + fn into_future(self) -> Self::IntoFuture { + let this = self; + + Box::pin(async move { + let name = if this.name.is_empty() { + return Err(DeltaTableError::Generic("No features provided".to_string())); + } else { + this.name + }; + let (reader_features, writer_features): ( + Vec>, + Vec>, + ) = name.iter().map(|v| v.to_reader_writer_features()).unzip(); + let reader_features = reader_features.into_iter().flatten().collect_vec(); + let writer_features = writer_features.into_iter().flatten().collect_vec(); + + let mut protocol = this.snapshot.protocol().clone(); + + if !this.allow_protocol_versions_increase { + if !reader_features.is_empty() + && !writer_features.is_empty() + && !(protocol.min_reader_version == 3 && protocol.min_writer_version == 7) + { + return Err(DeltaTableError::Generic("Table feature enables reader and writer feature, but reader is not v3, and writer not v7. Set allow_protocol_versions_increase or increase versions explicitly through set_tbl_properties".to_string())); + } else if !reader_features.is_empty() && protocol.min_reader_version < 3 { + return Err(DeltaTableError::Generic("Table feature enables reader feature, but min_reader is not v3. Set allow_protocol_versions_increase or increase version explicitly through set_tbl_properties".to_string())); + } else if !writer_features.is_empty() && protocol.min_writer_version < 7 { + return Err(DeltaTableError::Generic("Table feature enables writer feature, but min_writer is not v7. 
Set allow_protocol_versions_increase or increase version explicitly through set_tbl_properties".to_string())); + } + } + + protocol = protocol.with_reader_features(reader_features); + protocol = protocol.with_writer_features(writer_features); + + let operation = DeltaOperation::AddFeature { name }; + + let actions = vec![protocol.into()]; + + let commit = CommitBuilder::from(this.commit_properties) + .with_actions(actions) + .build(Some(&this.snapshot), this.log_store.clone(), operation) + .await?; + + Ok(DeltaTable::new_with_state( + this.log_store, + commit.snapshot(), + )) + }) + } +} + +#[cfg(feature = "datafusion")] +#[cfg(test)] +mod tests { + use delta_kernel::DeltaResult; + + use crate::{ + kernel::TableFeatures, + writer::test_utils::{create_bare_table, get_record_batch}, + DeltaOps, + }; + + #[tokio::test] + async fn add_feature() -> DeltaResult<()> { + let batch = get_record_batch(None, false); + let write = DeltaOps(create_bare_table()) + .write(vec![batch.clone()]) + .await + .unwrap(); + let table = DeltaOps(write); + let result = table + .add_feature() + .with_feature(TableFeatures::ChangeDataFeed) + .with_allow_protocol_versions_increase(true) + .await + .unwrap(); + + assert!(&result + .protocol() + .cloned() + .unwrap() + .writer_features + .unwrap_or_default() + .contains(&crate::kernel::WriterFeatures::ChangeDataFeed)); + + let result = DeltaOps(result) + .add_feature() + .with_feature(TableFeatures::DeletionVectors) + .with_allow_protocol_versions_increase(true) + .await + .unwrap(); + + let current_protocol = &result.protocol().cloned().unwrap(); + assert!(&current_protocol + .writer_features + .clone() + .unwrap_or_default() + .contains(&crate::kernel::WriterFeatures::DeletionVectors)); + assert!(&current_protocol + .reader_features + .clone() + .unwrap_or_default() + .contains(&crate::kernel::ReaderFeatures::DeletionVectors)); + assert_eq!(result.version(), 2); + Ok(()) + } + + #[tokio::test] + async fn add_feature_disallowed_increase() ->
DeltaResult<()> { + let batch = get_record_batch(None, false); + let write = DeltaOps(create_bare_table()) + .write(vec![batch.clone()]) + .await + .unwrap(); + let table = DeltaOps(write); + let result = table + .add_feature() + .with_feature(TableFeatures::ChangeDataFeed) + .await; + + assert!(result.is_err()); + Ok(()) + } +} diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index 676098c832..c71141d277 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -8,6 +8,7 @@ //! if the operation returns data as well. use std::collections::HashMap; +use add_feature::AddTableFeatureBuilder; #[cfg(feature = "datafusion")] use arrow_array::RecordBatch; #[cfg(feature = "datafusion")] @@ -31,6 +32,7 @@ use crate::table::builder::DeltaTableBuilder; use crate::DeltaTable; pub mod add_column; +pub mod add_feature; pub mod cast; pub mod convert_to_delta; pub mod create; @@ -220,6 +222,12 @@ impl DeltaOps { ConstraintBuilder::new(self.0.log_store, self.0.state.unwrap()) } + /// Enable a table feature for a table + #[must_use] + pub fn add_feature(self) -> AddTableFeatureBuilder { + AddTableFeatureBuilder::new(self.0.log_store, self.0.state.unwrap()) + } + /// Drops constraints from a table #[cfg(feature = "datafusion")] #[must_use] diff --git a/crates/core/src/protocol/mod.rs b/crates/core/src/protocol/mod.rs index 24b25e18c5..f82f48411a 100644 --- a/crates/core/src/protocol/mod.rs +++ b/crates/core/src/protocol/mod.rs @@ -18,7 +18,7 @@ use serde_json::Value; use tracing::{debug, error}; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::kernel::{Add, CommitInfo, Metadata, Protocol, Remove, StructField}; +use crate::kernel::{Add, CommitInfo, Metadata, Protocol, Remove, StructField, TableFeatures}; use crate::logstore::LogStore; use crate::table::CheckPoint; @@ -364,6 +364,12 @@ pub enum DeltaOperation { expr: String, }, + /// Add table features to a table + AddFeature { + /// Name of the feature + 
name: Vec, + }, + /// Drops constraints from a table DropConstraint { /// Constraints name @@ -470,6 +476,7 @@ impl DeltaOperation { DeltaOperation::VacuumEnd { .. } => "VACUUM END", DeltaOperation::AddConstraint { .. } => "ADD CONSTRAINT", DeltaOperation::DropConstraint { .. } => "DROP CONSTRAINT", + DeltaOperation::AddFeature { .. } => "ADD FEATURE", } } @@ -508,6 +515,7 @@ impl DeltaOperation { Self::Optimize { .. } | Self::SetTableProperties { .. } | Self::AddColumn { .. } + | Self::AddFeature { .. } | Self::VacuumStart { .. } | Self::VacuumEnd { .. } | Self::AddConstraint { .. } diff --git a/python/deltalake/__init__.py b/python/deltalake/__init__.py index 981fda53c0..607a5d988b 100644 --- a/python/deltalake/__init__.py +++ b/python/deltalake/__init__.py @@ -1,3 +1,4 @@ +from ._internal import TableFeatures as TableFeatures from ._internal import __version__ as __version__ from ._internal import rust_core_version as rust_core_version from .data_catalog import DataCatalog as DataCatalog diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index ee10564b8b..02a3765e02 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -1,3 +1,4 @@ +from enum import Enum from typing import Any, Dict, List, Literal, Mapping, Optional, Tuple, Union import pyarrow @@ -12,6 +13,34 @@ from deltalake.writer import ( __version__: str +class TableFeatures(Enum): + # Mapping of one column to another + ColumnMapping = "ColumnMapping" + # Deletion vectors for merge, update, delete + DeletionVectors = "DeletionVectors" + # timestamps without timezone support + TimestampWithoutTimezone = "TimestampWithoutTimezone" + # version 2 of checkpointing + V2Checkpoint = "V2Checkpoint" + # Append Only Tables + AppendOnly = "AppendOnly" + # Table invariants + Invariants = "Invariants" + # Check constraints on columns + CheckConstraints = "CheckConstraints" + # CDF on a table + ChangeDataFeed = "ChangeDataFeed" + # Columns with generated values + 
GeneratedColumns = "GeneratedColumns" + # ID Columns + IdentityColumns = "IdentityColumns" + # Row tracking on tables + RowTracking = "RowTracking" + # domain specific metadata + DomainMetadata = "DomainMetadata" + # Iceberg compatibility support + IcebergCompatV1 = "IcebergCompatV1" + class RawDeltaTableMetaData: id: int name: str @@ -94,6 +123,13 @@ class RawDeltaTable: commit_properties: Optional[CommitProperties], post_commithook_properties: Optional[PostCommitHookProperties], ) -> None: ... + def add_feature( + self, + feature: List[TableFeatures], + allow_protocol_versions_increase: bool, + commit_properties: Optional[CommitProperties], + post_commithook_properties: Optional[PostCommitHookProperties], + ) -> None: ... def add_constraints( self, constraints: Dict[str, str], diff --git a/python/deltalake/table.py b/python/deltalake/table.py index dc98db2a5a..9150be697c 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -42,6 +42,7 @@ DeltaError, PyMergeBuilder, RawDeltaTable, + TableFeatures, ) from deltalake._internal import create_deltalake as _create_deltalake from deltalake._util import encode_partition_value @@ -1799,24 +1800,59 @@ class TableAlterer: def __init__(self, table: DeltaTable) -> None: self.table = table + def add_feature( + self, + feature: Union[TableFeatures, List[TableFeatures]], + allow_protocol_versions_increase: bool = False, + commit_properties: Optional[CommitProperties] = None, + post_commithook_properties: Optional[PostCommitHookProperties] = None, + ) -> None: + """ + Enable a table feature. + + Args: + feature: Table Feature e.g. Deletion Vectors, Change Data Feed + allow_protocol_versions_increase: Allow the protocol to be implicitly bumped to reader 3 or writer 7 + commit_properties: properties of the transaction commit. If None, default values are used. + post_commithook_properties: properties for the post commit hook. If None, default values are used. 
+ + Example: + ```python + from deltalake import DeltaTable, TableFeatures + dt = DeltaTable("test_table") + dt.alter.add_feature(TableFeatures.AppendOnly) + ``` + + **Check protocol** + ``` + dt.protocol() + ProtocolVersions(min_reader_version=1, min_writer_version=7, writer_features=['appendOnly'], reader_features=None) + ``` + """ + if isinstance(feature, TableFeatures): + feature = [feature] + self.table._table.add_feature( + feature, + allow_protocol_versions_increase, + commit_properties, + post_commithook_properties, + ) + def add_columns( self, fields: Union[DeltaField, List[DeltaField]], custom_metadata: Optional[Dict[str, str]] = None, - post_commithook_properties: Optional[PostCommitHookProperties] = None, commit_properties: Optional[CommitProperties] = None, + post_commithook_properties: Optional[PostCommitHookProperties] = None, ) -> None: """Add new columns and/or update the fields of a stuctcolumn Args: fields: fields to merge into schema - custom_metadata: Deprecated and will be removed in future versions. Use commit_properties instead. - post_commithook_properties: properties for the post commit hook. If None, default values are used. commit_properties: properties of the transaction commit. If None, default values are used. + post_commithook_properties: properties for the post commit hook. If None, default values are used. 
Example: - ```python - from deltalake import DeltaTable from deltalake.schema import Field, PrimitiveType, StructType dt = DeltaTable("test_table") new_fields = [ diff --git a/python/src/features.rs b/python/src/features.rs new file mode 100644 index 0000000000..155f7aa365 --- /dev/null +++ b/python/src/features.rs @@ -0,0 +1,56 @@ +use deltalake::kernel::TableFeatures as KernelTableFeatures; +use pyo3::pyclass; + +/// High level table features +#[pyclass] +#[derive(Clone)] +pub enum TableFeatures { + /// Mapping of one column to another + ColumnMapping, + /// Deletion vectors for merge, update, delete + DeletionVectors, + /// timestamps without timezone support + TimestampWithoutTimezone, + /// version 2 of checkpointing + V2Checkpoint, + /// Append Only Tables + AppendOnly, + /// Table invariants + Invariants, + /// Check constraints on columns + CheckConstraints, + /// CDF on a table + ChangeDataFeed, + /// Columns with generated values + GeneratedColumns, + /// ID Columns + IdentityColumns, + /// Row tracking on tables + RowTracking, + /// domain specific metadata + DomainMetadata, + /// Iceberg compatibility support + IcebergCompatV1, +} + +impl From for KernelTableFeatures { + fn from(value: TableFeatures) -> Self { + match value { + TableFeatures::ColumnMapping => KernelTableFeatures::ColumnMapping, + TableFeatures::DeletionVectors => KernelTableFeatures::DeletionVectors, + TableFeatures::TimestampWithoutTimezone => { + KernelTableFeatures::TimestampWithoutTimezone + } + TableFeatures::V2Checkpoint => KernelTableFeatures::V2Checkpoint, + TableFeatures::AppendOnly => KernelTableFeatures::AppendOnly, + TableFeatures::Invariants => KernelTableFeatures::Invariants, + TableFeatures::CheckConstraints => KernelTableFeatures::CheckConstraints, + TableFeatures::ChangeDataFeed => KernelTableFeatures::ChangeDataFeed, + TableFeatures::GeneratedColumns => KernelTableFeatures::GeneratedColumns, + TableFeatures::IdentityColumns => KernelTableFeatures::IdentityColumns, + 
TableFeatures::RowTracking => KernelTableFeatures::RowTracking, + TableFeatures::DomainMetadata => KernelTableFeatures::DomainMetadata, + TableFeatures::IcebergCompatV1 => KernelTableFeatures::IcebergCompatV1, + } + } +} diff --git a/python/src/lib.rs b/python/src/lib.rs index 1b4194a906..77db334283 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -1,4 +1,5 @@ mod error; +mod features; mod filesystem; mod merge; mod schema; @@ -27,6 +28,7 @@ use deltalake::kernel::{ scalars::ScalarExt, Action, Add, Invariant, LogicalFile, Remove, StructType, }; use deltalake::operations::add_column::AddColumnBuilder; +use deltalake::operations::add_feature::AddTableFeatureBuilder; use deltalake::operations::collect_sendable_stream; use deltalake::operations::constraints::ConstraintBuilder; use deltalake::operations::convert_to_delta::{ConvertToDeltaBuilder, PartitionStrategy}; @@ -61,6 +63,7 @@ use serde_json::{Map, Value}; use crate::error::DeltaProtocolError; use crate::error::PythonError; +use crate::features::TableFeatures; use crate::filesystem::FsConfig; use crate::merge::PyMergeBuilder; use crate::schema::{schema_to_pyobject, Field}; @@ -558,7 +561,34 @@ impl RawDeltaTable { { cmd = cmd.with_commit_properties(commit_properties); } + rt().block_on(cmd.into_future()).map_err(PythonError::from) + })?; + self._table.state = table.state; + Ok(()) + } + + #[pyo3(signature = (feature, allow_protocol_versions_increase, commit_properties=None, post_commithook_properties=None))] + pub fn add_feature( + &mut self, + py: Python, + feature: Vec, + allow_protocol_versions_increase: bool, + commit_properties: Option, + post_commithook_properties: Option, + ) -> PyResult<()> { + let table = py.allow_threads(|| { + let mut cmd = AddTableFeatureBuilder::new( + self._table.log_store(), + self._table.snapshot().map_err(PythonError::from)?.clone(), + ) + .with_features(feature) + .with_allow_protocol_versions_increase(allow_protocol_versions_increase); + if let Some(commit_properties) = 
+ maybe_create_commit_properties(commit_properties, post_commithook_properties) + { + cmd = cmd.with_commit_properties(commit_properties); + } rt().block_on(cmd.into_future()).map_err(PythonError::from) })?; self._table.state = table.state; @@ -1976,5 +2006,6 @@ fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/python/tests/conftest.py b/python/tests/conftest.py index cd3dec4627..8f85f4ab04 100644 --- a/python/tests/conftest.py +++ b/python/tests/conftest.py @@ -228,6 +228,13 @@ def sample_table(): ) +@pytest.fixture() +def existing_sample_table(tmp_path: pathlib.Path, sample_table: pa.Table): + path = str(tmp_path) + write_deltalake(path, sample_table) + return DeltaTable(path) + + @pytest.fixture() def sample_table_with_spaces_numbers(): nrows = 5 diff --git a/python/tests/test_alter.py b/python/tests/test_alter.py index 65ac7e07ac..acc37db822 100644 --- a/python/tests/test_alter.py +++ b/python/tests/test_alter.py @@ -4,7 +4,7 @@ import pyarrow as pa import pytest -from deltalake import DeltaTable, write_deltalake +from deltalake import DeltaTable, TableFeatures, write_deltalake from deltalake.exceptions import DeltaError, DeltaProtocolError from deltalake.schema import Field, PrimitiveType, StructType from deltalake.table import CommitProperties @@ -375,3 +375,85 @@ def test_add_timestamp_ntz_column(tmp_path: pathlib.Path, sample_table: pa.Table assert new_protocol.min_writer_version == 7 assert new_protocol.reader_features == ["timestampNtz"] assert new_protocol.writer_features == ["timestampNtz"] + + +features = [ + TableFeatures.ChangeDataFeed, + TableFeatures.DeletionVectors, + TableFeatures.ColumnMapping, + TableFeatures.TimestampWithoutTimezone, + TableFeatures.V2Checkpoint, + TableFeatures.AppendOnly, + TableFeatures.AppendOnly, + TableFeatures.Invariants, + TableFeatures.CheckConstraints, + TableFeatures.GeneratedColumns, + 
TableFeatures.IdentityColumns, + TableFeatures.RowTracking, + TableFeatures.DomainMetadata, + TableFeatures.IcebergCompatV1, +] + +all_features = [] +all_features.extend(features) +all_features.append(features) + + +@pytest.mark.parametrize("feature", all_features) +def test_add_feature_variations(existing_table: DeltaTable, feature): + """Existing table already has timestampNtz so it's already at v3,7""" + existing_table.alter.add_feature( + feature=feature, + allow_protocol_versions_increase=False, + ) + last_action = existing_table.history(1)[0] + assert last_action["operation"] == "ADD FEATURE" + assert existing_table.version() == 1 + + +def test_add_features_disallowed_protocol_increase(existing_sample_table: DeltaTable): + with pytest.raises( + DeltaError, + match="Generic DeltaTable error: Table feature enables writer feature, but min_writer is not v7. Set allow_protocol_versions_increase or increase version explicitly through set_tbl_properties", + ): + existing_sample_table.alter.add_feature( + feature=TableFeatures.ChangeDataFeed, + allow_protocol_versions_increase=False, + ) + with pytest.raises( + DeltaError, + match="Generic DeltaTable error: Table feature enables reader and writer feature, but reader is not v3, and writer not v7. 
Set allow_protocol_versions_increase or increase versions explicitly through set_tbl_properties", + ): + existing_sample_table.alter.add_feature( + feature=TableFeatures.DeletionVectors, + allow_protocol_versions_increase=False, + ) + + +def test_add_feautres(existing_sample_table: DeltaTable): + existing_sample_table.alter.add_feature( + feature=features, + allow_protocol_versions_increase=True, + ) + protocol = existing_sample_table.protocol() + + assert sorted(protocol.reader_features) == sorted( # type: ignore + ["v2Checkpoint", "columnMapping", "deletionVectors", "timestampNtz"] + ) + assert sorted(protocol.writer_features) == sorted( # type: ignore + [ + "appendOnly", + "changeDataFeed", + "checkConstraints", + "columnMapping", + "deletionVectors", + "domainMetadata", + "generatedColumns", + "icebergCompatV1", + "identityColumns", + "invariants", + "rowTracking", + "timestampNtz", + "v2Checkpoint", + ] + ) # type: ignore