Skip to content

Commit

Permalink
add feature operation
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco authored and rtyler committed Aug 13, 2024
1 parent c446b12 commit eeb903f
Show file tree
Hide file tree
Showing 9 changed files with 409 additions and 6 deletions.
117 changes: 117 additions & 0 deletions crates/core/src/kernel/models/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,97 @@ impl Protocol {
}
}

/// High level table features
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
#[serde(rename_all = "camelCase")]
pub enum TableFeatures {
/// Mapping of one column to another
ColumnMapping,
/// Deletion vectors for merge, update, delete
DeletionVectors,
/// timestamps without timezone support
#[serde(rename = "timestampNtz")]
TimestampWithoutTimezone,
/// version 2 of checkpointing
V2Checkpoint,
/// Append Only Tables
AppendOnly,
/// Table invariants
Invariants,
/// Check constraints on columns
CheckConstraints,
/// CDF on a table
ChangeDataFeed,
/// Columns with generated values
GeneratedColumns,
/// ID Columns
IdentityColumns,
/// Row tracking on tables
RowTracking,
/// domain specific metadata
DomainMetadata,
/// Iceberg compatibility support
IcebergCompatV1,
}

impl FromStr for TableFeatures {
type Err = ();

fn from_str(value: &str) -> Result<Self, Self::Err> {
match value {
"columnMapping" => Ok(TableFeatures::ColumnMapping),
"deletionVectors" => Ok(TableFeatures::DeletionVectors),
"timestampNtz" => Ok(TableFeatures::TimestampWithoutTimezone),
"v2Checkpoint" => Ok(TableFeatures::V2Checkpoint),
"appendOnly" => Ok(TableFeatures::AppendOnly),
"invariants" => Ok(TableFeatures::Invariants),
"checkConstraints" => Ok(TableFeatures::CheckConstraints),
"changeDataFeed" => Ok(TableFeatures::ChangeDataFeed),
"generatedColumns" => Ok(TableFeatures::GeneratedColumns),
"identityColumns" => Ok(TableFeatures::IdentityColumns),
"rowTracking" => Ok(TableFeatures::RowTracking),
"domainMetadata" => Ok(TableFeatures::DomainMetadata),
"icebergCompatV1" => Ok(TableFeatures::IcebergCompatV1),
_ => Err(()),
}
}
}

impl AsRef<str> for TableFeatures {
fn as_ref(&self) -> &str {
match self {
TableFeatures::ColumnMapping => "columnMapping",
TableFeatures::DeletionVectors => "deletionVectors",
TableFeatures::TimestampWithoutTimezone => "timestampNtz",
TableFeatures::V2Checkpoint => "v2Checkpoint",
TableFeatures::AppendOnly => "appendOnly",
TableFeatures::Invariants => "invariants",
TableFeatures::CheckConstraints => "checkConstraints",
TableFeatures::ChangeDataFeed => "changeDataFeed",
TableFeatures::GeneratedColumns => "generatedColumns",
TableFeatures::IdentityColumns => "identityColumns",
TableFeatures::RowTracking => "rowTracking",
TableFeatures::DomainMetadata => "domainMetadata",
TableFeatures::IcebergCompatV1 => "icebergCompatV1",
}
}
}

impl fmt::Display for TableFeatures {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.as_ref())
}
}

impl TableFeatures {
/// Convert table feature to respective reader or/and write feature
pub fn to_reader_writer_features(&self) -> (Option<ReaderFeatures>, Option<WriterFeatures>) {
let reader_feature = ReaderFeatures::try_from(self).ok();
let writer_feature = WriterFeatures::try_from(self).ok();
(reader_feature, writer_feature)
}
}

/// Features table readers can support as well as let users know
/// what is supported
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
Expand Down Expand Up @@ -449,6 +540,19 @@ impl fmt::Display for ReaderFeatures {
}
}

impl TryFrom<&TableFeatures> for ReaderFeatures {
type Error = String;

fn try_from(value: &TableFeatures) -> Result<Self, Self::Error> {
match ReaderFeatures::from(value.as_ref()) {
ReaderFeatures::Other(_) => {
Err(format!("Table feature {} is not a reader feature", value))
}
value => Ok(value),
}
}
}

/// Features table writers can support as well as let users know
/// what is supported
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
Expand Down Expand Up @@ -540,6 +644,19 @@ impl fmt::Display for WriterFeatures {
}
}

impl TryFrom<&TableFeatures> for WriterFeatures {
type Error = String;

fn try_from(value: &TableFeatures) -> Result<Self, Self::Error> {
match WriterFeatures::from(value.as_ref()) {
WriterFeatures::Other(_) => {
Err(format!("Table feature {} is not a writer feature", value))
}
value => Ok(value),
}
}
}

impl From<&parquet::record::Field> for WriterFeatures {
fn from(value: &parquet::record::Field) -> Self {
match value {
Expand Down
114 changes: 114 additions & 0 deletions crates/core/src/operations/add_feature.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
//! Enable table features
use futures::future::BoxFuture;

use super::transaction::{CommitBuilder, CommitProperties};
use crate::kernel::TableFeatures;
use crate::logstore::LogStoreRef;
use crate::protocol::DeltaOperation;
use crate::table::state::DeltaTableState;
use crate::DeltaTable;
use crate::{DeltaResult, DeltaTableError};

/// Enable table features for a table
pub struct AddTableFeatureBuilder {
/// A snapshot of the table's state
snapshot: DeltaTableState,
/// Name of the feature
name: Option<TableFeatures>,
/// Allow protocol versions to be increased by setting features
allow_protocol_versions_increase: bool,
/// Delta object store for handling data files
log_store: LogStoreRef,
/// Additional information to add to the commit
commit_properties: CommitProperties,
}

impl super::Operation<()> for AddTableFeatureBuilder {}

impl AddTableFeatureBuilder {
/// Create a new builder
pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self {
Self {
name: None,
allow_protocol_versions_increase: false,
snapshot,
log_store,
commit_properties: CommitProperties::default(),
}
}

/// Specify the feature to be added
pub fn with_feature<S: Into<TableFeatures>>(mut self, name: S) -> Self {
self.name = Some(name.into());
self
}

/// Specify if you want to allow protocol version to be increased
pub fn with_allow_protocol_versions_increase(mut self, allow: bool) -> Self {
self.allow_protocol_versions_increase = allow;
self
}

/// Additional metadata to be added to commit info
pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self {
self.commit_properties = commit_properties;
self
}
}

impl std::future::IntoFuture for AddTableFeatureBuilder {
type Output = DeltaResult<DeltaTable>;

type IntoFuture = BoxFuture<'static, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
let this = self;

Box::pin(async move {
let name = this
.name
.ok_or(DeltaTableError::Generic("No features provided".to_string()))?;

let (reader_feature, writer_feature) = name.to_reader_writer_features();

let mut protocol = this.snapshot.protocol().clone();

if !this.allow_protocol_versions_increase {
if reader_feature.is_some()
&& writer_feature.is_some()
&& protocol.min_reader_version == 3
&& protocol.min_writer_version == 7
{
return Err(DeltaTableError::Generic("Table feature enables reader and writer feature, but reader is not v3, and writer not v7. Set allow_protocol_versions_increase or increase versions explicitly through set_tbl_properties".to_string()));
} else if reader_feature.is_some() && protocol.min_reader_version < 3 {
return Err(DeltaTableError::Generic("Table feature enables reader feature, but min_reader is not v3. Set allow_protocol_versions_increase or increase version explicitly through set_tbl_properties".to_string()));
} else if writer_feature.is_some() && protocol.min_writer_version < 7 {
return Err(DeltaTableError::Generic("Table feature enables writer feature, but min_writer is not v7. Set allow_protocol_versions_increase or increase version explicitly through set_tbl_properties".to_string()));
}
}

if let Some(reader_feature) = reader_feature {
protocol = protocol.with_reader_features(vec![reader_feature]);
}

if let Some(writer_feature) = writer_feature {
protocol = protocol.with_writer_features(vec![writer_feature]);
}

let operation = DeltaOperation::AddFeature { name };

let actions = vec![protocol.into()];

let commit = CommitBuilder::from(this.commit_properties)
.with_actions(actions)
.build(Some(&this.snapshot), this.log_store.clone(), operation)
.await?;

Ok(DeltaTable::new_with_state(
this.log_store,
commit.snapshot(),
))
})
}
}
8 changes: 8 additions & 0 deletions crates/core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::DeltaTable;
use std::collections::HashMap;

pub mod add_column;
pub mod add_feature;
pub mod cast;
pub mod convert_to_delta;
pub mod create;
Expand All @@ -35,6 +36,7 @@ use self::{
};
#[cfg(feature = "datafusion")]
pub use ::datafusion::physical_plan::common::collect as collect_sendable_stream;
use add_feature::AddTableFeatureBuilder;
#[cfg(feature = "datafusion")]
use arrow::record_batch::RecordBatch;
use optimize::OptimizeBuilder;
Expand Down Expand Up @@ -220,6 +222,12 @@ impl DeltaOps {
ConstraintBuilder::new(self.0.log_store, self.0.state.unwrap())
}

/// Enable a table feature for a table
#[must_use]
pub fn add_feature(self) -> AddTableFeatureBuilder {
AddTableFeatureBuilder::new(self.0.log_store, self.0.state.unwrap())
}

/// Drops constraints from a table
#[cfg(feature = "datafusion")]
#[must_use]
Expand Down
10 changes: 9 additions & 1 deletion crates/core/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::str::FromStr;
use tracing::{debug, error};

use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, CommitInfo, Metadata, Protocol, Remove, StructField};
use crate::kernel::{Add, CommitInfo, Metadata, Protocol, Remove, StructField, TableFeatures};
use crate::logstore::LogStore;
use crate::table::CheckPoint;

Expand Down Expand Up @@ -369,6 +369,12 @@ pub enum DeltaOperation {
expr: String,
},

/// Add table features to a table
AddFeature {
/// Name of the feature
name: TableFeatures,
},

/// Drops constraints from a table
DropConstraint {
/// Constraints name
Expand Down Expand Up @@ -475,6 +481,7 @@ impl DeltaOperation {
DeltaOperation::VacuumEnd { .. } => "VACUUM END",
DeltaOperation::AddConstraint { .. } => "ADD CONSTRAINT",
DeltaOperation::DropConstraint { .. } => "DROP CONSTRAINT",
DeltaOperation::AddFeature { .. } => "ADD FEATURE",
}
}

Expand Down Expand Up @@ -513,6 +520,7 @@ impl DeltaOperation {
Self::Optimize { .. }
| Self::SetTableProperties { .. }
| Self::AddColumn { .. }
| Self::AddFeature { .. }
| Self::VacuumStart { .. }
| Self::VacuumEnd { .. }
| Self::AddConstraint { .. }
Expand Down
1 change: 1 addition & 0 deletions python/deltalake/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from ._internal import TableFeatures as TableFeatures
from ._internal import __version__ as __version__
from ._internal import rust_core_version as rust_core_version
from .data_catalog import DataCatalog as DataCatalog
Expand Down
36 changes: 36 additions & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from enum import Enum
from typing import Any, Dict, List, Literal, Mapping, Optional, Tuple, Union

import pyarrow
Expand All @@ -7,6 +8,34 @@ from deltalake.writer import AddAction

__version__: str

class TableFeatures(Enum):
# Mapping of one column to another
ColumnMapping = "ColumnMapping"
# Deletion vectors for merge, update, delete
DeletionVectors = "DeletionVectors"
# timestamps without timezone support
TimestampWithoutTimezone = "TimestampWithoutTimezone"
# version 2 of checkpointing
V2Checkpoint = "V2Checkpoint"
# Append Only Tables
AppendOnly = "AppendOnly"
# Table invariants
Invariants = "Invariants"
# Check constraints on columns
CheckConstraints = "CheckConstraints"
# CDF on a table
ChangeDataFeed = "ChangeDataFeed"
# Columns with generated values
GeneratedColumns = "GeneratedColumns"
# ID Columns
IdentityColumns = "IdentityColumns"
# Row tracking on tables
RowTracking = "RowTracking"
# domain specific metadata
DomainMetadata = "DomainMetadata"
# Iceberg compatibility support
IcebergCompatV1 = "IcebergCompatV1"

class RawDeltaTableMetaData:
id: int
name: str
Expand Down Expand Up @@ -89,6 +118,13 @@ class RawDeltaTable:
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
) -> None: ...
def add_feature(
self,
feature: TableFeatures,
allow_protocol_versions_increase: bool,
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
) -> None: ...
def add_constraints(
self,
constraints: Dict[str, str],
Expand Down
Loading

0 comments on commit eeb903f

Please sign in to comment.