diff --git a/crates/iceberg-catalog/src/catalog/tables.rs b/crates/iceberg-catalog/src/catalog/tables.rs index ea174c56..28ff34be 100644 --- a/crates/iceberg-catalog/src/catalog/tables.rs +++ b/crates/iceberg-catalog/src/catalog/tables.rs @@ -370,6 +370,8 @@ impl // serialize body before moving it let body = maybe_body_to_json(&request); + let updates = copy_updates(&updates); + let transaction_request = CommitTransactionRequest { table_changes: vec![request], }; @@ -398,7 +400,12 @@ impl .r#type("NoResultFromCommitTableTransaction".to_string()) .build(), )?; - + state + .v1_state + .table_check + .check(&updates, table_id, &result.previous_table_metadata) + .await? + .into_result()?; // We don't commit the transaction yet, first we need to write the metadata file. let storage_secret = if let Some(secret_id) = &result.storage_config.storage_secret_ident { Some( @@ -493,6 +500,12 @@ impl C::drop_table(&warehouse_id, &table_id, transaction.transaction()).await?; // ToDo: Delete metadata files + state + .v1_state + .table_check + .check_drop(table_id) + .await? + .into_result()?; transaction.commit().await?; @@ -756,11 +769,13 @@ impl // serialize request body before moving it here let mut events = vec![]; let mut event_table_ids: Vec<(TableIdent, TableIdentUuid)> = vec![]; + let mut updates = vec![]; for req in &request.table_changes { if let Some(id) = &req.identifier { if let Some(uuid) = table_ids.get(id) { events.push(maybe_body_to_json(&request)); event_table_ids.push((id.clone(), *uuid)); + updates.push(copy_updates(&req.updates)); } } } @@ -819,6 +834,19 @@ impl )); } + for ((update, (_, table_uuid)), response) in updates + .into_iter() + .zip(&event_table_ids) + .zip(&commit_response) + { + state + .v1_state + .table_check + .check(&update, *table_uuid, &response.previous_table_metadata) + .await? + .into_result()?; + } + futures::future::try_join_all(write_futures).await?; transaction.commit().await?; @@ -929,3 +957,65 @@ fn maybe_body_to_json(request: impl Serialize) -> serde_json::Value { serde_json::Value::Null } } + +// TableUpdate is not clone but all containing fields are so we use this function to clone.. +fn copy_updates(updates: &[TableUpdate]) -> Vec { + updates + .iter() + .map(|u| match u { + TableUpdate::AddSnapshot { snapshot } => TableUpdate::AddSnapshot { + snapshot: snapshot.clone(), + }, + TableUpdate::AssignUuid { uuid } => TableUpdate::AssignUuid { uuid: uuid.clone() }, + TableUpdate::AddSortOrder { sort_order } => TableUpdate::AddSortOrder { + sort_order: sort_order.clone(), + }, + TableUpdate::AddSpec { spec } => TableUpdate::AddSpec { spec: spec.clone() }, + TableUpdate::AddSchema { + schema, + last_column_id, + } => TableUpdate::AddSchema { + schema: schema.clone(), + last_column_id: last_column_id.clone(), + }, + TableUpdate::UpgradeFormatVersion { format_version } => { + TableUpdate::UpgradeFormatVersion { + format_version: format_version.clone(), + } + } + TableUpdate::SetCurrentSchema { schema_id } => TableUpdate::SetCurrentSchema { + schema_id: schema_id.clone(), + }, + TableUpdate::SetDefaultSortOrder { sort_order_id } => { + TableUpdate::SetDefaultSortOrder { + sort_order_id: *sort_order_id, + } + } + TableUpdate::RemoveSnapshotRef { ref_name } => TableUpdate::RemoveSnapshotRef { + ref_name: ref_name.clone(), + }, + TableUpdate::SetDefaultSpec { spec_id } => { + TableUpdate::SetDefaultSpec { spec_id: *spec_id } + } + TableUpdate::SetSnapshotRef { + ref_name, + reference, + } => TableUpdate::SetSnapshotRef { + ref_name: ref_name.clone(), + reference: reference.clone(), + }, + TableUpdate::RemoveSnapshots { snapshot_ids } => TableUpdate::RemoveSnapshots { + snapshot_ids: snapshot_ids.clone(), + }, + TableUpdate::SetLocation { location } => TableUpdate::SetLocation { + location: location.clone(), + }, + TableUpdate::SetProperties { updates } => TableUpdate::SetProperties { + updates: updates.clone(), + }, + TableUpdate::RemoveProperties { removals } => TableUpdate::RemoveProperties { + removals: removals.clone(), + }, + }) + .collect() +} diff --git a/crates/iceberg-catalog/src/implementations/postgres/table.rs b/crates/iceberg-catalog/src/implementations/postgres/table.rs index 78ea4ddb..bf78453f 100644 --- a/crates/iceberg-catalog/src/implementations/postgres/table.rs +++ b/crates/iceberg-catalog/src/implementations/postgres/table.rs @@ -688,6 +688,7 @@ fn apply_commits(commits: Vec) -> Result) -> Result { pub catalog: C::State, pub secrets: S::State, pub publisher: CloudEventsPublisher, + pub table_check: TableChangeCheckers, } impl ServiceState for State {} diff --git a/crates/iceberg-catalog/src/service/router.rs b/crates/iceberg-catalog/src/service/router.rs index d0c1336f..f035160f 100644 --- a/crates/iceberg-catalog/src/service/router.rs +++ b/crates/iceberg-catalog/src/service/router.rs @@ -1,8 +1,10 @@ use crate::service::event_publisher::CloudEventsPublisher; use crate::tracing::{MakeRequestUuid7, RestMakeSpan}; +use std::sync::Arc; use crate::api::management::ApiServer; use crate::api::{iceberg::v1::new_v1_full_router, shutdown_signal, ApiContext}; +use crate::service::table_change_check::TableChangeCheckers; use axum::{routing::get, Router}; use tower::ServiceBuilder; use tower_http::{ @@ -66,6 +68,9 @@ pub fn new_full_router< catalog: catalog_state, secrets: secrets_state, publisher, + table_check: TableChangeCheckers::new(vec![Arc::new( + crate::service::table_change_check::DenyAllChecker, + )]), }, }) } diff --git a/crates/iceberg-catalog/src/service/table_change_check.rs b/crates/iceberg-catalog/src/service/table_change_check.rs new file mode 100644 index 00000000..b1f26b48 --- /dev/null +++ b/crates/iceberg-catalog/src/service/table_change_check.rs @@ -0,0 +1,252 @@ +use crate::service::TableIdentUuid; +use async_trait::async_trait; +use iceberg::spec::{Schema, TableMetadata}; +use iceberg::TableUpdate; +use iceberg_ext::catalog::rest::ErrorModel; +use std::collections::HashMap; +use std::fmt::Debug; +use std::sync::Arc; +use tokio::sync::Mutex; + +#[derive(Debug, Clone)] +pub struct TableChangeCheckers { + checkers: Vec>, +} + +impl TableChangeCheckers { + pub fn new(checkers: Vec>) -> Self { + Self { checkers } + } + + pub async fn check( + &self, + table_updates: &[TableUpdate], + table_ident_uuid: TableIdentUuid, + current_metadata: &TableMetadata, + ) -> Result { + for checker in &self.checkers { + match checker + .check(table_updates, table_ident_uuid, current_metadata) + .await + { + Ok(TableCheckResult::Clear {}) => {} + Ok(block_result @ TableCheckResult::Block { error_model: _ }) => { + tracing::info!( + "Checker {} blocked change on table '{}'", + checker.name(), + table_ident_uuid + ); + return Ok(block_result); + } + Err(error) => { + tracing::warn!("Checker {} failed", checker.name()); + return Err(error); + } + } + } + + Ok(TableCheckResult::Clear {}) + } + pub async fn check_drop( + &self, + table_ident_uuid: TableIdentUuid, + ) -> Result { + for checker in &self.checkers { + match checker.check_drop(table_ident_uuid).await { + Ok(TableCheckResult::Clear {}) => {} + Ok(block_result @ TableCheckResult::Block { error_model: _ }) => { + tracing::info!( + "Checker {} blocked drop on table '{}'", + checker.name(), + table_ident_uuid + ); + return Ok(block_result); + } + Err(error) => { + tracing::warn!("Checker {} failed", checker.name()); + return Err(error); + } + } + } + Ok(TableCheckResult::Clear {}) + } +} + +#[async_trait] +pub trait TableChangeCheck: Debug { + fn name(&self) -> &'static str; + async fn check( + &self, + table_updates: &[TableUpdate], + table_ident_uuid: TableIdentUuid, + current_metadata: &TableMetadata, + ) -> Result; + + async fn check_drop( + &self, + table_ident_uuid: TableIdentUuid, + ) -> Result; +} + +#[derive(Debug)] +pub enum TableCheckResult { + Clear {}, + Block { error_model: ErrorModel }, +} + +impl TableCheckResult { + pub fn into_result(self) -> Result<(), ErrorModel> { + match self { + TableCheckResult::Clear {} => Ok(()), + TableCheckResult::Block { error_model } => Err(error_model), + } + } +} + +#[derive(Debug)] +pub struct InMemoryChecker { + pub contracts: Arc>>, +} + +#[async_trait] +impl TableChangeCheck for InMemoryChecker { + fn name(&self) -> &'static str { + "InMemoryChecker" + } + + async fn check( + &self, + table_updates: &[TableUpdate], + table_ident_uuid: TableIdentUuid, + current_metadata: &TableMetadata, + ) -> Result { + let contracts = self.contracts.lock().await; + if let Some(contract) = contracts.get(&table_ident_uuid) { + for table in table_updates { + match table { + TableUpdate::AddSnapshot { .. } + | TableUpdate::AssignUuid { .. } + | TableUpdate::AddSortOrder { .. } + | TableUpdate::AddSpec { .. } + | TableUpdate::AddSchema { .. } + | TableUpdate::UpgradeFormatVersion { .. } => { + return Ok(TableCheckResult::Clear {}) + } + + // these are the important ones to check + TableUpdate::SetCurrentSchema { schema_id } => { + let new_schema = current_metadata.schema_by_id(*schema_id).unwrap(); + for field in new_schema.as_struct().fields().iter() { + if let Some(contracted_field) = contract.field_by_name(&field.name) { + if contracted_field.field_type != field.field_type { + return Ok(TableCheckResult::Block { + error_model: ErrorModel::builder() + .code(409) + .message( + "Contract violation: promised field type changed" + .to_string(), + ) + .r#type("ContractViolation".to_string()) + .build() + .into(), + }); + } + } + } + } + TableUpdate::SetDefaultSortOrder { .. } => {} + TableUpdate::RemoveSnapshotRef { .. } => {} + TableUpdate::RemoveSnapshots { .. } => {} + TableUpdate::SetSnapshotRef { .. } => {} + TableUpdate::SetDefaultSpec { .. } => {} + TableUpdate::SetLocation { .. } => {} + TableUpdate::SetProperties { .. } => {} + TableUpdate::RemoveProperties { .. } => {} + } + } + } + Ok(TableCheckResult::Clear {}) + } + + async fn check_drop( + &self, + table_ident_uuid: TableIdentUuid, + ) -> Result { + let contracts = self.contracts.lock().await; + if contracts.contains_key(&table_ident_uuid) { + Ok(TableCheckResult::Block { + error_model: ErrorModel::builder() + .code(409) + .message("Cannot delete table with active contract.".to_string()) + .r#type("ContractViolation".to_string()) + .build() + .into(), + }) + } else { + Ok(TableCheckResult::Clear {}) + } + } +} + +#[derive(Debug)] +pub struct AllowAllChecker; +#[async_trait] +impl TableChangeCheck for AllowAllChecker { + fn name(&self) -> &'static str { + "AllowAllChecker" + } + async fn check( + &self, + _table_updates: &[TableUpdate], + _table_ident_uuid: TableIdentUuid, + _current_metadata: &TableMetadata, + ) -> Result { + Ok(TableCheckResult::Clear {}) + } + + async fn check_drop( + &self, + _table_ident_uuid: TableIdentUuid, + ) -> Result { + Ok(TableCheckResult::Clear {}) + } +} + +#[derive(Debug)] +pub struct DenyAllChecker; + +#[async_trait] +impl TableChangeCheck for DenyAllChecker { + fn name(&self) -> &'static str { + "DenyAllChecker" + } + async fn check( + &self, + _table_updates: &[TableUpdate], + _table_ident_uuid: TableIdentUuid, + _current_metadata: &TableMetadata, + ) -> Result { + Ok(TableCheckResult::Block { + error_model: ErrorModel::builder() + .code(409) + .message("Denied") + .r#type("ContractViolation".to_string()) + .build() + .into(), + }) + } + + async fn check_drop( + &self, + _table_ident_uuid: TableIdentUuid, + ) -> Result { + Ok(TableCheckResult::Block { + error_model: ErrorModel::builder() + .code(409) + .r#type("ContractViolation".to_string()) + .message("Denied") + .build() + .into(), + }) + } +} diff --git a/examples/docker-compose.yaml b/examples/docker-compose.yaml index 1cf1347d..95f2224d 100644 --- a/examples/docker-compose.yaml +++ b/examples/docker-compose.yaml @@ -13,7 +13,7 @@ services: - "8888:8888" server: - image: hansetag/iceberg-catalog:0.0.1 + image: iceberg-catalog-local:latest environment: - ICEBERG_REST__BASE_URI=http://server:8080/catalog/ - ICEBERG_REST__PG_ENCRYPTION_KEY=This-is-NOT-Secure! diff --git a/examples/notebooks/Multiple Warehouses.ipynb b/examples/notebooks/Multiple Warehouses.ipynb index b2c2ab2a..0c158e0f 100644 --- a/examples/notebooks/Multiple Warehouses.ipynb +++ b/examples/notebooks/Multiple Warehouses.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "metadata": {}, "outputs": [], "source": [ @@ -28,9 +28,20 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/plain": [ + "200" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "response = requests.post(f\"{MANAGEMENT_URL}/v1/warehouse\",\n", " json={\n", @@ -83,7 +94,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 3, "metadata": {}, "outputs": [], "source": [ @@ -103,7 +114,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 4, "metadata": {}, "outputs": [], "source": [ @@ -116,9 +127,72 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 6, "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/plain": [ + "DataFrame[]" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "spark.sql(\"CREATE NAMESPACE test_create_namespace_spark\")" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
namespace
0spark_demo
\n", + "
" + ], + "text/plain": [ + " namespace\n", + "0 spark_demo" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "spark.sql(\"CREATE NAMESPACE IF NOT EXISTS spark_demo\")\n", "spark.sql(\"SHOW NAMESPACES\").toPandas()" diff --git a/examples/notebooks/Spark Configuration.ipynb b/examples/notebooks/Spark Configuration.ipynb index 6f4baa1c..a5a8f667 100644 --- a/examples/notebooks/Spark Configuration.ipynb +++ b/examples/notebooks/Spark Configuration.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "metadata": {}, "outputs": [], "source": [ @@ -28,7 +28,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "metadata": {}, "outputs": [], "source": [ @@ -46,7 +46,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 3, "metadata": {}, "outputs": [], "source": [ @@ -66,9 +66,52 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 4, "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
namespace
0spark_demo
\n", + "
" + ], + "text/plain": [ + " namespace\n", + "0 spark_demo" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "spark.sql(\"CREATE NAMESPACE IF NOT EXISTS spark_demo\")\n", "spark.sql(\"SHOW NAMESPACES\").toPandas()" @@ -76,7 +119,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 5, "metadata": {}, "outputs": [], "source": [ @@ -86,7 +129,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 6, "metadata": {}, "outputs": [], "source": [