From 7f0454e47a45db95e87a2f19275613ecbf5e1833 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Tue, 27 Feb 2024 18:41:29 +0100 Subject: [PATCH] feat(rust, python): add `drop constraint` operation (#2071) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # Description Adds `drop constraint` as an operation, and also exposed it to python under the alter namespace 😄 # Related Issue(s) - closes https://github.com/delta-io/delta-rs/issues/2070 --------- Co-authored-by: Robert Pack <42610831+roeap@users.noreply.github.com> Co-authored-by: R. Tyler Croy --- crates/core/src/operations/constraints.rs | 5 +- .../core/src/operations/drop_constraints.rs | 205 ++++++++++++++++++ crates/core/src/operations/mod.rs | 11 +- crates/core/src/protocol/mod.rs | 10 +- python/deltalake/_internal.pyi | 6 + python/deltalake/table.py | 34 +++ python/src/lib.rs | 28 +++ python/tests/test_alter.py | 53 +++++ 8 files changed, 346 insertions(+), 6 deletions(-) create mode 100644 crates/core/src/operations/drop_constraints.rs diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs index 9bf5f2d22c..1a6328c176 100644 --- a/crates/core/src/operations/constraints.rs +++ b/crates/core/src/operations/constraints.rs @@ -190,10 +190,7 @@ impl std::future::IntoFuture for ConstraintBuilder { expr: expr_str.clone(), }; - let app_metadata = match this.app_metadata { - Some(metadata) => metadata, - None => HashMap::default(), - }; + let app_metadata = this.app_metadata.unwrap_or_default(); let commit_info = CommitInfo { timestamp: Some(Utc::now().timestamp_millis()), diff --git a/crates/core/src/operations/drop_constraints.rs b/crates/core/src/operations/drop_constraints.rs new file mode 100644 index 0000000000..417f73d8e8 --- /dev/null +++ b/crates/core/src/operations/drop_constraints.rs @@ -0,0 +1,205 @@ +//! Drop a constraint from a table + +use std::collections::HashMap; + +use chrono::Utc; +use futures::future::BoxFuture; +use serde_json::json; + +use crate::kernel::{Action, CommitInfo, IsolationLevel}; +use crate::logstore::LogStoreRef; +use crate::operations::transaction::commit; +use crate::protocol::DeltaOperation; +use crate::table::state::DeltaTableState; +use crate::DeltaTable; +use crate::{DeltaResult, DeltaTableError}; + +/// Remove constraints from the table +pub struct DropConstraintBuilder { + /// A snapshot of the table's state + snapshot: DeltaTableState, + /// Name of the constraint + name: Option, + /// Raise if constraint doesn't exist + raise_if_not_exists: bool, + /// Delta object store for handling data files + log_store: LogStoreRef, + /// Additional metadata to be added to commit + app_metadata: Option>, +} + +impl DropConstraintBuilder { + /// Create a new builder + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { + Self { + name: None, + raise_if_not_exists: true, + snapshot, + log_store, + app_metadata: None, + } + } + + /// Specify the constraint to be removed + pub fn with_constraint>(mut self, name: S) -> Self { + self.name = Some(name.into()); + self + } + + /// Specify if you want to raise if the constraint does not exist + pub fn with_raise_if_not_exists(mut self, raise: bool) -> Self { + self.raise_if_not_exists = raise; + self + } + + /// Additional metadata to be added to commit info + pub fn with_metadata( + mut self, + metadata: impl IntoIterator, + ) -> Self { + self.app_metadata = Some(HashMap::from_iter(metadata)); + self + } +} + +impl std::future::IntoFuture for DropConstraintBuilder { + type Output = DeltaResult; + + type IntoFuture = BoxFuture<'static, Self::Output>; + + fn into_future(self) -> Self::IntoFuture { + let mut this = self; + + Box::pin(async move { + let name = this + .name + .ok_or(DeltaTableError::Generic("No name provided".to_string()))?; + + let mut metadata = this.snapshot.metadata().clone(); + let configuration_key = format!("delta.constraints.{}", name); + + if metadata.configuration.remove(&configuration_key).is_none() { + if this.raise_if_not_exists { + return Err(DeltaTableError::Generic(format!( + "Constraint with name: {} doesn't exists", + name + ))); + } + return Ok(DeltaTable::new_with_state(this.log_store, this.snapshot)); + } + let operational_parameters = HashMap::from_iter([("name".to_string(), json!(&name))]); + + let operations = DeltaOperation::DropConstraint { name: name.clone() }; + + let app_metadata = this.app_metadata.unwrap_or_default(); + + let commit_info = CommitInfo { + timestamp: Some(Utc::now().timestamp_millis()), + operation: Some(operations.name().to_string()), + operation_parameters: Some(operational_parameters), + read_version: Some(this.snapshot.version()), + isolation_level: Some(IsolationLevel::Serializable), + is_blind_append: Some(false), + info: app_metadata, + ..Default::default() + }; + + let actions = vec![Action::CommitInfo(commit_info), Action::Metadata(metadata)]; + + let version = commit( + this.log_store.as_ref(), + &actions, + operations.clone(), + Some(&this.snapshot), + None, + ) + .await?; + + this.snapshot.merge(actions, &operations, version)?; + Ok(DeltaTable::new_with_state(this.log_store, this.snapshot)) + }) + } +} + +#[cfg(feature = "datafusion")] +#[cfg(test)] +mod tests { + use crate::writer::test_utils::{create_bare_table, get_record_batch}; + use crate::{DeltaOps, DeltaResult, DeltaTable}; + + async fn get_constraint_op_params(table: &mut DeltaTable) -> String { + let commit_info = table.history(None).await.unwrap(); + let last_commit = &commit_info[0]; + + last_commit + .operation_parameters + .as_ref() + .unwrap() + .get("name") + .unwrap() + .as_str() + .unwrap() + .to_owned() + } + + #[tokio::test] + async fn drop_valid_constraint() -> DeltaResult<()> { + let batch = get_record_batch(None, false); + let write = DeltaOps(create_bare_table()) + .write(vec![batch.clone()]) + .await?; + let table = DeltaOps(write); + + let table = table + .add_constraint() + .with_constraint("id", "value < 1000") + .await?; + + let mut table = DeltaOps(table) + .drop_constraints() + .with_constraint("id") + .await?; + + let expected_name = "id"; + assert_eq!(get_constraint_op_params(&mut table).await, expected_name); + assert_eq!(table.metadata().unwrap().configuration.get("id"), None); + Ok(()) + } + + #[tokio::test] + async fn drop_invalid_constraint_not_existing() -> DeltaResult<()> { + let batch = get_record_batch(None, false); + let write = DeltaOps(create_bare_table()) + .write(vec![batch.clone()]) + .await?; + + let table = DeltaOps(write) + .drop_constraints() + .with_constraint("not_existing") + .await; + assert!(table.is_err()); + + Ok(()) + } + + #[tokio::test] + async fn drop_invalid_constraint_ignore() -> DeltaResult<()> { + let batch = get_record_batch(None, false); + let write = DeltaOps(create_bare_table()) + .write(vec![batch.clone()]) + .await?; + + let version = write.version(); + + let table = DeltaOps(write) + .drop_constraints() + .with_constraint("not_existing") + .with_raise_if_not_exists(false) + .await?; + + let version_after = table.version(); + + assert_eq!(version, version_after); + Ok(()) + } +} diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index 2271f36641..666b2dc66a 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -18,6 +18,7 @@ use std::collections::HashMap; pub mod cast; pub mod convert_to_delta; pub mod create; +pub mod drop_constraints; pub mod filesystem_check; pub mod optimize; pub mod restore; @@ -27,7 +28,8 @@ pub mod vacuum; #[cfg(feature = "datafusion")] use self::{ constraints::ConstraintBuilder, datafusion_utils::Expression, delete::DeleteBuilder, - load::LoadBuilder, merge::MergeBuilder, update::UpdateBuilder, write::WriteBuilder, + drop_constraints::DropConstraintBuilder, load::LoadBuilder, merge::MergeBuilder, + update::UpdateBuilder, write::WriteBuilder, }; #[cfg(feature = "datafusion")] pub use ::datafusion::physical_plan::common::collect as collect_sendable_stream; @@ -199,6 +201,13 @@ impl DeltaOps { pub fn add_constraint(self) -> ConstraintBuilder { ConstraintBuilder::new(self.0.log_store, self.0.state.unwrap()) } + + /// Drops constraints from a table + #[cfg(feature = "datafusion")] + #[must_use] + pub fn drop_constraints(self) -> DropConstraintBuilder { + DropConstraintBuilder::new(self.0.log_store, self.0.state.unwrap()) + } } impl From for DeltaOps { diff --git a/crates/core/src/protocol/mod.rs b/crates/core/src/protocol/mod.rs index 3be8a734fa..9f3e5598e5 100644 --- a/crates/core/src/protocol/mod.rs +++ b/crates/core/src/protocol/mod.rs @@ -371,6 +371,12 @@ pub enum DeltaOperation { expr: String, }, + /// Drops constraints from a table + DropConstraint { + /// Constraints name + name: String, + }, + /// Merge data with a source data with the following predicate #[serde(rename_all = "camelCase")] Merge { @@ -458,6 +464,7 @@ impl DeltaOperation { DeltaOperation::VacuumStart { .. } => "VACUUM START", DeltaOperation::VacuumEnd { .. } => "VACUUM END", DeltaOperation::AddConstraint { .. } => "ADD CONSTRAINT", + DeltaOperation::DropConstraint { .. } => "DROP CONSTRAINT", } } @@ -496,7 +503,8 @@ impl DeltaOperation { Self::Optimize { .. } | Self::VacuumStart { .. } | Self::VacuumEnd { .. } - | Self::AddConstraint { .. } => false, + | Self::AddConstraint { .. } + | Self::DropConstraint { .. } => false, Self::Create { .. } | Self::FileSystemCheck {} | Self::StreamingUpdate { .. } diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 6319412c8e..e8994983f1 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -84,6 +84,12 @@ class RawDeltaTable: constraints: Dict[str, str], custom_metadata: Optional[Dict[str, str]], ) -> None: ... + def drop_constraints( + self, + name: str, + raise_if_not_exists: bool, + custom_metadata: Optional[Dict[str, str]], + ) -> None: ... def restore( self, target: Optional[Any], diff --git a/python/deltalake/table.py b/python/deltalake/table.py index b6dd27f49d..d80aa8632f 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -1775,6 +1775,40 @@ def add_constraint( self.table._table.add_constraints(constraints, custom_metadata) + def drop_constraint( + self, + name: str, + raise_if_not_exists: bool = True, + custom_metadata: Optional[Dict[str, str]] = None, + ) -> None: + """ + Drop constraints from a table. Limited to `single constraint` at once. + + Args: + name: constraint name which to drop. + raise_if_not_exists: set if should raise if not exists. + custom_metadata: custom metadata that will be added to the transaction commit. + Example: + ```python + from deltalake import DeltaTable + dt = DeltaTable("test_table_constraints") + dt.metadata().configuration + {'delta.constraints.value_gt_5': 'value > 5'} + ``` + + **Drop the constraint** + ```python + dt.alter.drop_constraint(name = "value_gt_5") + ``` + + **Configuration after dropping** + ```python + dt.metadata().configuration + {} + ``` + """ + self.table._table.drop_constraints(name, raise_if_not_exists, custom_metadata) + class TableOptimizer: """API for various table optimization commands.""" diff --git a/python/src/lib.rs b/python/src/lib.rs index 53efe489f2..1992bae642 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -29,6 +29,7 @@ use deltalake::kernel::{Action, Add, Invariant, LogicalFile, Remove, Scalar, Str use deltalake::operations::constraints::ConstraintBuilder; use deltalake::operations::convert_to_delta::{ConvertToDeltaBuilder, PartitionStrategy}; use deltalake::operations::delete::DeleteBuilder; +use deltalake::operations::drop_constraints::DropConstraintBuilder; use deltalake::operations::filesystem_check::FileSystemCheckBuilder; use deltalake::operations::merge::MergeBuilder; use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType}; @@ -470,6 +471,33 @@ impl RawDeltaTable { Ok(()) } + #[pyo3(signature = (name, raise_if_not_exists, custom_metadata=None))] + pub fn drop_constraints( + &mut self, + name: String, + raise_if_not_exists: bool, + custom_metadata: Option>, + ) -> PyResult<()> { + let mut cmd = DropConstraintBuilder::new( + self._table.log_store(), + self._table.snapshot().map_err(PythonError::from)?.clone(), + ) + .with_constraint(name) + .with_raise_if_not_exists(raise_if_not_exists); + + if let Some(metadata) = custom_metadata { + let json_metadata: Map = + metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); + cmd = cmd.with_metadata(json_metadata); + }; + + let table = rt()? + .block_on(cmd.into_future()) + .map_err(PythonError::from)?; + self._table.state = table.state; + Ok(()) + } + #[allow(clippy::too_many_arguments)] #[pyo3(signature = (source, predicate, diff --git a/python/tests/test_alter.py b/python/tests/test_alter.py index 7069124581..4bc902d330 100644 --- a/python/tests/test_alter.py +++ b/python/tests/test_alter.py @@ -20,6 +20,7 @@ def test_add_constraint(tmp_path: pathlib.Path, sample_table: pa.Table): assert dt.metadata().configuration == { "delta.constraints.check_price": "price >= 0" } + assert dt.protocol().min_writer_version == 3 with pytest.raises(DeltaError): # Invalid constraint @@ -60,3 +61,55 @@ def test_add_constraint_roundtrip_metadata( ) assert dt.history(1)[0]["userName"] == "John Doe" + + +def test_drop_constraint(tmp_path: pathlib.Path, sample_table: pa.Table): + write_deltalake(tmp_path, sample_table) + + dt = DeltaTable(tmp_path) + + dt.alter.add_constraint({"check_price": "price >= 0"}) + assert dt.protocol().min_writer_version == 3 + dt.alter.drop_constraint(name="check_price") + last_action = dt.history(1)[0] + assert last_action["operation"] == "DROP CONSTRAINT" + assert dt.version() == 2 + assert dt.metadata().configuration == {} + assert dt.protocol().min_writer_version == 3 + + +def test_drop_constraint_invalid(tmp_path: pathlib.Path, sample_table: pa.Table): + write_deltalake(tmp_path, sample_table) + + dt = DeltaTable(tmp_path) + + dt.alter.add_constraint({"check_price": "price >= 0"}) + with pytest.raises(DeltaError): + dt.alter.drop_constraint(name="invalid_constraint_name") + + assert dt.metadata().configuration == { + "delta.constraints.check_price": "price >= 0" + } + assert dt.protocol().min_writer_version == 3 + + +def test_drop_constraint_invalid_ignore(tmp_path: pathlib.Path, sample_table: pa.Table): + write_deltalake(tmp_path, sample_table) + + dt = DeltaTable(tmp_path) + + dt.alter.add_constraint({"check_price": "price >= 0"}) + dt.alter.drop_constraint(name="invalid_constraint_name", raise_if_not_exists=False) + + +def test_drop_constraint_roundtrip_metadata( + tmp_path: pathlib.Path, sample_table: pa.Table +): + write_deltalake(tmp_path, sample_table, mode="append", engine="rust") + + dt = DeltaTable(tmp_path) + + dt.alter.add_constraint({"check_price2": "price >= 0"}) + dt.alter.drop_constraint("check_price2", custom_metadata={"userName": "John Doe"}) + + assert dt.history(1)[0]["userName"] == "John Doe"