Skip to content

Commit

Permalink
Add table-change-checker
Browse files Browse the repository at this point in the history
  • Loading branch information
twuebi committed Jun 11, 2024
1 parent ec9e86e commit 12311bc
Show file tree
Hide file tree
Showing 11 changed files with 449 additions and 17 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ The Iceberg REST Protocol has become the standard for catalogs in open Lakehouse
We have started this implementation because we were missing "OPENNESS" such as customizability, support for on-premise deployments and other features that are important for us in existing Iceberg Catalogs. Please find following some of our focuses with this implementation:

- **Customizable**: If you already manage Access to Tables in your company somewhere else or need the catalog to stream change events to a different system, you can do so by implementing a few methods. Please find more details in the [Customization Guide](CUSTOMIZING.md).
- **Change Events**: Built-in support to emit change events (CloudEvents), which enables you to react to any change that happens to your tables. Changes can also be prohibited by external systems using our request / response handler. This can be used to prohibit changes to tables that would violate Data Contracts, Quality SLOs etc.
- **Change Events**: Built-in support to emit change events (CloudEvents), which enables you to react to any change that happens to your tables.
- **Change Approval**: Changes can also be prohibited by external systems. This can be used to prohibit changes to tables that would violate Data Contracts, Quality SLOs etc. Simply integrate with your own change approval via our `TableChangeCheck` trait.
- **Multi-Tenant capable**: A single deployment of our catalog can serve multiple projects - all with a single entrypoint. All Iceberg and Warehouse configurations are completely separated between Warehouses.
- **Written in Rust**: Single 18Mb all-in-one binary - no JVM or Python env required.
- **Storage Access Management**: Built-in S3-Signing that enables support for self-hosted as well as AWS S3 WITHOUT sharing S3 credentials with clients.
Expand Down
2 changes: 2 additions & 0 deletions crates/iceberg-catalog-bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use clap::{Parser, Subcommand};
use iceberg_catalog::service::event_publisher::{
CloudEventSink, CloudEventsPublisher, NatsPublisher,
};
use iceberg_catalog::service::table_change_check::TableChangeCheckers;
use iceberg_catalog::{
implementations::{
postgres::{Catalog, CatalogState, SecretsState, SecretsStore},
Expand Down Expand Up @@ -81,6 +82,7 @@ async fn serve(bind_addr: std::net::SocketAddr) -> Result<(), anyhow::Error> {
CloudEventsPublisher {
sinks: cloud_event_sinks,
},
TableChangeCheckers::new(vec![]),
);

service_serve(listener, router).await?;
Expand Down
93 changes: 92 additions & 1 deletion crates/iceberg-catalog/src/catalog/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use super::{
};
use crate::service::event_publisher::{CloudEventsPublisher, EventMetadata};
use crate::service::storage::StorageCredential;
use crate::service::table_change_check::TableChangeCheck;
use crate::service::{
auth::AuthZHandler, secrets::SecretStore, Catalog, CreateTableResult,
LoadTableResult as CatalogLoadTableResult, State, Transaction,
Expand Down Expand Up @@ -370,6 +371,8 @@ impl<C: Catalog, A: AuthZHandler, S: SecretStore>
// serialize body before moving it
let body = maybe_body_to_json(&request);

let updates = copy_updates(updates);

let transaction_request = CommitTransactionRequest {
table_changes: vec![request],
};
Expand Down Expand Up @@ -398,7 +401,12 @@ impl<C: Catalog, A: AuthZHandler, S: SecretStore>
.r#type("NoResultFromCommitTableTransaction".to_string())
.build(),
)?;

state
.v1_state
.table_change_checkers
.check(&updates, table_id, &result.previous_table_metadata)
.await?
.into_result()?;
// We don't commit the transaction yet, first we need to write the metadata file.
let storage_secret = if let Some(secret_id) = &result.storage_config.storage_secret_ident {
Some(
Expand Down Expand Up @@ -493,6 +501,12 @@ impl<C: Catalog, A: AuthZHandler, S: SecretStore>
C::drop_table(&warehouse_id, &table_id, transaction.transaction()).await?;

// ToDo: Delete metadata files
state
.v1_state
.table_change_checkers
.check_drop(table_id)
.await?
.into_result()?;

transaction.commit().await?;

Expand Down Expand Up @@ -756,11 +770,13 @@ impl<C: Catalog, A: AuthZHandler, S: SecretStore>
// serialize request body before moving it here
let mut events = vec![];
let mut event_table_ids: Vec<(TableIdent, TableIdentUuid)> = vec![];
let mut updates = vec![];
for commit_table_request in &request.table_changes {
if let Some(id) = &commit_table_request.identifier {
if let Some(uuid) = table_ids.get(id) {
events.push(maybe_body_to_json(commit_table_request));
event_table_ids.push((id.clone(), *uuid));
updates.push(copy_updates(&commit_table_request.updates));
}
}
}
Expand Down Expand Up @@ -819,6 +835,19 @@ impl<C: Catalog, A: AuthZHandler, S: SecretStore>
));
}

for ((update, (_, table_uuid)), response) in updates
.into_iter()
.zip(&event_table_ids)
.zip(&commit_response)
{
state
.v1_state
.table_change_checkers
.check(&update, *table_uuid, &response.previous_table_metadata)
.await?
.into_result()?;
}

futures::future::try_join_all(write_futures).await?;

transaction.commit().await?;
Expand Down Expand Up @@ -929,3 +958,65 @@ fn maybe_body_to_json(request: impl Serialize) -> serde_json::Value {
serde_json::Value::Null
}
}

/// Produces an owned deep copy of a slice of `TableUpdate`s.
///
/// `TableUpdate` does not implement `Clone`, but every field of every
/// variant does, so each variant is rebuilt field by field. The `match`
/// is deliberately exhaustive (no `_` arm): if a new variant is added
/// upstream, this function fails to compile and must be updated.
fn copy_updates(updates: &[TableUpdate]) -> Vec<TableUpdate> {
    let mut copies = Vec::with_capacity(updates.len());
    for update in updates {
        let copy = match update {
            // Variants whose fields are `Copy` are dereferenced directly.
            TableUpdate::AssignUuid { uuid } => TableUpdate::AssignUuid { uuid: *uuid },
            TableUpdate::UpgradeFormatVersion { format_version } => {
                TableUpdate::UpgradeFormatVersion {
                    format_version: *format_version,
                }
            }
            TableUpdate::SetCurrentSchema { schema_id } => TableUpdate::SetCurrentSchema {
                schema_id: *schema_id,
            },
            TableUpdate::SetDefaultSpec { spec_id } => {
                TableUpdate::SetDefaultSpec { spec_id: *spec_id }
            }
            TableUpdate::SetDefaultSortOrder { sort_order_id } => {
                TableUpdate::SetDefaultSortOrder {
                    sort_order_id: *sort_order_id,
                }
            }
            // Remaining variants carry owned data and need real clones.
            TableUpdate::AddSchema {
                schema,
                last_column_id,
            } => TableUpdate::AddSchema {
                schema: schema.clone(),
                last_column_id: *last_column_id,
            },
            TableUpdate::AddSpec { spec } => TableUpdate::AddSpec { spec: spec.clone() },
            TableUpdate::AddSortOrder { sort_order } => TableUpdate::AddSortOrder {
                sort_order: sort_order.clone(),
            },
            TableUpdate::AddSnapshot { snapshot } => TableUpdate::AddSnapshot {
                snapshot: snapshot.clone(),
            },
            TableUpdate::RemoveSnapshots { snapshot_ids } => TableUpdate::RemoveSnapshots {
                snapshot_ids: snapshot_ids.clone(),
            },
            TableUpdate::SetSnapshotRef {
                ref_name,
                reference,
            } => TableUpdate::SetSnapshotRef {
                ref_name: ref_name.clone(),
                reference: reference.clone(),
            },
            TableUpdate::RemoveSnapshotRef { ref_name } => TableUpdate::RemoveSnapshotRef {
                ref_name: ref_name.clone(),
            },
            TableUpdate::SetLocation { location } => TableUpdate::SetLocation {
                location: location.clone(),
            },
            TableUpdate::SetProperties { updates } => TableUpdate::SetProperties {
                updates: updates.clone(),
            },
            TableUpdate::RemoveProperties { removals } => TableUpdate::RemoveProperties {
                removals: removals.clone(),
            },
        };
        copies.push(copy);
    }
    copies
}
2 changes: 2 additions & 0 deletions crates/iceberg-catalog/src/implementations/postgres/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,7 @@ fn apply_commits(commits: Vec<CommitContext>) -> Result<Vec<CommitTableResponseE
let metadata_location = context
.storage_profile
.metadata_location(&previous_location, &metadata_id);
let previous_table_metadata = context.metadata.clone();
let mut builder = TableMetadataAggregate::new_from_metadata(context.metadata);
for update in context.updates {
match &update {
Expand Down Expand Up @@ -728,6 +729,7 @@ fn apply_commits(commits: Vec<CommitContext>) -> Result<Vec<CommitTableResponseE
storage_secret_ident: context.storage_secret_ident,
namespace_id: context.namespace_id,
},
previous_table_metadata,
});
}

Expand Down
1 change: 1 addition & 0 deletions crates/iceberg-catalog/src/service/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub struct GetStorageConfigResult {
pub struct CommitTableResponseExt {
pub commit_response: CommitTableResponse,
pub storage_config: GetStorageConfigResult,
pub previous_table_metadata: TableMetadata,
}

#[async_trait::async_trait]
Expand Down
3 changes: 3 additions & 0 deletions crates/iceberg-catalog/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod event_publisher;
pub mod router;
pub mod secrets;
pub mod storage;
pub mod table_change_check;

pub use catalog::{
Catalog, CommitTableResponseExt, CreateTableResult, GetStorageConfigResult,
Expand All @@ -20,6 +21,7 @@ use iceberg::NamespaceIdent;
use std::str::FromStr;

use crate::service::event_publisher::CloudEventsPublisher;
use crate::service::table_change_check::TableChangeCheckers;
pub use secrets::{SecretIdent, SecretStore};

use self::auth::AuthZHandler;
Expand Down Expand Up @@ -60,6 +62,7 @@ pub struct State<A: AuthZHandler, C: Catalog, S: SecretStore> {
pub catalog: C::State,
pub secrets: S::State,
pub publisher: CloudEventsPublisher,
pub table_change_checkers: TableChangeCheckers,
}

impl<A: AuthZHandler, C: Catalog, S: SecretStore> ServiceState for State<A, C, S> {}
Expand Down
3 changes: 3 additions & 0 deletions crates/iceberg-catalog/src/service/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::tracing::{MakeRequestUuid7, RestMakeSpan};

use crate::api::management::ApiServer;
use crate::api::{iceberg::v1::new_v1_full_router, shutdown_signal, ApiContext};
use crate::service::table_change_check::TableChangeCheckers;
use axum::{routing::get, Router};
use tower::ServiceBuilder;
use tower_http::{
Expand All @@ -29,6 +30,7 @@ pub fn new_full_router<
catalog_state: C::State,
secrets_state: S::State,
publisher: CloudEventsPublisher,
table_change_checkers: TableChangeCheckers,
) -> Router {
let v1_routes = new_v1_full_router::<
crate::catalog::ConfigServer<CP, C, AH, A>,
Expand Down Expand Up @@ -66,6 +68,7 @@ pub fn new_full_router<
catalog: catalog_state,
secrets: secrets_state,
publisher,
table_change_checkers,
},
})
}
Expand Down
Loading

0 comments on commit 12311bc

Please sign in to comment.