From 12311bc06a0bc61d3563b2c5bf9ca678cdfeb511 Mon Sep 17 00:00:00 2001 From: Tobias Puetz Date: Tue, 11 Jun 2024 12:57:58 +0200 Subject: [PATCH 1/8] Add table-change-checker --- README.md | 3 +- crates/iceberg-catalog-bin/src/main.rs | 2 + crates/iceberg-catalog/src/catalog/tables.rs | 93 +++++++- .../src/implementations/postgres/table.rs | 2 + crates/iceberg-catalog/src/service/catalog.rs | 1 + crates/iceberg-catalog/src/service/mod.rs | 3 + crates/iceberg-catalog/src/service/router.rs | 3 + .../src/service/table_change_check.rs | 212 ++++++++++++++++++ examples/docker-compose.yaml | 2 +- examples/notebooks/Multiple Warehouses.ipynb | 88 +++++++- examples/notebooks/Spark Configuration.ipynb | 57 ++++- 11 files changed, 449 insertions(+), 17 deletions(-) create mode 100644 crates/iceberg-catalog/src/service/table_change_check.rs diff --git a/README.md b/README.md index a31c0987..273fbe24 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,8 @@ The Iceberg REST Protocol has become the standard for catalogs in open Lakehouse We have started this implementation because we were missing "OPENNESS" such as customizability, support for on-premise deployments and other features that are important for us in existing Iceberg Catalogs. Please find following some of our focuses with this implementation: - **Customizable**: If you already manage Access to Tables in your company somewhere else or need the catalog to stream change events to a different system, you can do so by implementing a few methods. Please find more details in the [Customization Guide](CUSTOMIZING.md). -- **Change Events**: Built-in support to emit change events (CloudEvents), which enables you to react to any change that happen to your tables. Changes can also be prohibited by external systems using our request / response handler. This can be used to prohibit changes to tables that would validate Data Contracts, Quality SLOs etc. 
+- **Change Events**: Built-in support to emit change events (CloudEvents), which enables you to react to any change that happen to your tables. +- **Change Approval**: Changes can also be prohibited by external systems. This can be used to prohibit changes to tables that would validate Data Contracts, Quality SLOs etc. Simply integrate with your own change approval via our `TableChangeCheck` trait. - **Multi-Tenant capable**: A single deployment of our catalog can serve multiple projects - all with a single entrypoint. All Iceberg and Warehouse configurations are completly separated between Warehouses. - **Written in Rust**: Single 18Mb all-in-one binary - no JVM or Python env required. - **Storage Access Management**: Built-in S3-Signing that enables support for self-hosted as well as AWS S3 WITHOUT sharing S3 credentials with clients. diff --git a/crates/iceberg-catalog-bin/src/main.rs b/crates/iceberg-catalog-bin/src/main.rs index cda5eb8c..16c19579 100644 --- a/crates/iceberg-catalog-bin/src/main.rs +++ b/crates/iceberg-catalog-bin/src/main.rs @@ -4,6 +4,7 @@ use clap::{Parser, Subcommand}; use iceberg_catalog::service::event_publisher::{ CloudEventSink, CloudEventsPublisher, NatsPublisher, }; +use iceberg_catalog::service::table_change_check::TableChangeCheckers; use iceberg_catalog::{ implementations::{ postgres::{Catalog, CatalogState, SecretsState, SecretsStore}, @@ -81,6 +82,7 @@ async fn serve(bind_addr: std::net::SocketAddr) -> Result<(), anyhow::Error> { CloudEventsPublisher { sinks: cloud_event_sinks, }, + TableChangeCheckers::new(vec![]), ); service_serve(listener, router).await?; diff --git a/crates/iceberg-catalog/src/catalog/tables.rs b/crates/iceberg-catalog/src/catalog/tables.rs index 88dd02e7..f0d039ad 100644 --- a/crates/iceberg-catalog/src/catalog/tables.rs +++ b/crates/iceberg-catalog/src/catalog/tables.rs @@ -22,6 +22,7 @@ use super::{ }; use crate::service::event_publisher::{CloudEventsPublisher, EventMetadata}; use 
crate::service::storage::StorageCredential; +use crate::service::table_change_check::TableChangeCheck; use crate::service::{ auth::AuthZHandler, secrets::SecretStore, Catalog, CreateTableResult, LoadTableResult as CatalogLoadTableResult, State, Transaction, @@ -370,6 +371,8 @@ impl // serialize body before moving it let body = maybe_body_to_json(&request); + let updates = copy_updates(updates); + let transaction_request = CommitTransactionRequest { table_changes: vec![request], }; @@ -398,7 +401,12 @@ impl .r#type("NoResultFromCommitTableTransaction".to_string()) .build(), )?; - + state + .v1_state + .table_change_checkers + .check(&updates, table_id, &result.previous_table_metadata) + .await? + .into_result()?; // We don't commit the transaction yet, first we need to write the metadata file. let storage_secret = if let Some(secret_id) = &result.storage_config.storage_secret_ident { Some( @@ -493,6 +501,12 @@ impl C::drop_table(&warehouse_id, &table_id, transaction.transaction()).await?; // ToDo: Delete metadata files + state + .v1_state + .table_change_checkers + .check_drop(table_id) + .await? + .into_result()?; transaction.commit().await?; @@ -756,11 +770,13 @@ impl // serialize request body before moving it here let mut events = vec![]; let mut event_table_ids: Vec<(TableIdent, TableIdentUuid)> = vec![]; + let mut updates = vec![]; for commit_table_request in &request.table_changes { if let Some(id) = &commit_table_request.identifier { if let Some(uuid) = table_ids.get(id) { events.push(maybe_body_to_json(commit_table_request)); event_table_ids.push((id.clone(), *uuid)); + updates.push(copy_updates(&commit_table_request.updates)); } } } @@ -819,6 +835,19 @@ impl )); } + for ((update, (_, table_uuid)), response) in updates + .into_iter() + .zip(&event_table_ids) + .zip(&commit_response) + { + state + .v1_state + .table_change_checkers + .check(&update, *table_uuid, &response.previous_table_metadata) + .await? 
+ .into_result()?; + } + futures::future::try_join_all(write_futures).await?; transaction.commit().await?; @@ -929,3 +958,65 @@ fn maybe_body_to_json(request: impl Serialize) -> serde_json::Value { serde_json::Value::Null } } + +// TableUpdate is not clone but all containing fields are so we use this function to clone.. +fn copy_updates(updates: &[TableUpdate]) -> Vec { + updates + .iter() + .map(|u| match u { + TableUpdate::AddSnapshot { snapshot } => TableUpdate::AddSnapshot { + snapshot: snapshot.clone(), + }, + TableUpdate::AssignUuid { uuid } => TableUpdate::AssignUuid { uuid: *uuid }, + TableUpdate::AddSortOrder { sort_order } => TableUpdate::AddSortOrder { + sort_order: sort_order.clone(), + }, + TableUpdate::AddSpec { spec } => TableUpdate::AddSpec { spec: spec.clone() }, + TableUpdate::AddSchema { + schema, + last_column_id, + } => TableUpdate::AddSchema { + schema: schema.clone(), + last_column_id: *last_column_id, + }, + TableUpdate::UpgradeFormatVersion { format_version } => { + TableUpdate::UpgradeFormatVersion { + format_version: *format_version, + } + } + TableUpdate::SetCurrentSchema { schema_id } => TableUpdate::SetCurrentSchema { + schema_id: *schema_id, + }, + TableUpdate::SetDefaultSortOrder { sort_order_id } => { + TableUpdate::SetDefaultSortOrder { + sort_order_id: *sort_order_id, + } + } + TableUpdate::RemoveSnapshotRef { ref_name } => TableUpdate::RemoveSnapshotRef { + ref_name: ref_name.clone(), + }, + TableUpdate::SetDefaultSpec { spec_id } => { + TableUpdate::SetDefaultSpec { spec_id: *spec_id } + } + TableUpdate::SetSnapshotRef { + ref_name, + reference, + } => TableUpdate::SetSnapshotRef { + ref_name: ref_name.clone(), + reference: reference.clone(), + }, + TableUpdate::RemoveSnapshots { snapshot_ids } => TableUpdate::RemoveSnapshots { + snapshot_ids: snapshot_ids.clone(), + }, + TableUpdate::SetLocation { location } => TableUpdate::SetLocation { + location: location.clone(), + }, + TableUpdate::SetProperties { updates } => 
TableUpdate::SetProperties { + updates: updates.clone(), + }, + TableUpdate::RemoveProperties { removals } => TableUpdate::RemoveProperties { + removals: removals.clone(), + }, + }) + .collect() +} diff --git a/crates/iceberg-catalog/src/implementations/postgres/table.rs b/crates/iceberg-catalog/src/implementations/postgres/table.rs index 78ea4ddb..bf78453f 100644 --- a/crates/iceberg-catalog/src/implementations/postgres/table.rs +++ b/crates/iceberg-catalog/src/implementations/postgres/table.rs @@ -688,6 +688,7 @@ fn apply_commits(commits: Vec) -> Result) -> Result { pub catalog: C::State, pub secrets: S::State, pub publisher: CloudEventsPublisher, + pub table_change_checkers: TableChangeCheckers, } impl ServiceState for State {} diff --git a/crates/iceberg-catalog/src/service/router.rs b/crates/iceberg-catalog/src/service/router.rs index d0c1336f..afe7f4be 100644 --- a/crates/iceberg-catalog/src/service/router.rs +++ b/crates/iceberg-catalog/src/service/router.rs @@ -3,6 +3,7 @@ use crate::tracing::{MakeRequestUuid7, RestMakeSpan}; use crate::api::management::ApiServer; use crate::api::{iceberg::v1::new_v1_full_router, shutdown_signal, ApiContext}; +use crate::service::table_change_check::TableChangeCheckers; use axum::{routing::get, Router}; use tower::ServiceBuilder; use tower_http::{ @@ -29,6 +30,7 @@ pub fn new_full_router< catalog_state: C::State, secrets_state: S::State, publisher: CloudEventsPublisher, + table_change_checkers: TableChangeCheckers, ) -> Router { let v1_routes = new_v1_full_router::< crate::catalog::ConfigServer, @@ -66,6 +68,7 @@ pub fn new_full_router< catalog: catalog_state, secrets: secrets_state, publisher, + table_change_checkers, }, }) } diff --git a/crates/iceberg-catalog/src/service/table_change_check.rs b/crates/iceberg-catalog/src/service/table_change_check.rs new file mode 100644 index 00000000..a0f9e82c --- /dev/null +++ b/crates/iceberg-catalog/src/service/table_change_check.rs @@ -0,0 +1,212 @@ +use 
crate::service::TableIdentUuid; +use async_trait::async_trait; +use iceberg::spec::TableMetadata; +use iceberg::TableUpdate; +use iceberg_ext::catalog::rest::ErrorModel; +use std::fmt::Debug; +use std::sync::Arc; + +/// A trait for checking if a table change is allowed. +/// +/// This trait is used to implement custom logic for checking if a table change is allowed. One +/// possible application is the enforcement of data contracts. For example, an external system could +/// be contacted to check if the changed table columns are part of an existing contract. +/// +/// # Example +/// +/// ```rust +/// use async_trait::async_trait; +/// use iceberg::spec::TableMetadata; +/// use iceberg::TableUpdate; +/// use iceberg_catalog::service::table_change_check::{TableChangeCheck, TableCheckResult}; +/// use iceberg_catalog::service::TableIdentUuid; +/// use iceberg_ext::catalog::rest::ErrorModel; +/// +/// #[derive(Debug)] +/// pub struct AllowAllChecker; +/// #[async_trait] +/// impl TableChangeCheck for AllowAllChecker { +/// fn name(&self) -> &'static str { +/// "AllowAllChecker" +/// } +/// async fn check( +/// &self, +/// _table_updates: &[TableUpdate], +/// _table_ident_uuid: TableIdentUuid, +/// _current_metadata: &TableMetadata, +/// ) -> Result { +/// Ok(TableCheckResult::Clear {}) +/// } +/// +/// async fn check_drop( +/// &self, +/// _table_ident_uuid: TableIdentUuid, +/// ) -> Result { +/// Ok(TableCheckResult::Clear {}) +/// } +/// } +/// +/// #[derive(Debug)] +/// pub struct DenyAllChecker; +/// +/// #[async_trait] +/// impl TableChangeCheck for DenyAllChecker { +/// fn name(&self) -> &'static str { +/// "DenyAllChecker" +/// } +/// async fn check( +/// &self, +/// _table_updates: &[TableUpdate], +/// _table_ident_uuid: TableIdentUuid, +/// _current_metadata: &TableMetadata, +/// ) -> Result { +/// Ok(TableCheckResult::Block { +/// error_model: ErrorModel::builder() +/// .code(409) +/// .message("Denied") +/// .r#type("ContractViolation".to_string()) +/// 
.build() +/// .into(), +/// }) +/// } +/// +/// async fn check_drop( +/// &self, +/// _table_ident_uuid: TableIdentUuid, +/// ) -> Result { +/// Ok(TableCheckResult::Block { +/// error_model: ErrorModel::builder() +/// .code(409) +/// .r#type("ContractViolation".to_string()) +/// .message("Denied") +/// .build() +/// .into(), +/// }) +/// } +/// } +/// ``` +#[async_trait] +pub trait TableChangeCheck: Debug { + fn name(&self) -> &'static str; + + async fn check( + &self, + table_updates: &[TableUpdate], + table_ident_uuid: TableIdentUuid, + current_metadata: &TableMetadata, + ) -> Result; + + async fn check_drop( + &self, + table_ident_uuid: TableIdentUuid, + ) -> Result; +} + +#[derive(Debug)] +pub enum TableCheckResult { + Clear {}, + Block { error_model: ErrorModel }, +} + +impl TableCheckResult { + /// Converts `self` into a `Result<(), ErrorModel>`. + /// + /// When using `TableChangeCheck`, we are presented with a `Result` + /// where the outer `ErrorModel` indicates that a checker failed, that would indicate a problem + /// with the checker itself and may be returned as an internal server error. This function here + /// offers convenience to go from the `TableCheckResult` to a `Result<(), ErrorModel>` which can + /// be short-circuited using the `?` operator in the handler. + /// + /// # Example + /// + /// ```rust + /// use iceberg_catalog::service::table_change_check::TableCheckResult; + /// use iceberg_ext::catalog::rest::ErrorModel; + /// let result: Result = Ok(TableCheckResult::Clear {}); + /// // no need to match on the result + /// result?.into_result()?; + /// ``` + /// + /// # Errors + /// + /// - extracts `error_model` from `TableCheckResult::Block` and returns it as an `Err` for + /// convenience. 
+ pub fn into_result(self) -> Result<(), ErrorModel> { + match self { + TableCheckResult::Clear {} => Ok(()), + TableCheckResult::Block { error_model } => Err(error_model), + } + } +} + +#[derive(Debug, Clone)] +pub struct TableChangeCheckers { + checkers: Vec>, +} + +impl TableChangeCheckers { + #[must_use] + pub fn new(checkers: Vec>) -> Self { + Self { checkers } + } +} + +#[async_trait] +impl TableChangeCheck for TableChangeCheckers { + fn name(&self) -> &'static str { + "TableChangeCheckers" + } + + async fn check( + &self, + table_updates: &[TableUpdate], + table_ident_uuid: TableIdentUuid, + current_metadata: &TableMetadata, + ) -> Result { + for checker in &self.checkers { + match checker + .check(table_updates, table_ident_uuid, current_metadata) + .await + { + Ok(TableCheckResult::Clear {}) => {} + Ok(block_result @ TableCheckResult::Block { error_model: _ }) => { + tracing::info!( + "Checker {} blocked change on table '{}'", + checker.name(), + table_ident_uuid + ); + return Ok(block_result); + } + Err(error) => { + tracing::warn!("Checker {} failed", checker.name()); + return Err(error); + } + } + } + + Ok(TableCheckResult::Clear {}) + } + async fn check_drop( + &self, + table_ident_uuid: TableIdentUuid, + ) -> Result { + for checker in &self.checkers { + match checker.check_drop(table_ident_uuid).await { + Ok(TableCheckResult::Clear {}) => {} + Ok(block_result @ TableCheckResult::Block { error_model: _ }) => { + tracing::info!( + "Checker {} blocked drop on table '{}'", + checker.name(), + table_ident_uuid + ); + return Ok(block_result); + } + Err(error) => { + tracing::warn!("Checker {} failed", checker.name()); + return Err(error); + } + } + } + Ok(TableCheckResult::Clear {}) + } +} diff --git a/examples/docker-compose.yaml b/examples/docker-compose.yaml index 1cf1347d..95f2224d 100644 --- a/examples/docker-compose.yaml +++ b/examples/docker-compose.yaml @@ -13,7 +13,7 @@ services: - "8888:8888" server: - image: hansetag/iceberg-catalog:0.0.1 + 
image: iceberg-catalog-local:latest environment: - ICEBERG_REST__BASE_URI=http://server:8080/catalog/ - ICEBERG_REST__PG_ENCRYPTION_KEY=This-is-NOT-Secure! diff --git a/examples/notebooks/Multiple Warehouses.ipynb b/examples/notebooks/Multiple Warehouses.ipynb index b2c2ab2a..0c158e0f 100644 --- a/examples/notebooks/Multiple Warehouses.ipynb +++ b/examples/notebooks/Multiple Warehouses.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "metadata": {}, "outputs": [], "source": [ @@ -28,9 +28,20 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/plain": [ + "200" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "response = requests.post(f\"{MANAGEMENT_URL}/v1/warehouse\",\n", " json={\n", @@ -83,7 +94,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 3, "metadata": {}, "outputs": [], "source": [ @@ -103,7 +114,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 4, "metadata": {}, "outputs": [], "source": [ @@ -116,9 +127,72 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 6, "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/plain": [ + "DataFrame[]" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "spark.sql(\"CREATE NAMESPACE test_create_namespace_spark\")" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
namespace
0spark_demo
\n", + "
" + ], + "text/plain": [ + " namespace\n", + "0 spark_demo" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "spark.sql(\"CREATE NAMESPACE IF NOT EXISTS spark_demo\")\n", "spark.sql(\"SHOW NAMESPACES\").toPandas()" diff --git a/examples/notebooks/Spark Configuration.ipynb b/examples/notebooks/Spark Configuration.ipynb index 6f4baa1c..a5a8f667 100644 --- a/examples/notebooks/Spark Configuration.ipynb +++ b/examples/notebooks/Spark Configuration.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "metadata": {}, "outputs": [], "source": [ @@ -28,7 +28,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "metadata": {}, "outputs": [], "source": [ @@ -46,7 +46,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 3, "metadata": {}, "outputs": [], "source": [ @@ -66,9 +66,52 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 4, "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
namespace
0spark_demo
\n", + "
" + ], + "text/plain": [ + " namespace\n", + "0 spark_demo" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "spark.sql(\"CREATE NAMESPACE IF NOT EXISTS spark_demo\")\n", "spark.sql(\"SHOW NAMESPACES\").toPandas()" @@ -76,7 +119,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 5, "metadata": {}, "outputs": [], "source": [ @@ -86,7 +129,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 6, "metadata": {}, "outputs": [], "source": [ From c8c2f83f1841d3383b8afbcddbf4d23f947291b5 Mon Sep 17 00:00:00 2001 From: Tobias Puetz Date: Tue, 11 Jun 2024 15:10:51 +0200 Subject: [PATCH 2/8] fix doc test --- crates/iceberg-catalog/src/service/table_change_check.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/iceberg-catalog/src/service/table_change_check.rs b/crates/iceberg-catalog/src/service/table_change_check.rs index a0f9e82c..ff276570 100644 --- a/crates/iceberg-catalog/src/service/table_change_check.rs +++ b/crates/iceberg-catalog/src/service/table_change_check.rs @@ -122,9 +122,12 @@ impl TableCheckResult { /// ```rust /// use iceberg_catalog::service::table_change_check::TableCheckResult; /// use iceberg_ext::catalog::rest::ErrorModel; - /// let result: Result = Ok(TableCheckResult::Clear {}); - /// // no need to match on the result - /// result?.into_result()?; + /// + /// fn my_handler() -> Result<(), ErrorModel> { + /// let result: Result = Ok(TableCheckResult::Clear {}); + /// result?.into_result()?; + /// Ok(()) + /// } /// ``` /// /// # Errors From a25f4791f013682331560d839ada55c808320d17 Mon Sep 17 00:00:00 2001 From: Tobias Puetz Date: Tue, 11 Jun 2024 15:50:55 +0200 Subject: [PATCH 3/8] revert notebook changes --- examples/docker-compose.yaml | 2 +- examples/notebooks/Multiple Warehouses.ipynb | 88 ++------------------ examples/notebooks/Spark Configuration.ipynb | 57 ++----------- 3 files changed, 15 
insertions(+), 132 deletions(-) diff --git a/examples/docker-compose.yaml b/examples/docker-compose.yaml index 95f2224d..1cf1347d 100644 --- a/examples/docker-compose.yaml +++ b/examples/docker-compose.yaml @@ -13,7 +13,7 @@ services: - "8888:8888" server: - image: iceberg-catalog-local:latest + image: hansetag/iceberg-catalog:0.0.1 environment: - ICEBERG_REST__BASE_URI=http://server:8080/catalog/ - ICEBERG_REST__PG_ENCRYPTION_KEY=This-is-NOT-Secure! diff --git a/examples/notebooks/Multiple Warehouses.ipynb b/examples/notebooks/Multiple Warehouses.ipynb index 0c158e0f..b2c2ab2a 100644 --- a/examples/notebooks/Multiple Warehouses.ipynb +++ b/examples/notebooks/Multiple Warehouses.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -28,20 +28,9 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "200" - ] - }, - "execution_count": 2, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "response = requests.post(f\"{MANAGEMENT_URL}/v1/warehouse\",\n", " json={\n", @@ -94,7 +83,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -114,7 +103,7 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -127,72 +116,9 @@ }, { "cell_type": "code", - "execution_count": 6, - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "DataFrame[]" - ] - }, - "execution_count": 6, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "spark.sql(\"CREATE NAMESPACE test_create_namespace_spark\")" - ] - }, - { - "cell_type": "code", - "execution_count": 5, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
namespace
0spark_demo
\n", - "
" - ], - "text/plain": [ - " namespace\n", - "0 spark_demo" - ] - }, - "execution_count": 5, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "spark.sql(\"CREATE NAMESPACE IF NOT EXISTS spark_demo\")\n", "spark.sql(\"SHOW NAMESPACES\").toPandas()" diff --git a/examples/notebooks/Spark Configuration.ipynb b/examples/notebooks/Spark Configuration.ipynb index a5a8f667..6f4baa1c 100644 --- a/examples/notebooks/Spark Configuration.ipynb +++ b/examples/notebooks/Spark Configuration.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -28,7 +28,7 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -46,7 +46,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -66,52 +66,9 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
namespace
0spark_demo
\n", - "
" - ], - "text/plain": [ - " namespace\n", - "0 spark_demo" - ] - }, - "execution_count": 4, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "spark.sql(\"CREATE NAMESPACE IF NOT EXISTS spark_demo\")\n", "spark.sql(\"SHOW NAMESPACES\").toPandas()" @@ -119,7 +76,7 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -129,7 +86,7 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ From a13235f6a2ab4b29091becc5b86ca7da6f1b9126 Mon Sep 17 00:00:00 2001 From: Tobias Puetz Date: Tue, 11 Jun 2024 16:32:23 +0200 Subject: [PATCH 4/8] rename table checker to contract verifier --- README.md | 55 +++++--- crates/iceberg-catalog-bin/src/main.rs | 4 +- crates/iceberg-catalog/src/catalog/tables.rs | 15 +- ...ange_check.rs => contract_verification.rs} | 132 ++++++++++++------ crates/iceberg-catalog/src/service/mod.rs | 6 +- crates/iceberg-catalog/src/service/router.rs | 6 +- 6 files changed, 148 insertions(+), 70 deletions(-) rename crates/iceberg-catalog/src/service/{table_change_check.rs => contract_verification.rs} (51%) diff --git a/README.md b/README.md index 273fbe24..3bb9618e 100644 --- a/README.md +++ b/README.md @@ -4,27 +4,46 @@ [![Build Status][actions-badge]][actions-url] [actions-badge]: https://github.com/hansetag/iceberg-catalog/workflows/CI/badge.svg?branch=main + [actions-url]: https://github.com/hansetag/iceberg-catalog/actions?query=workflow%3ACI+branch%3Amain -This is TIP: A Rust-native implementation of the [Apache Iceberg](https://iceberg.apache.org/) REST Catalog specification based on [apache/iceberg-rust](https://github.com/apache/iceberg-rust). +This is TIP: A Rust-native implementation of the [Apache Iceberg](https://iceberg.apache.org/) REST Catalog +specification based on [apache/iceberg-rust](https://github.com/apache/iceberg-rust). 
# Scope and Features -The Iceberg REST Protocol has become the standard for catalogs in open Lakehouses. It natively enables multi-table commits, server-side deconflicting and much more. It is figuratively the (**TIP**) of the Iceberg. - -We have started this implementation because we were missing "OPENNESS" such as customizability, support for on-premise deployments and other features that are important for us in existing Iceberg Catalogs. Please find following some of our focuses with this implementation: - -- **Customizable**: If you already manage Access to Tables in your company somewhere else or need the catalog to stream change events to a different system, you can do so by implementing a few methods. Please find more details in the [Customization Guide](CUSTOMIZING.md). -- **Change Events**: Built-in support to emit change events (CloudEvents), which enables you to react to any change that happen to your tables. -- **Change Approval**: Changes can also be prohibited by external systems. This can be used to prohibit changes to tables that would validate Data Contracts, Quality SLOs etc. Simply integrate with your own change approval via our `TableChangeCheck` trait. -- **Multi-Tenant capable**: A single deployment of our catalog can serve multiple projects - all with a single entrypoint. All Iceberg and Warehouse configurations are completly separated between Warehouses. +The Iceberg REST Protocol has become the standard for catalogs in open Lakehouses. It natively enables multi-table +commits, server-side deconflicting and much more. It is figuratively the (**TIP**) of the Iceberg. + +We have started this implementation because we were missing "OPENNESS" such as customizability, support for on-premise +deployments and other features that are important for us in existing Iceberg Catalogs. 
Please find following some of our +focuses with this implementation: + +- **Customizable**: If you already manage Access to Tables in your company somewhere else or need the catalog to stream + change events to a different system, you can do so by implementing a few methods. Please find more details in + the [Customization Guide](CUSTOMIZING.md). +- **Change Events**: Built-in support to emit change events (CloudEvents), which enables you to react to any change that + happen to your tables. +- **Change Approval**: Changes can also be prohibited by external systems. This can be used to prohibit changes to + tables that would validate Data Contracts, Quality SLOs etc. Simply integrate with your own change approval via + our `ContractVerification` trait. +- **Multi-Tenant capable**: A single deployment of our catalog can serve multiple projects - all with a single + entrypoint. All Iceberg and Warehouse configurations are completly separated between Warehouses. - **Written in Rust**: Single 18Mb all-in-one binary - no JVM or Python env required. -- **Storage Access Management**: Built-in S3-Signing that enables support for self-hosted as well as AWS S3 WITHOUT sharing S3 credentials with clients. -- **Well-Tested**: Integration-tested with `spark`, `trino` and `pyiceberg` (support for S3 with this catalog from pyiceberg 0.7.0) -- **High Available & Horizontally Scalable**: There is no local state - the catalog can be scaled horizontally and updated without downtimes. -- **Fine Grained Access (FGA) (Coming soon):** Simple Role-Based access control is not enough for many rapidly evolving Data & Analytics initiatives. We are leveraging [OpenFGA](https://openfga.dev/) based on googles [Zanzibar-Paper](https://research.google/pubs/zanzibar-googles-consistent-global-authorization-system/) to implement authorization. If your company already has a different system in place, you can integrate with it by implementing a handful of methods in the `AuthZHandler` trait. 
- -Please find following an overview of currently supported features. Please also check the Issues if you are missing something. +- **Storage Access Management**: Built-in S3-Signing that enables support for self-hosted as well as AWS S3 WITHOUT + sharing S3 credentials with clients. +- **Well-Tested**: Integration-tested with `spark`, `trino` and `pyiceberg` (support for S3 with this catalog from + pyiceberg 0.7.0) +- **High Available & Horizontally Scalable**: There is no local state - the catalog can be scaled horizontally and + updated without downtimes. +- **Fine Grained Access (FGA) (Coming soon):** Simple Role-Based access control is not enough for many rapidly evolving + Data & Analytics initiatives. We are leveraging [OpenFGA](https://openfga.dev/) based on + googles [Zanzibar-Paper](https://research.google/pubs/zanzibar-googles-consistent-global-authorization-system/) to + implement authorization. If your company already has a different system in place, you can integrate with it by + implementing a handful of methods in the `AuthZHandler` trait. + +Please find following an overview of currently supported features. Please also check the Issues if you are missing +something. # Quickstart @@ -99,7 +118,8 @@ Then open your browser and head to `localhost:8888`. # Multiple Projects -The iceberg-rest server can host multiple independent warehouses that are again grouped by projects. The overall structure looks like this: +The iceberg-rest server can host multiple independent warehouses that are again grouped by projects. The overall +structure looks like this: ``` / @@ -145,7 +165,8 @@ Configuration parameters if Postgres is used as a backend: # Limitations -- Table Metadata is currently limited to `256Mb` for the `postgres` implementation. If you need more, you should probably vaccum your table ;) +- Table Metadata is currently limited to `256Mb` for the `postgres` implementation. 
If you need more, you should + probably vaccum your table ;) - Views are not supported yet ## License diff --git a/crates/iceberg-catalog-bin/src/main.rs b/crates/iceberg-catalog-bin/src/main.rs index 16c19579..56e99b9f 100644 --- a/crates/iceberg-catalog-bin/src/main.rs +++ b/crates/iceberg-catalog-bin/src/main.rs @@ -1,10 +1,10 @@ use anyhow::Context; use async_nats::ServerAddr; use clap::{Parser, Subcommand}; +use iceberg_catalog::service::contract_verification::ContractVerifiers; use iceberg_catalog::service::event_publisher::{ CloudEventSink, CloudEventsPublisher, NatsPublisher, }; -use iceberg_catalog::service::table_change_check::TableChangeCheckers; use iceberg_catalog::{ implementations::{ postgres::{Catalog, CatalogState, SecretsState, SecretsStore}, @@ -82,7 +82,7 @@ async fn serve(bind_addr: std::net::SocketAddr) -> Result<(), anyhow::Error> { CloudEventsPublisher { sinks: cloud_event_sinks, }, - TableChangeCheckers::new(vec![]), + ContractVerifiers::new(vec![]), ); service_serve(listener, router).await?; diff --git a/crates/iceberg-catalog/src/catalog/tables.rs b/crates/iceberg-catalog/src/catalog/tables.rs index f0d039ad..6b4334ce 100644 --- a/crates/iceberg-catalog/src/catalog/tables.rs +++ b/crates/iceberg-catalog/src/catalog/tables.rs @@ -20,9 +20,9 @@ use super::{ namespace::{uppercase_first_letter, validate_namespace_ident}, require_warehouse_id, CatalogServer, }; +use crate::service::contract_verification::ContractVerification; use crate::service::event_publisher::{CloudEventsPublisher, EventMetadata}; use crate::service::storage::StorageCredential; -use crate::service::table_change_check::TableChangeCheck; use crate::service::{ auth::AuthZHandler, secrets::SecretStore, Catalog, CreateTableResult, LoadTableResult as CatalogLoadTableResult, State, Transaction, @@ -403,7 +403,7 @@ impl )?; state .v1_state - .table_change_checkers + .contract_verifiers .check(&updates, table_id, &result.previous_table_metadata) .await? 
.into_result()?; @@ -503,7 +503,7 @@ impl // ToDo: Delete metadata files state .v1_state - .table_change_checkers + .contract_verifiers .check_drop(table_id) .await? .into_result()?; @@ -661,6 +661,13 @@ impl ) .await?; + state + .v1_state + .contract_verifiers + .check_rename(source_id, &destination) + .await? + .into_result()?; + transaction.commit().await?; Ok(()) @@ -842,7 +849,7 @@ impl { state .v1_state - .table_change_checkers + .contract_verifiers .check(&update, *table_uuid, &response.previous_table_metadata) .await? .into_result()?; diff --git a/crates/iceberg-catalog/src/service/table_change_check.rs b/crates/iceberg-catalog/src/service/contract_verification.rs similarity index 51% rename from crates/iceberg-catalog/src/service/table_change_check.rs rename to crates/iceberg-catalog/src/service/contract_verification.rs index ff276570..765c6aff 100644 --- a/crates/iceberg-catalog/src/service/table_change_check.rs +++ b/crates/iceberg-catalog/src/service/contract_verification.rs @@ -1,7 +1,8 @@ +#![allow(clippy::module_name_repetitions)] use crate::service::TableIdentUuid; use async_trait::async_trait; use iceberg::spec::TableMetadata; -use iceberg::TableUpdate; +use iceberg::{TableIdent, TableUpdate}; use iceberg_ext::catalog::rest::ErrorModel; use std::fmt::Debug; use std::sync::Arc; @@ -17,15 +18,16 @@ use std::sync::Arc; /// ```rust /// use async_trait::async_trait; /// use iceberg::spec::TableMetadata; -/// use iceberg::TableUpdate; -/// use iceberg_catalog::service::table_change_check::{TableChangeCheck, TableCheckResult}; +/// use iceberg::{TableIdent, TableUpdate}; +/// use iceberg_catalog::service::contract_verification::{ContractVerification, ContractVerificationOutcome}; /// use iceberg_catalog::service::TableIdentUuid; /// use iceberg_ext::catalog::rest::ErrorModel; /// /// #[derive(Debug)] /// pub struct AllowAllChecker; +/// /// #[async_trait] -/// impl TableChangeCheck for AllowAllChecker { +/// impl ContractVerification for AllowAllChecker { 
/// fn name(&self) -> &'static str { /// "AllowAllChecker" /// } @@ -34,15 +36,19 @@ use std::sync::Arc; /// _table_updates: &[TableUpdate], /// _table_ident_uuid: TableIdentUuid, /// _current_metadata: &TableMetadata, -/// ) -> Result { -/// Ok(TableCheckResult::Clear {}) +/// ) -> Result { +/// Ok(ContractVerificationOutcome::Clear {}) /// } /// /// async fn check_drop( /// &self, /// _table_ident_uuid: TableIdentUuid, -/// ) -> Result { -/// Ok(TableCheckResult::Clear {}) +/// ) -> Result { +/// Ok(ContractVerificationOutcome::Clear {}) +/// } +/// +/// async fn check_rename(&self, source: TableIdentUuid, destination: &TableIdent) -> Result { +/// Ok(ContractVerificationOutcome::Clear {}) /// } /// } /// @@ -50,7 +56,7 @@ use std::sync::Arc; /// pub struct DenyAllChecker; /// /// #[async_trait] -/// impl TableChangeCheck for DenyAllChecker { +/// impl ContractVerification for DenyAllChecker { /// fn name(&self) -> &'static str { /// "DenyAllChecker" /// } @@ -59,8 +65,8 @@ use std::sync::Arc; /// _table_updates: &[TableUpdate], /// _table_ident_uuid: TableIdentUuid, /// _current_metadata: &TableMetadata, -/// ) -> Result { -/// Ok(TableCheckResult::Block { +/// ) -> Result { +/// Ok(ContractVerificationOutcome::Violation { /// error_model: ErrorModel::builder() /// .code(409) /// .message("Denied") @@ -73,8 +79,19 @@ use std::sync::Arc; /// async fn check_drop( /// &self, /// _table_ident_uuid: TableIdentUuid, -/// ) -> Result { -/// Ok(TableCheckResult::Block { +/// ) -> Result { +/// Ok(ContractVerificationOutcome::Violation { +/// error_model: ErrorModel::builder() +/// .code(409) +/// .r#type("ContractViolation".to_string()) +/// .message("Denied") +/// .build() +/// .into(), +/// }) +/// } +/// +/// async fn check_rename(&self, source: TableIdentUuid, destination: &TableIdent) -> Result { +/// Ok(ContractVerificationOutcome::Violation { /// error_model: ErrorModel::builder() /// .code(409) /// .r#type("ContractViolation".to_string()) @@ -86,7 +103,7 @@ use 
std::sync::Arc; /// } /// ``` #[async_trait] -pub trait TableChangeCheck: Debug { +pub trait ContractVerification: Debug { fn name(&self) -> &'static str; async fn check( @@ -94,37 +111,44 @@ pub trait TableChangeCheck: Debug { table_updates: &[TableUpdate], table_ident_uuid: TableIdentUuid, current_metadata: &TableMetadata, - ) -> Result; + ) -> Result; async fn check_drop( &self, table_ident_uuid: TableIdentUuid, - ) -> Result; + ) -> Result; + + async fn check_rename( + &self, + source: TableIdentUuid, + destination: &TableIdent, + ) -> Result; } #[derive(Debug)] -pub enum TableCheckResult { +pub enum ContractVerificationOutcome { Clear {}, - Block { error_model: ErrorModel }, + Violation { error_model: ErrorModel }, } -impl TableCheckResult { +impl ContractVerificationOutcome { /// Converts `self` into a `Result<(), ErrorModel>`. /// - /// When using `TableChangeCheck`, we are presented with a `Result` + /// When using `ContractVerificationOutcome`, we are presented with a + /// `Result` /// where the outer `ErrorModel` indicates that a checker failed, that would indicate a problem /// with the checker itself and may be returned as an internal server error. This function here - /// offers convenience to go from the `TableCheckResult` to a `Result<(), ErrorModel>` which can - /// be short-circuited using the `?` operator in the handler. + /// offers convenience to go from the `ContractVerificationOutcome` to a `Result<(), ErrorModel>` + /// which can be short-circuited using the `?` operator in the handler. 
/// /// # Example /// /// ```rust - /// use iceberg_catalog::service::table_change_check::TableCheckResult; + /// use iceberg_catalog::service::contract_verification::ContractVerificationOutcome; /// use iceberg_ext::catalog::rest::ErrorModel; /// /// fn my_handler() -> Result<(), ErrorModel> { - /// let result: Result = Ok(TableCheckResult::Clear {}); + /// let result: Result = Ok(ContractVerificationOutcome::Clear {}); /// result?.into_result()?; /// Ok(()) /// } @@ -132,32 +156,32 @@ impl TableCheckResult { /// /// # Errors /// - /// - extracts `error_model` from `TableCheckResult::Block` and returns it as an `Err` for + /// - extracts `error_model` from `ContractVerificationOutcome::Violation` and returns it as an `Err` for /// convenience. pub fn into_result(self) -> Result<(), ErrorModel> { match self { - TableCheckResult::Clear {} => Ok(()), - TableCheckResult::Block { error_model } => Err(error_model), + ContractVerificationOutcome::Clear {} => Ok(()), + ContractVerificationOutcome::Violation { error_model } => Err(error_model), } } } #[derive(Debug, Clone)] -pub struct TableChangeCheckers { - checkers: Vec>, +pub struct ContractVerifiers { + checkers: Vec>, } -impl TableChangeCheckers { +impl ContractVerifiers { #[must_use] - pub fn new(checkers: Vec>) -> Self { + pub fn new(checkers: Vec>) -> Self { Self { checkers } } } #[async_trait] -impl TableChangeCheck for TableChangeCheckers { +impl ContractVerification for ContractVerifiers { fn name(&self) -> &'static str { - "TableChangeCheckers" + "ContractVerifiers" } async fn check( @@ -165,14 +189,14 @@ impl TableChangeCheck for TableChangeCheckers { table_updates: &[TableUpdate], table_ident_uuid: TableIdentUuid, current_metadata: &TableMetadata, - ) -> Result { + ) -> Result { for checker in &self.checkers { match checker .check(table_updates, table_ident_uuid, current_metadata) .await { - Ok(TableCheckResult::Clear {}) => {} - Ok(block_result @ TableCheckResult::Block { error_model: _ }) => { + 
Ok(ContractVerificationOutcome::Clear {}) => {} + Ok(block_result @ ContractVerificationOutcome::Violation { error_model: _ }) => { tracing::info!( "Checker {} blocked change on table '{}'", checker.name(), @@ -187,16 +211,16 @@ impl TableChangeCheck for TableChangeCheckers { } } - Ok(TableCheckResult::Clear {}) + Ok(ContractVerificationOutcome::Clear {}) } async fn check_drop( &self, table_ident_uuid: TableIdentUuid, - ) -> Result { + ) -> Result { for checker in &self.checkers { match checker.check_drop(table_ident_uuid).await { - Ok(TableCheckResult::Clear {}) => {} - Ok(block_result @ TableCheckResult::Block { error_model: _ }) => { + Ok(ContractVerificationOutcome::Clear {}) => {} + Ok(block_result @ ContractVerificationOutcome::Violation { error_model: _ }) => { tracing::info!( "Checker {} blocked drop on table '{}'", checker.name(), @@ -210,6 +234,32 @@ impl TableChangeCheck for TableChangeCheckers { } } } - Ok(TableCheckResult::Clear {}) + Ok(ContractVerificationOutcome::Clear {}) + } + + async fn check_rename( + &self, + source: TableIdentUuid, + destination: &TableIdent, + ) -> Result { + for checker in &self.checkers { + match checker.check_rename(source, destination).await { + Ok(ContractVerificationOutcome::Clear {}) => {} + Ok(block_result @ ContractVerificationOutcome::Violation { error_model: _ }) => { + tracing::info!( + "Checker {} blocked rename from '{}' to '{:?}'", + checker.name(), + source, + destination + ); + return Ok(block_result); + } + Err(error) => { + tracing::warn!("Checker {} failed", checker.name()); + return Err(error); + } + } + } + Ok(ContractVerificationOutcome::Clear {}) } } diff --git a/crates/iceberg-catalog/src/service/mod.rs b/crates/iceberg-catalog/src/service/mod.rs index 94ddae40..61dd0667 100644 --- a/crates/iceberg-catalog/src/service/mod.rs +++ b/crates/iceberg-catalog/src/service/mod.rs @@ -1,12 +1,12 @@ pub mod auth; mod catalog; pub mod config; +pub mod contract_verification; pub mod event_publisher; #[cfg(feature 
= "router")] pub mod router; pub mod secrets; pub mod storage; -pub mod table_change_check; pub use catalog::{ Catalog, CommitTableResponseExt, CreateTableResult, GetStorageConfigResult, @@ -20,8 +20,8 @@ use http::StatusCode; use iceberg::NamespaceIdent; use std::str::FromStr; +use crate::service::contract_verification::ContractVerifiers; use crate::service::event_publisher::CloudEventsPublisher; -use crate::service::table_change_check::TableChangeCheckers; pub use secrets::{SecretIdent, SecretStore}; use self::auth::AuthZHandler; @@ -62,7 +62,7 @@ pub struct State { pub catalog: C::State, pub secrets: S::State, pub publisher: CloudEventsPublisher, - pub table_change_checkers: TableChangeCheckers, + pub contract_verifiers: ContractVerifiers, } impl ServiceState for State {} diff --git a/crates/iceberg-catalog/src/service/router.rs b/crates/iceberg-catalog/src/service/router.rs index afe7f4be..b55ba640 100644 --- a/crates/iceberg-catalog/src/service/router.rs +++ b/crates/iceberg-catalog/src/service/router.rs @@ -3,7 +3,7 @@ use crate::tracing::{MakeRequestUuid7, RestMakeSpan}; use crate::api::management::ApiServer; use crate::api::{iceberg::v1::new_v1_full_router, shutdown_signal, ApiContext}; -use crate::service::table_change_check::TableChangeCheckers; +use crate::service::contract_verification::ContractVerifiers; use axum::{routing::get, Router}; use tower::ServiceBuilder; use tower_http::{ @@ -30,7 +30,7 @@ pub fn new_full_router< catalog_state: C::State, secrets_state: S::State, publisher: CloudEventsPublisher, - table_change_checkers: TableChangeCheckers, + table_change_checkers: ContractVerifiers, ) -> Router { let v1_routes = new_v1_full_router::< crate::catalog::ConfigServer, @@ -68,7 +68,7 @@ pub fn new_full_router< catalog: catalog_state, secrets: secrets_state, publisher, - table_change_checkers, + contract_verifiers: table_change_checkers, }, }) } From ad4b64692c0e6fe3566e98a19e6779bc1a017d83 Mon Sep 17 00:00:00 2001 From: Tobias Puetz Date: Tue, 11 
Jun 2024 20:21:04 +0200 Subject: [PATCH 5/8] review comments --- crates/iceberg-catalog/src/catalog/tables.rs | 32 +++++++++---------- .../src/service/contract_verification.rs | 21 ++++-------- 2 files changed, 23 insertions(+), 30 deletions(-) diff --git a/crates/iceberg-catalog/src/catalog/tables.rs b/crates/iceberg-catalog/src/catalog/tables.rs index 6b4334ce..1088d9f1 100644 --- a/crates/iceberg-catalog/src/catalog/tables.rs +++ b/crates/iceberg-catalog/src/catalog/tables.rs @@ -371,7 +371,7 @@ impl // serialize body before moving it let body = maybe_body_to_json(&request); - let updates = copy_updates(updates); + let updates = clone_updates(updates); let transaction_request = CommitTransactionRequest { table_changes: vec![request], @@ -783,7 +783,7 @@ impl if let Some(uuid) = table_ids.get(id) { events.push(maybe_body_to_json(commit_table_request)); event_table_ids.push((id.clone(), *uuid)); - updates.push(copy_updates(&commit_table_request.updates)); + updates.push(clone_updates(&commit_table_request.updates)); } } } @@ -796,6 +796,19 @@ impl ) .await?; + for ((update, (_, table_uuid)), response) in updates + .into_iter() + .zip(&event_table_ids) + .zip(&commit_response) + { + state + .v1_state + .contract_verifiers + .check(&update, *table_uuid, &response.previous_table_metadata) + .await? + .into_result()?; + } + // We don't commit the transaction yet, first we need to write the metadata file. // Fetch all secrets concurrently let storage_secrets = futures::future::try_join_all( @@ -842,19 +855,6 @@ impl )); } - for ((update, (_, table_uuid)), response) in updates - .into_iter() - .zip(&event_table_ids) - .zip(&commit_response) - { - state - .v1_state - .contract_verifiers - .check(&update, *table_uuid, &response.previous_table_metadata) - .await? 
- .into_result()?; - } - futures::future::try_join_all(write_futures).await?; transaction.commit().await?; @@ -967,7 +967,7 @@ fn maybe_body_to_json(request: impl Serialize) -> serde_json::Value { } // TableUpdate is not clone but all containing fields are so we use this function to clone.. -fn copy_updates(updates: &[TableUpdate]) -> Vec { +fn clone_updates(updates: &[TableUpdate]) -> Vec { updates .iter() .map(|u| match u { diff --git a/crates/iceberg-catalog/src/service/contract_verification.rs b/crates/iceberg-catalog/src/service/contract_verification.rs index 765c6aff..fca36c61 100644 --- a/crates/iceberg-catalog/src/service/contract_verification.rs +++ b/crates/iceberg-catalog/src/service/contract_verification.rs @@ -34,7 +34,6 @@ use std::sync::Arc; /// async fn check( /// &self, /// _table_updates: &[TableUpdate], -/// _table_ident_uuid: TableIdentUuid, /// _current_metadata: &TableMetadata, /// ) -> Result { /// Ok(ContractVerificationOutcome::Clear {}) @@ -63,7 +62,6 @@ use std::sync::Arc; /// async fn check( /// &self, /// _table_updates: &[TableUpdate], -/// _table_ident_uuid: TableIdentUuid, /// _current_metadata: &TableMetadata, /// ) -> Result { /// Ok(ContractVerificationOutcome::Violation { @@ -109,7 +107,6 @@ pub trait ContractVerification: Debug { async fn check( &self, table_updates: &[TableUpdate], - table_ident_uuid: TableIdentUuid, current_metadata: &TableMetadata, ) -> Result; @@ -187,20 +184,16 @@ impl ContractVerification for ContractVerifiers { async fn check( &self, table_updates: &[TableUpdate], - table_ident_uuid: TableIdentUuid, current_metadata: &TableMetadata, ) -> Result { for checker in &self.checkers { - match checker - .check(table_updates, table_ident_uuid, current_metadata) - .await - { + match checker.check(table_updates, current_metadata).await { Ok(ContractVerificationOutcome::Clear {}) => {} Ok(block_result @ ContractVerificationOutcome::Violation { error_model: _ }) => { tracing::info!( - "Checker {} blocked change on 
table '{}'", + "ContractVerifier '{}' blocked change on table '{}'", checker.name(), - table_ident_uuid + current_metadata.table_uuid ); return Ok(block_result); } @@ -222,14 +215,14 @@ impl ContractVerification for ContractVerifiers { Ok(ContractVerificationOutcome::Clear {}) => {} Ok(block_result @ ContractVerificationOutcome::Violation { error_model: _ }) => { tracing::info!( - "Checker {} blocked drop on table '{}'", + "ContractVerifier '{}' blocked drop on table '{}'", checker.name(), table_ident_uuid ); return Ok(block_result); } Err(error) => { - tracing::warn!("Checker {} failed", checker.name()); + tracing::warn!("ContractVerifier '{}' failed", checker.name()); return Err(error); } } @@ -247,7 +240,7 @@ impl ContractVerification for ContractVerifiers { Ok(ContractVerificationOutcome::Clear {}) => {} Ok(block_result @ ContractVerificationOutcome::Violation { error_model: _ }) => { tracing::info!( - "Checker {} blocked rename from '{}' to '{:?}'", + "ContractVerifier '{}' blocked rename from '{}' to '{:?}'", checker.name(), source, destination @@ -255,7 +248,7 @@ impl ContractVerification for ContractVerifiers { return Ok(block_result); } Err(error) => { - tracing::warn!("Checker {} failed", checker.name()); + tracing::warn!("ContractVerifier '{}' failed", checker.name()); return Err(error); } } From b57f4ed73ef5eb59d6f4eb510efa278bf42ed3a5 Mon Sep 17 00:00:00 2001 From: Tobias Puetz Date: Tue, 11 Jun 2024 20:31:34 +0200 Subject: [PATCH 6/8] more review comments --- crates/iceberg-catalog/src/catalog/tables.rs | 30 +++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/crates/iceberg-catalog/src/catalog/tables.rs b/crates/iceberg-catalog/src/catalog/tables.rs index e7752752..2a3b4780 100644 --- a/crates/iceberg-catalog/src/catalog/tables.rs +++ b/crates/iceberg-catalog/src/catalog/tables.rs @@ -20,7 +20,7 @@ use super::{ namespace::{uppercase_first_letter, validate_namespace_ident}, require_warehouse_id, CatalogServer, }; -use 
crate::service::contract_verification::ContractVerification; +use crate::service::contract_verification::{ContractVerification, ContractVerificationOutcome}; use crate::service::event_publisher::{CloudEventsPublisher, EventMetadata}; use crate::service::storage::StorageCredential; use crate::service::{ @@ -451,7 +451,7 @@ impl state .v1_state .contract_verifiers - .check(&updates, table_id, &result.previous_table_metadata) + .check(&updates, &result.previous_table_metadata) .await? .into_result()?; // We don't commit the transaction yet, first we need to write the metadata file. @@ -842,19 +842,21 @@ impl transaction.transaction(), ) .await?; - - for ((update, (_, table_uuid)), response) in updates - .into_iter() - .zip(&event_table_ids) + let futures = updates + .iter() .zip(&commit_response) - { - state - .v1_state - .contract_verifiers - .check(&update, *table_uuid, &response.previous_table_metadata) - .await? - .into_result()?; - } + .map(|(update, response)| { + state + .v1_state + .contract_verifiers + .check(update, &response.previous_table_metadata) + }); + + futures::future::try_join_all(futures) + .await? + .into_iter() + .map(ContractVerificationOutcome::into_result) + .collect::, ErrorModel>>()?; // We don't commit the transaction yet, first we need to write the metadata file. 
// Fetch all secrets concurrently From e57e240626b2c9c2774a9ff382265cdf6c4a8bb5 Mon Sep 17 00:00:00 2001 From: Tobias Puetz Date: Tue, 11 Jun 2024 20:33:27 +0200 Subject: [PATCH 7/8] add change event to rename table --- crates/iceberg-catalog/src/catalog/tables.rs | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/crates/iceberg-catalog/src/catalog/tables.rs b/crates/iceberg-catalog/src/catalog/tables.rs index 2a3b4780..33dc0e52 100644 --- a/crates/iceberg-catalog/src/catalog/tables.rs +++ b/crates/iceberg-catalog/src/catalog/tables.rs @@ -645,7 +645,8 @@ impl request_metadata: RequestMetadata, ) -> Result<()> { // ------------------- VALIDATIONS ------------------- - let warehouse_id = require_warehouse_id(prefix)?; + let warehouse_id = require_warehouse_id(prefix.clone())?; + let body = maybe_body_to_json(&request); let RenameTableRequest { source, destination, @@ -717,6 +718,23 @@ impl transaction.commit().await?; + emit_change_event( + EventMetadata { + table_id: *source_id.as_uuid(), + warehouse_id: *warehouse_id.as_uuid(), + name: Cow::Borrowed(&source.name), + namespace: Cow::Owned(source.namespace.encode_in_url()), + prefix: Cow::Owned(prefix.map(Prefix::into_string).unwrap_or_default()), + num_events: 1, + sequence_number: 0, + trace_id: request_metadata.request_id, + }, + body, + "renameTable", + state.v1_state.publisher.clone(), + ) + .await; + Ok(()) } From ec7b0a8d6e60a9156f52f4906776885916de0796 Mon Sep 17 00:00:00 2001 From: Tobias Puetz Date: Wed, 12 Jun 2024 09:52:46 +0200 Subject: [PATCH 8/8] update readme --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 798deef3..552a62b9 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,9 @@ specification based on [apache/iceberg-rust](https://github.com/apache/iceberg-r We have started this implementation because we were missing OPENNESS such as customizability, support for on-premise deployments and other 
features that are important for us in existing Iceberg Catalogs. Please find following some of our focuses with this implementation: - **Customizable**: If you already manage Access to Tables in your company somewhere else or need the catalog to stream change events to a different system, you can do so by implementing a few methods. Please find more details in the [Customization Guide](CUSTOMIZING.md). -- **Change Events**: Built-in support to emit change events (CloudEvents), which enables you to react to any change that happen to your tables. Changes can also be prohibited by external systems using our request / response handler. This can be used to prohibit changes to tables that would validate Data Contracts, Quality SLOs etc. + +- **Change Events**: Built-in support to emit change events (CloudEvents), which enables you to react to any change that happen to your tables. +- **Change Approval**: Changes can also be prohibited by external systems. This can be used to prohibit changes to tables that would invalidate Data Contracts, Quality SLOs etc. Simply integrate with your own change approval via our `ContractVerification` trait. - **Multi-Tenant capable**: A single deployment of our catalog can serve multiple projects - all with a single entrypoint. All Iceberg and Warehouse configurations are completly separated between Warehouses. - **Written in Rust**: Single 18Mb all-in-one binary - no JVM or Python env required. - **Storage Access Management**: Built-in S3-Signing that enables support for self-hosted as well as AWS S3 WITHOUT sharing S3 credentials with clients.