Skip to content

Commit

Permalink
feat: add feature operation
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Sep 14, 2024
1 parent 73107a7 commit 6e78f47
Show file tree
Hide file tree
Showing 11 changed files with 585 additions and 7 deletions.
117 changes: 117 additions & 0 deletions crates/core/src/kernel/models/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,97 @@ impl Protocol {
}
}

/// High level table features
///
/// With serde, every variant is (de)serialized under its camelCase name
/// (for example `columnMapping` or `v2Checkpoint`). The one exception is
/// `TimestampWithoutTimezone`, which is explicitly renamed to `timestampNtz`.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
#[serde(rename_all = "camelCase")]
pub enum TableFeatures {
/// Mapping of one column to another
ColumnMapping,
/// Deletion vectors for merge, update, delete
DeletionVectors,
/// Timestamps without timezone support
// Explicit rename: the camelCase form of the variant name would not be `timestampNtz`.
#[serde(rename = "timestampNtz")]
TimestampWithoutTimezone,
/// Version 2 of checkpointing
V2Checkpoint,
/// Append-only tables
AppendOnly,
/// Table invariants
Invariants,
/// Check constraints on columns
CheckConstraints,
/// Change data feed (CDF) on a table
ChangeDataFeed,
/// Columns with generated values
GeneratedColumns,
/// Identity columns
IdentityColumns,
/// Row tracking on tables
RowTracking,
/// Domain specific metadata
DomainMetadata,
/// Iceberg compatibility support (V1)
IcebergCompatV1,
}

impl FromStr for TableFeatures {
type Err = ();

fn from_str(value: &str) -> Result<Self, Self::Err> {
match value {
"columnMapping" => Ok(TableFeatures::ColumnMapping),
"deletionVectors" => Ok(TableFeatures::DeletionVectors),
"timestampNtz" => Ok(TableFeatures::TimestampWithoutTimezone),
"v2Checkpoint" => Ok(TableFeatures::V2Checkpoint),
"appendOnly" => Ok(TableFeatures::AppendOnly),
"invariants" => Ok(TableFeatures::Invariants),
"checkConstraints" => Ok(TableFeatures::CheckConstraints),
"changeDataFeed" => Ok(TableFeatures::ChangeDataFeed),
"generatedColumns" => Ok(TableFeatures::GeneratedColumns),
"identityColumns" => Ok(TableFeatures::IdentityColumns),
"rowTracking" => Ok(TableFeatures::RowTracking),
"domainMetadata" => Ok(TableFeatures::DomainMetadata),
"icebergCompatV1" => Ok(TableFeatures::IcebergCompatV1),
_ => Err(()),
}
}
}

impl AsRef<str> for TableFeatures {
fn as_ref(&self) -> &str {
match self {
TableFeatures::ColumnMapping => "columnMapping",
TableFeatures::DeletionVectors => "deletionVectors",
TableFeatures::TimestampWithoutTimezone => "timestampNtz",
TableFeatures::V2Checkpoint => "v2Checkpoint",
TableFeatures::AppendOnly => "appendOnly",
TableFeatures::Invariants => "invariants",
TableFeatures::CheckConstraints => "checkConstraints",
TableFeatures::ChangeDataFeed => "changeDataFeed",
TableFeatures::GeneratedColumns => "generatedColumns",
TableFeatures::IdentityColumns => "identityColumns",
TableFeatures::RowTracking => "rowTracking",
TableFeatures::DomainMetadata => "domainMetadata",
TableFeatures::IcebergCompatV1 => "icebergCompatV1",
}
}
}

impl fmt::Display for TableFeatures {
    /// Write the feature's camelCase name, exactly as returned by `as_ref`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Written verbatim: width/fill flags on the outer formatter are not applied.
        f.write_str(self.as_ref())
    }
}

impl TableFeatures {
    /// Convert this table feature into its reader and/or writer counterpart.
    ///
    /// Returns `(reader, writer)`. Each slot is `Some` when the matching
    /// `TryFrom<&TableFeatures>` conversion succeeds and `None` when it fails;
    /// the conversion error itself is discarded.
    pub fn to_reader_writer_features(&self) -> (Option<ReaderFeatures>, Option<WriterFeatures>) {
        (
            ReaderFeatures::try_from(self).ok(),
            WriterFeatures::try_from(self).ok(),
        )
    }
}

/// Features that table readers can support; also lets users know
/// what is supported
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
Expand Down Expand Up @@ -449,6 +540,19 @@ impl fmt::Display for ReaderFeatures {
}
}

impl TryFrom<&TableFeatures> for ReaderFeatures {
    type Error = String;

    /// Look up the reader feature that carries the same name as `value`.
    ///
    /// # Errors
    ///
    /// Returns a message naming the table feature when the name only maps to
    /// `ReaderFeatures::Other`, i.e. it is not a dedicated reader feature.
    fn try_from(value: &TableFeatures) -> Result<Self, Self::Error> {
        let candidate = ReaderFeatures::from(value.as_ref());
        if let ReaderFeatures::Other(_) = candidate {
            return Err(format!("Table feature {} is not a reader feature", value));
        }
        Ok(candidate)
    }
}

/// Features that table writers can support; also lets users know
/// what is supported
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
Expand Down Expand Up @@ -540,6 +644,19 @@ impl fmt::Display for WriterFeatures {
}
}

impl TryFrom<&TableFeatures> for WriterFeatures {
    type Error = String;

    /// Look up the writer feature that carries the same name as `value`.
    ///
    /// # Errors
    ///
    /// Returns a message naming the table feature when the name only maps to
    /// `WriterFeatures::Other`, i.e. it is not a dedicated writer feature.
    fn try_from(value: &TableFeatures) -> Result<Self, Self::Error> {
        let candidate = WriterFeatures::from(value.as_ref());
        if let WriterFeatures::Other(_) = candidate {
            return Err(format!("Table feature {} is not a writer feature", value));
        }
        Ok(candidate)
    }
}

impl From<&parquet::record::Field> for WriterFeatures {
fn from(value: &parquet::record::Field) -> Self {
match value {
Expand Down
196 changes: 196 additions & 0 deletions crates/core/src/operations/add_feature.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
//! Enable table features

use futures::future::BoxFuture;
use itertools::Itertools;

use super::transaction::{CommitBuilder, CommitProperties};
use crate::kernel::{ReaderFeatures, TableFeatures, WriterFeatures};
use crate::logstore::LogStoreRef;
use crate::protocol::DeltaOperation;
use crate::table::state::DeltaTableState;
use crate::DeltaTable;
use crate::{DeltaResult, DeltaTableError};

/// Enable table features for a table
///
/// Builder for the add-feature operation: collect one or more
/// [`TableFeatures`], then await the builder to commit the updated protocol.
pub struct AddTableFeatureBuilder {
/// A snapshot of the table's state
snapshot: DeltaTableState,
/// Table features to enable (a list, despite the singular field name)
name: Vec<TableFeatures>,
/// Allow protocol versions to be increased by setting features
allow_protocol_versions_increase: bool,
/// Log store handed to the commit when the operation runs
log_store: LogStoreRef,
/// Additional information to add to the commit
commit_properties: CommitProperties,
}

// Empty impl: nothing from `Operation` is overridden here, so the builder
// relies entirely on whatever the trait provides by default.
impl super::Operation<()> for AddTableFeatureBuilder {}

impl AddTableFeatureBuilder {
    /// Create a new builder
    ///
    /// Starts with no features selected, protocol version increases
    /// disallowed, and default commit properties.
    pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self {
        Self {
            name: vec![],
            allow_protocol_versions_increase: false,
            snapshot,
            log_store,
            commit_properties: CommitProperties::default(),
        }
    }

    /// Specify a single feature to be added
    ///
    /// May be called repeatedly; each call appends to the list of features.
    pub fn with_feature<S: Into<TableFeatures>>(mut self, name: S) -> Self {
        self.name.push(name.into());
        self
    }

    /// Specify several features to be added
    ///
    /// The features are appended, in order, to any that were already set.
    pub fn with_features<S: Into<TableFeatures>>(mut self, name: Vec<S>) -> Self {
        // `extend` consumes the iterator directly; no intermediate `Vec` is needed.
        self.name.extend(name.into_iter().map(Into::into));
        self
    }

    /// Specify if you want to allow protocol version to be increased
    pub fn with_allow_protocol_versions_increase(mut self, allow: bool) -> Self {
        self.allow_protocol_versions_increase = allow;
        self
    }

    /// Additional metadata to be added to commit info
    ///
    /// Replaces any commit properties set earlier on this builder.
    pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self {
        self.commit_properties = commit_properties;
        self
    }
}

impl std::future::IntoFuture for AddTableFeatureBuilder {
type Output = DeltaResult<DeltaTable>;

type IntoFuture = BoxFuture<'static, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
let this = self;

Box::pin(async move {
let name = if this.name.is_empty() {
return Err(DeltaTableError::Generic("No features provided".to_string()));
} else {
this.name
};
let (reader_features, writer_features): (
Vec<Option<ReaderFeatures>>,
Vec<Option<WriterFeatures>>,
) = name.iter().map(|v| v.to_reader_writer_features()).unzip();
let reader_features = reader_features.into_iter().flatten().collect_vec();
let writer_features = writer_features.into_iter().flatten().collect_vec();

let mut protocol = this.snapshot.protocol().clone();

if !this.allow_protocol_versions_increase {
if !reader_features.is_empty()
&& !writer_features.is_empty()
&& !(protocol.min_reader_version == 3 && protocol.min_writer_version == 7)
{
return Err(DeltaTableError::Generic("Table feature enables reader and writer feature, but reader is not v3, and writer not v7. Set allow_protocol_versions_increase or increase versions explicitly through set_tbl_properties".to_string()));
} else if !reader_features.is_empty() && protocol.min_reader_version < 3 {
return Err(DeltaTableError::Generic("Table feature enables reader feature, but min_reader is not v3. Set allow_protocol_versions_increase or increase version explicitly through set_tbl_properties".to_string()));
} else if !writer_features.is_empty() && protocol.min_writer_version < 7 {
return Err(DeltaTableError::Generic("Table feature enables writer feature, but min_writer is not v7. Set allow_protocol_versions_increase or increase version explicitly through set_tbl_properties".to_string()));
}
}

protocol = protocol.with_reader_features(reader_features);
protocol = protocol.with_writer_features(writer_features);

let operation = DeltaOperation::AddFeature { name };

let actions = vec![protocol.into()];

let commit = CommitBuilder::from(this.commit_properties)
.with_actions(actions)
.build(Some(&this.snapshot), this.log_store.clone(), operation)
.await?;

Ok(DeltaTable::new_with_state(
this.log_store,
commit.snapshot(),
))
})
}
}

#[cfg(feature = "datafusion")]
#[cfg(test)]
mod tests {
    use delta_kernel::DeltaResult;

    use crate::{
        kernel::TableFeatures,
        writer::test_utils::{create_bare_table, get_record_batch},
        DeltaOps,
    };

    /// Adds two features in separate commits (with version increases allowed)
    /// and checks that each shows up in the resulting protocol.
    #[tokio::test]
    async fn add_feature() -> DeltaResult<()> {
        let written = DeltaOps(create_bare_table())
            .write(vec![get_record_batch(None, false)])
            .await
            .unwrap();

        let with_cdf = DeltaOps(written)
            .add_feature()
            .with_feature(TableFeatures::ChangeDataFeed)
            .with_allow_protocol_versions_increase(true)
            .await
            .unwrap();

        let cdf_protocol = with_cdf.protocol().cloned().unwrap();
        let cdf_writer_features = cdf_protocol.writer_features.unwrap_or_default();
        assert!(cdf_writer_features.contains(&crate::kernel::WriterFeatures::ChangeDataFeed));

        let with_dv = DeltaOps(with_cdf)
            .add_feature()
            .with_feature(TableFeatures::DeletionVectors)
            .with_allow_protocol_versions_increase(true)
            .await
            .unwrap();

        // Deletion vectors must be listed for both writers and readers.
        let dv_protocol = with_dv.protocol().cloned().unwrap();
        let dv_writer_features = dv_protocol.writer_features.unwrap_or_default();
        let dv_reader_features = dv_protocol.reader_features.unwrap_or_default();
        assert!(dv_writer_features.contains(&crate::kernel::WriterFeatures::DeletionVectors));
        assert!(dv_reader_features.contains(&crate::kernel::ReaderFeatures::DeletionVectors));

        // One write plus two add-feature commits.
        assert_eq!(with_dv.version(), 2);
        Ok(())
    }

    /// Without opting in to a protocol version increase, adding a feature to
    /// a freshly written bare table must be rejected.
    #[tokio::test]
    async fn add_feature_disallowed_increase() -> DeltaResult<()> {
        let written = DeltaOps(create_bare_table())
            .write(vec![get_record_batch(None, false)])
            .await
            .unwrap();

        let outcome = DeltaOps(written)
            .add_feature()
            .with_feature(TableFeatures::ChangeDataFeed)
            .await;

        assert!(outcome.is_err());
        Ok(())
    }
}
8 changes: 8 additions & 0 deletions crates/core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
//! if the operation returns data as well.
use std::collections::HashMap;

use add_feature::AddTableFeatureBuilder;
#[cfg(feature = "datafusion")]
use arrow_array::RecordBatch;
#[cfg(feature = "datafusion")]
Expand All @@ -31,6 +32,7 @@ use crate::table::builder::DeltaTableBuilder;
use crate::DeltaTable;

pub mod add_column;
pub mod add_feature;
pub mod cast;
pub mod convert_to_delta;
pub mod create;
Expand Down Expand Up @@ -220,6 +222,12 @@ impl DeltaOps {
ConstraintBuilder::new(self.0.log_store, self.0.state.unwrap())
}

/// Enable a table feature for a table
///
/// Hands the table's log store and state to a new [`AddTableFeatureBuilder`].
///
/// # Panics
///
/// Panics when the table's `state` cannot be unwrapped (presumably because
/// the table has not been loaded yet; confirm against `DeltaTable`).
#[must_use]
pub fn add_feature(self) -> AddTableFeatureBuilder {
    let table = self.0;
    AddTableFeatureBuilder::new(table.log_store, table.state.unwrap())
}

/// Drops constraints from a table
#[cfg(feature = "datafusion")]
#[must_use]
Expand Down
Loading

0 comments on commit 6e78f47

Please sign in to comment.