From eeb903fec992e54451abd09b15059bcb8b6d1afc Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Sun, 28 Jul 2024 14:33:40 +0300 Subject: [PATCH] add feature operation --- crates/core/src/kernel/models/actions.rs | 117 ++++++++++++++++++++++ crates/core/src/operations/add_feature.rs | 114 +++++++++++++++++++++ crates/core/src/operations/mod.rs | 8 ++ crates/core/src/protocol/mod.rs | 10 +- python/deltalake/__init__.py | 1 + python/deltalake/_internal.pyi | 36 +++++++ python/deltalake/table.py | 41 +++++++- python/src/features.rs | 56 +++++++++++ python/src/lib.rs | 32 ++++++ 9 files changed, 409 insertions(+), 6 deletions(-) create mode 100644 crates/core/src/operations/add_feature.rs create mode 100644 python/src/features.rs diff --git a/crates/core/src/kernel/models/actions.rs b/crates/core/src/kernel/models/actions.rs index f5c1129dfc..739a585509 100644 --- a/crates/core/src/kernel/models/actions.rs +++ b/crates/core/src/kernel/models/actions.rs @@ -377,6 +377,97 @@ impl Protocol { } } +/// High level table features +#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)] +#[serde(rename_all = "camelCase")] +pub enum TableFeatures { + /// Mapping of one column to another + ColumnMapping, + /// Deletion vectors for merge, update, delete + DeletionVectors, + /// timestamps without timezone support + #[serde(rename = "timestampNtz")] + TimestampWithoutTimezone, + /// version 2 of checkpointing + V2Checkpoint, + /// Append Only Tables + AppendOnly, + /// Table invariants + Invariants, + /// Check constraints on columns + CheckConstraints, + /// CDF on a table + ChangeDataFeed, + /// Columns with generated values + GeneratedColumns, + /// ID Columns + IdentityColumns, + /// Row tracking on tables + RowTracking, + /// domain specific metadata + DomainMetadata, + /// Iceberg compatibility support + IcebergCompatV1, +} + +impl FromStr for TableFeatures { + type Err = (); + + fn from_str(value: &str) -> Result { + match value { + "columnMapping" => Ok(TableFeatures::ColumnMapping), + "deletionVectors" => Ok(TableFeatures::DeletionVectors), + "timestampNtz" => Ok(TableFeatures::TimestampWithoutTimezone), + "v2Checkpoint" => Ok(TableFeatures::V2Checkpoint), + "appendOnly" => Ok(TableFeatures::AppendOnly), + "invariants" => Ok(TableFeatures::Invariants), + "checkConstraints" => Ok(TableFeatures::CheckConstraints), + "changeDataFeed" => Ok(TableFeatures::ChangeDataFeed), + "generatedColumns" => Ok(TableFeatures::GeneratedColumns), + "identityColumns" => Ok(TableFeatures::IdentityColumns), + "rowTracking" => Ok(TableFeatures::RowTracking), + "domainMetadata" => Ok(TableFeatures::DomainMetadata), + "icebergCompatV1" => Ok(TableFeatures::IcebergCompatV1), + _ => Err(()), + } + } +} + +impl AsRef for TableFeatures { + fn as_ref(&self) -> &str { + match self { + TableFeatures::ColumnMapping => "columnMapping", + TableFeatures::DeletionVectors => "deletionVectors", + TableFeatures::TimestampWithoutTimezone => "timestampNtz", + TableFeatures::V2Checkpoint => "v2Checkpoint", + TableFeatures::AppendOnly => "appendOnly", + TableFeatures::Invariants => "invariants", + TableFeatures::CheckConstraints => "checkConstraints", + TableFeatures::ChangeDataFeed => "changeDataFeed", + TableFeatures::GeneratedColumns => "generatedColumns", + TableFeatures::IdentityColumns => "identityColumns", + TableFeatures::RowTracking => "rowTracking", + TableFeatures::DomainMetadata => "domainMetadata", + TableFeatures::IcebergCompatV1 => "icebergCompatV1", + } + } +} + +impl fmt::Display for TableFeatures { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_ref()) + } +} + +impl TableFeatures { + /// Convert table feature to respective reader or/and write feature + pub fn to_reader_writer_features(&self) -> (Option, Option) { + let reader_feature = ReaderFeatures::try_from(self).ok(); + let writer_feature = WriterFeatures::try_from(self).ok(); + (reader_feature, writer_feature) + } +} + /// Features table readers can support as well as let users know /// what is supported #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)] @@ -449,6 +540,19 @@ impl fmt::Display for ReaderFeatures { } } +impl TryFrom<&TableFeatures> for ReaderFeatures { + type Error = String; + + fn try_from(value: &TableFeatures) -> Result { + match ReaderFeatures::from(value.as_ref()) { + ReaderFeatures::Other(_) => { + Err(format!("Table feature {} is not a reader feature", value)) + } + value => Ok(value), + } + } +} + /// Features table writers can support as well as let users know /// what is supported #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)] @@ -540,6 +644,19 @@ impl fmt::Display for WriterFeatures { } } +impl TryFrom<&TableFeatures> for WriterFeatures { + type Error = String; + + fn try_from(value: &TableFeatures) -> Result { + match WriterFeatures::from(value.as_ref()) { + WriterFeatures::Other(_) => { + Err(format!("Table feature {} is not a writer feature", value)) + } + value => Ok(value), + } + } +} + impl From<&parquet::record::Field> for WriterFeatures { fn from(value: &parquet::record::Field) -> Self { match value { diff --git a/crates/core/src/operations/add_feature.rs b/crates/core/src/operations/add_feature.rs new file mode 100644 index 0000000000..bab8176aa1 --- /dev/null +++ b/crates/core/src/operations/add_feature.rs @@ -0,0 +1,114 @@ +//! Enable table features + +use futures::future::BoxFuture; + +use super::transaction::{CommitBuilder, CommitProperties}; +use crate::kernel::TableFeatures; +use crate::logstore::LogStoreRef; +use crate::protocol::DeltaOperation; +use crate::table::state::DeltaTableState; +use crate::DeltaTable; +use crate::{DeltaResult, DeltaTableError}; + +/// Enable table features for a table +pub struct AddTableFeatureBuilder { + /// A snapshot of the table's state + snapshot: DeltaTableState, + /// Name of the feature + name: Option, + /// Allow protocol versions to be increased by setting features + allow_protocol_versions_increase: bool, + /// Delta object store for handling data files + log_store: LogStoreRef, + /// Additional information to add to the commit + commit_properties: CommitProperties, +} + +impl super::Operation<()> for AddTableFeatureBuilder {} + +impl AddTableFeatureBuilder { + /// Create a new builder + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { + Self { + name: None, + allow_protocol_versions_increase: false, + snapshot, + log_store, + commit_properties: CommitProperties::default(), + } + } + + /// Specify the feature to be added + pub fn with_feature>(mut self, name: S) -> Self { + self.name = Some(name.into()); + self + } + + /// Specify if you want to allow protocol version to be increased + pub fn with_allow_protocol_versions_increase(mut self, allow: bool) -> Self { + self.allow_protocol_versions_increase = allow; + self + } + + /// Additional metadata to be added to commit info + pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self { + self.commit_properties = commit_properties; + self + } +} + +impl std::future::IntoFuture for AddTableFeatureBuilder { + type Output = DeltaResult; + + type IntoFuture = BoxFuture<'static, Self::Output>; + + fn into_future(self) -> Self::IntoFuture { + let this = self; + + Box::pin(async move { + let name = this + .name + .ok_or(DeltaTableError::Generic("No features provided".to_string()))?; + + let (reader_feature, writer_feature) = name.to_reader_writer_features(); + + let mut protocol = this.snapshot.protocol().clone(); + + if !this.allow_protocol_versions_increase { + if reader_feature.is_some() + && writer_feature.is_some() + && protocol.min_reader_version == 3 + && protocol.min_writer_version == 7 + { + return Err(DeltaTableError::Generic("Table feature enables reader and writer feature, but reader is not v3, and writer not v7. Set allow_protocol_versions_increase or increase versions explicitly through set_tbl_properties".to_string())); + } else if reader_feature.is_some() && protocol.min_reader_version < 3 { + return Err(DeltaTableError::Generic("Table feature enables reader feature, but min_reader is not v3. Set allow_protocol_versions_increase or increase version explicitly through set_tbl_properties".to_string())); + } else if writer_feature.is_some() && protocol.min_writer_version < 7 { + return Err(DeltaTableError::Generic("Table feature enables writer feature, but min_writer is not v7. Set allow_protocol_versions_increase or increase version explicitly through set_tbl_properties".to_string())); + } + } + + if let Some(reader_feature) = reader_feature { + protocol = protocol.with_reader_features(vec![reader_feature]); + } + + if let Some(writer_feature) = writer_feature { + protocol = protocol.with_writer_features(vec![writer_feature]); + } + + let operation = DeltaOperation::AddFeature { name }; + + let actions = vec![protocol.into()]; + + let commit = CommitBuilder::from(this.commit_properties) + .with_actions(actions) + .build(Some(&this.snapshot), this.log_store.clone(), operation) + .await?; + + Ok(DeltaTable::new_with_state( + this.log_store, + commit.snapshot(), + )) + }) + } +} diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index 3e4180763f..957c06b500 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -17,6 +17,7 @@ use crate::DeltaTable; use std::collections::HashMap; pub mod add_column; +pub mod add_feature; pub mod cast; pub mod convert_to_delta; pub mod create; @@ -35,6 +36,7 @@ use self::{ }; #[cfg(feature = "datafusion")] pub use ::datafusion::physical_plan::common::collect as collect_sendable_stream; +use add_feature::AddTableFeatureBuilder; #[cfg(feature = "datafusion")] use arrow::record_batch::RecordBatch; use optimize::OptimizeBuilder; @@ -220,6 +222,12 @@ impl DeltaOps { ConstraintBuilder::new(self.0.log_store, self.0.state.unwrap()) } + /// Enable a table feature for a table + #[must_use] + pub fn add_feature(self) -> AddTableFeatureBuilder { + AddTableFeatureBuilder::new(self.0.log_store, self.0.state.unwrap()) + } + /// Drops constraints from a table #[cfg(feature = "datafusion")] #[must_use] diff --git a/crates/core/src/protocol/mod.rs b/crates/core/src/protocol/mod.rs index 87ed42939a..dd5fe5ae07 100644 --- a/crates/core/src/protocol/mod.rs +++ b/crates/core/src/protocol/mod.rs @@ -21,7 +21,7 @@ use std::str::FromStr; use tracing::{debug, error}; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::kernel::{Add, CommitInfo, Metadata, Protocol, Remove, StructField}; +use crate::kernel::{Add, CommitInfo, Metadata, Protocol, Remove, StructField, TableFeatures}; use crate::logstore::LogStore; use crate::table::CheckPoint; @@ -369,6 +369,12 @@ pub enum DeltaOperation { expr: String, }, + /// Add table features to a table + AddFeature { + /// Name of the feature + name: TableFeatures, + }, + /// Drops constraints from a table DropConstraint { /// Constraints name @@ -475,6 +481,7 @@ impl DeltaOperation { DeltaOperation::VacuumEnd { .. } => "VACUUM END", DeltaOperation::AddConstraint { .. } => "ADD CONSTRAINT", DeltaOperation::DropConstraint { .. } => "DROP CONSTRAINT", + DeltaOperation::AddFeature { .. } => "ADD FEATURE", } } @@ -513,6 +520,7 @@ impl DeltaOperation { Self::Optimize { .. } | Self::SetTableProperties { .. } | Self::AddColumn { .. } + | Self::AddFeature { .. } | Self::VacuumStart { .. } | Self::VacuumEnd { .. } | Self::AddConstraint { .. } diff --git a/python/deltalake/__init__.py b/python/deltalake/__init__.py index 60579a33a0..27296e160c 100644 --- a/python/deltalake/__init__.py +++ b/python/deltalake/__init__.py @@ -1,3 +1,4 @@ +from ._internal import TableFeatures as TableFeatures from ._internal import __version__ as __version__ from ._internal import rust_core_version as rust_core_version from .data_catalog import DataCatalog as DataCatalog diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 27033cb9d8..af45d517b2 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -1,3 +1,4 @@ +from enum import Enum from typing import Any, Dict, List, Literal, Mapping, Optional, Tuple, Union import pyarrow @@ -7,6 +8,34 @@ from deltalake.writer import AddAction __version__: str +class TableFeatures(Enum): + # Mapping of one column to another + ColumnMapping = "ColumnMapping" + # Deletion vectors for merge, update, delete + DeletionVectors = "DeletionVectors" + # timestamps without timezone support + TimestampWithoutTimezone = "TimestampWithoutTimezone" + # version 2 of checkpointing + V2Checkpoint = "V2Checkpoint" + # Append Only Tables + AppendOnly = "AppendOnly" + # Table invariants + Invariants = "Invariants" + # Check constraints on columns + CheckConstraints = "CheckConstraints" + # CDF on a table + ChangeDataFeed = "ChangeDataFeed" + # Columns with generated values + GeneratedColumns = "GeneratedColumns" + # ID Columns + IdentityColumns = "IdentityColumns" + # Row tracking on tables + RowTracking = "RowTracking" + # domain specific metadata + DomainMetadata = "DomainMetadata" + # Iceberg compatibility support + IcebergCompatV1 = "IcebergCompatV1" + class RawDeltaTableMetaData: id: int name: str @@ -89,6 +118,13 @@ class RawDeltaTable: custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[Dict[str, Optional[bool]]], ) -> None: ... + def add_feature( + self, + feature: TableFeatures, + allow_protocol_versions_increase: bool, + custom_metadata: Optional[Dict[str, str]], + post_commithook_properties: Optional[Dict[str, Optional[bool]]], + ) -> None: ... def add_constraints( self, constraints: Dict[str, str], diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 5cd0d252cf..69bc31febc 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -38,7 +38,7 @@ if TYPE_CHECKING: import os -from deltalake._internal import RawDeltaTable +from deltalake._internal import RawDeltaTable, TableFeatures from deltalake._internal import create_deltalake as _create_deltalake from deltalake._util import encode_partition_value from deltalake.data_catalog import DataCatalog @@ -1798,22 +1798,53 @@ class TableAlterer: def __init__(self, table: DeltaTable) -> None: self.table = table - def add_columns( + def add_feature( self, - fields: Union[DeltaField, List[DeltaField]], + feature: TableFeatures, + allow_protocol_versions_increase: bool = False, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, ) -> None: - """Add new columns and/or update the fields of a stuctcolumn + """ + Enable a table feature. Args: - fields: fields to merge into schema + feature: Table Feature e.g. Deletion Vectors, Change Data Feed + allow_protocol_versions_increase: Allow the protocol to be implicitily bumped to reader 3 or writer 7 custom_metadata: custom metadata that will be added to the transaction commit. post_commithook_properties: properties for the post commit hook. If None, default values are used. Example: ```python from deltalake import DeltaTable + dt = DeltaTable("test_table") + dt.alter.add_feature(TableFeatures.AppendOnly) + ``` + + **Check protocol** + ``` + dt.protocol() + ProtocolVersions(min_reader_version=1, min_writer_version=7, writer_features=['appendOnly'], reader_features=None) + ``` + """ + + self.table._table.add_feature( + feature, + allow_protocol_versions_increase, + custom_metadata, + post_commithook_properties.__dict__ if post_commithook_properties else None, + ) + + def add_columns( + self, + fields: Union[DeltaField, List[DeltaField]], + custom_metadata: Optional[Dict[str, str]] = None, + post_commithook_properties: Optional[PostCommitHookProperties] = None, + ) -> None: + """Add new columns and/or update the fields of a stuctcolumn + + Args: + fields: fields to merge into schema from deltalake.schema import Field, PrimitiveType, StructType dt = DeltaTable("test_table") new_fields = [ diff --git a/python/src/features.rs b/python/src/features.rs new file mode 100644 index 0000000000..155f7aa365 --- /dev/null +++ b/python/src/features.rs @@ -0,0 +1,56 @@ +use deltalake::kernel::TableFeatures as KernelTableFeatures; +use pyo3::pyclass; + +/// High level table features +#[pyclass] +#[derive(Clone)] +pub enum TableFeatures { + /// Mapping of one column to another + ColumnMapping, + /// Deletion vectors for merge, update, delete + DeletionVectors, + /// timestamps without timezone support + TimestampWithoutTimezone, + /// version 2 of checkpointing + V2Checkpoint, + /// Append Only Tables + AppendOnly, + /// Table invariants + Invariants, + /// Check constraints on columns + CheckConstraints, + /// CDF on a table + ChangeDataFeed, + /// Columns with generated values + GeneratedColumns, + /// ID Columns + IdentityColumns, + /// Row tracking on tables + RowTracking, + /// domain specific metadata + DomainMetadata, + /// Iceberg compatibility support + IcebergCompatV1, +} + +impl From for KernelTableFeatures { + fn from(value: TableFeatures) -> Self { + match value { + TableFeatures::ColumnMapping => KernelTableFeatures::ColumnMapping, + TableFeatures::DeletionVectors => KernelTableFeatures::DeletionVectors, + TableFeatures::TimestampWithoutTimezone => { + KernelTableFeatures::TimestampWithoutTimezone + } + TableFeatures::V2Checkpoint => KernelTableFeatures::V2Checkpoint, + TableFeatures::AppendOnly => KernelTableFeatures::AppendOnly, + TableFeatures::Invariants => KernelTableFeatures::Invariants, + TableFeatures::CheckConstraints => KernelTableFeatures::CheckConstraints, + TableFeatures::ChangeDataFeed => KernelTableFeatures::ChangeDataFeed, + TableFeatures::GeneratedColumns => KernelTableFeatures::GeneratedColumns, + TableFeatures::IdentityColumns => KernelTableFeatures::IdentityColumns, + TableFeatures::RowTracking => KernelTableFeatures::RowTracking, + TableFeatures::DomainMetadata => KernelTableFeatures::DomainMetadata, + TableFeatures::IcebergCompatV1 => KernelTableFeatures::IcebergCompatV1, + } + } +} diff --git a/python/src/lib.rs b/python/src/lib.rs index 3947216285..d615b2caee 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -1,4 +1,5 @@ mod error; +mod features; mod filesystem; mod schema; mod utils; @@ -27,10 +28,12 @@ use deltalake::datafusion::prelude::SessionContext; use deltalake::delta_datafusion::cdf::FileAction; use deltalake::delta_datafusion::DeltaDataChecker; use deltalake::errors::DeltaTableError; +use deltalake::kernel::TableFeatures as KernelTableFeatures; use deltalake::kernel::{ scalars::ScalarExt, Action, Add, Invariant, LogicalFile, Remove, StructType, }; use deltalake::operations::add_column::AddColumnBuilder; +use deltalake::operations::add_feature::AddTableFeatureBuilder; use deltalake::operations::collect_sendable_stream; use deltalake::operations::constraints::ConstraintBuilder; use deltalake::operations::convert_to_delta::{ConvertToDeltaBuilder, PartitionStrategy}; @@ -63,6 +66,7 @@ use serde_json::{Map, Value}; use crate::error::DeltaProtocolError; use crate::error::PythonError; +use crate::features::TableFeatures; use crate::filesystem::FsConfig; use crate::schema::{schema_to_pyobject, Field}; use crate::utils::rt; @@ -567,7 +571,34 @@ impl RawDeltaTable { { cmd = cmd.with_commit_properties(commit_properties); } + rt().block_on(cmd.into_future()).map_err(PythonError::from) + })?; + self._table.state = table.state; + Ok(()) + } + + #[pyo3(signature = (feature, allow_protocol_versions_increase, custom_metadata=None, post_commithook_properties=None))] + pub fn add_feature( + &mut self, + py: Python, + feature: TableFeatures, + allow_protocol_versions_increase: bool, + custom_metadata: Option>, + post_commithook_properties: Option>>, + ) -> PyResult<()> { + let table = py.allow_threads(|| { + let mut cmd = AddTableFeatureBuilder::new( + self._table.log_store(), + self._table.snapshot().map_err(PythonError::from)?.clone(), + ) + .with_feature(Into::::into(feature)) + .with_allow_protocol_versions_increase(allow_protocol_versions_increase); + if let Some(commit_properties) = + maybe_create_commit_properties(custom_metadata, post_commithook_properties) + { + cmd = cmd.with_commit_properties(commit_properties); + } rt().block_on(cmd.into_future()).map_err(PythonError::from) })?; self._table.state = table.state; @@ -2025,5 +2056,6 @@ fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; Ok(()) }