
Table change check #80

Merged · 11 commits · Jun 12, 2024
README.md (14 changes: 10 additions & 4 deletions)

@@ -4,9 +4,11 @@
[![Build Status][actions-badge]][actions-url]

[actions-badge]: https://github.com/hansetag/iceberg-catalog/workflows/CI/badge.svg?branch=main

[actions-url]: https://github.com/hansetag/iceberg-catalog/actions?query=workflow%3ACI+branch%3Amain

This is TIP: A Rust-native implementation of the [Apache Iceberg](https://iceberg.apache.org/) REST Catalog
specification based on [apache/iceberg-rust](https://github.com/apache/iceberg-rust).

# Scope and Features

@@ -15,7 +17,9 @@ This is TIP: A Rust-native implementation of the [Apache Iceberg](https://iceber
We started this implementation because existing Iceberg Catalogs were missing openness, such as customizability, support for on-premise deployments, and other features that are important to us. Some of the focuses of this implementation:

- **Customizable**: If you already manage access to tables elsewhere in your company, or need the catalog to stream change events to a different system, you can do so by implementing a few methods. Find more details in the [Customization Guide](CUSTOMIZING.md).
- **Change Events**: Built-in support to emit change events (CloudEvents), which enables you to react to any change that happens to your tables.
- **Change Approval**: Changes can also be prohibited by external systems. This can be used to block changes to tables that would violate Data Contracts, Quality SLOs, etc. Simply integrate your own change approval via our `ContractVerification` trait (see the sketch after this list).
- **Multi-Tenant capable**: A single deployment of our catalog can serve multiple projects - all with a single entrypoint. All Iceberg and Warehouse configurations are completely separated between Warehouses.
- **Written in Rust**: Single 18 MB all-in-one binary - no JVM or Python env required.
- **Storage Access Management**: Built-in S3 signing that enables support for self-hosted as well as AWS S3 without sharing S3 credentials with clients.
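
Below is a minimal sketch of what plugging into change approval could look like. The trait and module names come from this PR (`ContractVerification`, `ContractVerificationOutcome` in `service::contract_verification`); the method signature, outcome variant names, and import paths are assumptions inferred from the call sites further down in this diff, not the authoritative API:

```rust
use async_trait::async_trait;
use iceberg::spec::TableMetadata; // assumed import path
use iceberg::TableUpdate; // assumed import path
use iceberg_catalog::service::contract_verification::{
    ContractVerification, ContractVerificationOutcome,
};
use iceberg_ext::catalog::rest::ErrorModel; // assumed import path

/// Hypothetical verifier that rejects commits removing table properties.
#[derive(Debug)]
struct DenyPropertyRemoval;

#[async_trait]
impl ContractVerification for DenyPropertyRemoval {
    async fn check(
        &self,
        updates: &[TableUpdate],
        _previous: &TableMetadata,
    ) -> Result<ContractVerificationOutcome, ErrorModel> {
        let violates = updates
            .iter()
            .any(|u| matches!(u, TableUpdate::RemoveProperties { .. }));
        Ok(if violates {
            // Variant names are illustrative, not necessarily the crate's enum.
            ContractVerificationOutcome::Forbidden {
                reason: "removing properties would break a data contract".to_string(),
            }
        } else {
            ContractVerificationOutcome::Clear
        })
    }
}
```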
@@ -98,7 +102,8 @@ Then open your browser and head to `localhost:8888`.

# Multiple Projects

The iceberg-rest server can host multiple independent warehouses that are again grouped by projects. The overall
structure looks like this:

```
<project-1-uuid>/
...
```

@@ -144,7 +149,8 @@ Configuration parameters if Postgres is used as a backend:

# Limitations

- Table Metadata is currently limited to `256Mb` for the `postgres` implementation. If you need more, you should probably vacuum your table ;)
- Views are not supported yet

## License
crates/iceberg-catalog-bin/src/main.rs (2 changes: 2 additions & 0 deletions)

@@ -1,6 +1,7 @@
use anyhow::Context;
use async_nats::ServerAddr;
use clap::{Parser, Subcommand};
use iceberg_catalog::service::contract_verification::ContractVerifiers;
use iceberg_catalog::service::event_publisher::{
CloudEventSink, CloudEventsPublisher, NatsPublisher,
};
@@ -81,6 +82,7 @@ async fn serve(bind_addr: std::net::SocketAddr) -> Result<(), anyhow::Error> {
CloudEventsPublisher {
sinks: cloud_event_sinks,
},
ContractVerifiers::new(vec![]),
);

service_serve(listener, router).await?;
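The empty `vec![]` keeps verification a no-op. Hypothetically, enforcing contracts would mean passing your own implementations here; `DenyPropertyRemoval` is the illustrative verifier sketched earlier, and whether the collection holds `Box` or `Arc` trait objects is not visible in this diff:

```rust
// Hypothetical: enforce contracts by registering verifiers instead of an
// empty list. DenyPropertyRemoval is illustrative; Box vs. Arc is assumed.
let verifiers = ContractVerifiers::new(vec![Box::new(DenyPropertyRemoval)]);
```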
crates/iceberg-catalog/src/catalog/tables.rs (122 changes: 120 additions & 2 deletions)

@@ -20,6 +20,7 @@ use super::{
namespace::{uppercase_first_letter, validate_namespace_ident},
require_warehouse_id, CatalogServer,
};
use crate::service::contract_verification::{ContractVerification, ContractVerificationOutcome};
use crate::service::event_publisher::{CloudEventsPublisher, EventMetadata};
use crate::service::storage::StorageCredential;
use crate::service::{
@@ -417,6 +418,8 @@ impl<C: Catalog, A: AuthZHandler, S: SecretStore>
// serialize body before moving it
let body = maybe_body_to_json(&request);

let updates = clone_updates(updates);

let transaction_request = CommitTransactionRequest {
table_changes: vec![request],
};
@@ -445,7 +448,12 @@ impl<C: Catalog, A: AuthZHandler, S: SecretStore>
.r#type("NoResultFromCommitTableTransaction".to_string())
.build(),
)?;

state
.v1_state
.contract_verifiers
.check(&updates, &result.previous_table_metadata)
.await?
.into_result()?;
// We don't commit the transaction yet; first we need to write the metadata file.
let storage_secret = if let Some(secret_id) = &result.storage_config.storage_secret_ident {
Some(
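
The `into_result()?` call above is what turns a verifier rejection into an error response before the metadata file is written. As a sketch, assuming a two-variant outcome enum (variant names and all builder methods except `.r#type(...)`/`.build()` are guesses; only `into_result` and `ErrorModel` appear in this diff):

```rust
// Sketch only; the real definition lives in service::contract_verification.
pub enum ContractVerificationOutcome {
    Clear,
    Forbidden { reason: String },
}

impl ContractVerificationOutcome {
    pub fn into_result(self) -> Result<(), ErrorModel> {
        match self {
            // A clear outcome lets the commit proceed.
            ContractVerificationOutcome::Clear => Ok(()),
            // A rejection surfaces as an API error and aborts the commit.
            ContractVerificationOutcome::Forbidden { reason } => Err(ErrorModel::builder()
                .message(format!("contract verification failed: {reason}"))
                .r#type("ContractViolation".to_string())
                .build()),
        }
    }
}
```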
@@ -540,6 +548,12 @@ impl<C: Catalog, A: AuthZHandler, S: SecretStore>
C::drop_table(&warehouse_id, &table_id, transaction.transaction()).await?;

// ToDo: Delete metadata files
state
.v1_state
.contract_verifiers
.check_drop(table_id)
.await?
.into_result()?;

transaction.commit().await?;

@@ -631,7 +645,8 @@ impl<C: Catalog, A: AuthZHandler, S: SecretStore>
request_metadata: RequestMetadata,
) -> Result<()> {
// ------------------- VALIDATIONS -------------------
let warehouse_id = require_warehouse_id(prefix)?;
let warehouse_id = require_warehouse_id(prefix.clone())?;
let body = maybe_body_to_json(&request);
let RenameTableRequest {
source,
destination,
@@ -694,8 +709,32 @@ impl<C: Catalog, A: AuthZHandler, S: SecretStore>
)
.await?;

state
.v1_state
.contract_verifiers
.check_rename(source_id, &destination)
.await?
.into_result()?;

transaction.commit().await?;

emit_change_event(
EventMetadata {
table_id: *source_id.as_uuid(),
warehouse_id: *warehouse_id.as_uuid(),
name: Cow::Borrowed(&source.name),
namespace: Cow::Owned(source.namespace.encode_in_url()),
prefix: Cow::Owned(prefix.map(Prefix::into_string).unwrap_or_default()),
num_events: 1,
sequence_number: 0,
trace_id: request_metadata.request_id,
},
body,
"renameTable",
state.v1_state.publisher.clone(),
)
.await;

Ok(())
}
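
Design note: the rename is verified via `check_rename` before `transaction.commit()`, and the CloudEvent is emitted only after the commit succeeds, so subscribers never observe a rename that was rejected or rolled back.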

@@ -803,11 +842,13 @@ impl<C: Catalog, A: AuthZHandler, S: SecretStore>
// serialize request body before moving it here
let mut events = vec![];
let mut event_table_ids: Vec<(TableIdent, TableIdentUuid)> = vec![];
let mut updates = vec![];
for commit_table_request in &request.table_changes {
if let Some(id) = &commit_table_request.identifier {
if let Some(uuid) = table_ids.get(id) {
events.push(maybe_body_to_json(commit_table_request));
event_table_ids.push((id.clone(), *uuid));
updates.push(clone_updates(&commit_table_request.updates));
}
}
}
@@ -819,6 +860,21 @@
transaction.transaction(),
)
.await?;
let futures = updates
.iter()
.zip(&commit_response)
.map(|(update, response)| {
state
.v1_state
.contract_verifiers
.check(update, &response.previous_table_metadata)
});

futures::future::try_join_all(futures)
.await?
.into_iter()
.map(ContractVerificationOutcome::into_result)
.collect::<Result<Vec<()>, ErrorModel>>()?;

// We don't commit the transaction yet; first we need to write the metadata file.
// Fetch all secrets concurrently
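
Design note: `futures::future::try_join_all` runs the per-table contract checks concurrently and fails fast on the first error, so one rejected table aborts the whole multi-table transaction before any metadata files are written.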
@@ -976,3 +1032,65 @@ fn maybe_body_to_json(request: impl Serialize) -> serde_json::Value {
serde_json::Value::Null
}
}

// `TableUpdate` does not implement `Clone`, but all of its fields do, so we clone it
// field by field via this function.
fn clone_updates(updates: &[TableUpdate]) -> Vec<TableUpdate> {
updates
.iter()
.map(|u| match u {
TableUpdate::AddSnapshot { snapshot } => TableUpdate::AddSnapshot {
snapshot: snapshot.clone(),
},
TableUpdate::AssignUuid { uuid } => TableUpdate::AssignUuid { uuid: *uuid },
TableUpdate::AddSortOrder { sort_order } => TableUpdate::AddSortOrder {
sort_order: sort_order.clone(),
},
TableUpdate::AddSpec { spec } => TableUpdate::AddSpec { spec: spec.clone() },
TableUpdate::AddSchema {
schema,
last_column_id,
} => TableUpdate::AddSchema {
schema: schema.clone(),
last_column_id: *last_column_id,
},
TableUpdate::UpgradeFormatVersion { format_version } => {
TableUpdate::UpgradeFormatVersion {
format_version: *format_version,
}
}
TableUpdate::SetCurrentSchema { schema_id } => TableUpdate::SetCurrentSchema {
schema_id: *schema_id,
},
TableUpdate::SetDefaultSortOrder { sort_order_id } => {
TableUpdate::SetDefaultSortOrder {
sort_order_id: *sort_order_id,
}
}
TableUpdate::RemoveSnapshotRef { ref_name } => TableUpdate::RemoveSnapshotRef {
ref_name: ref_name.clone(),
},
TableUpdate::SetDefaultSpec { spec_id } => {
TableUpdate::SetDefaultSpec { spec_id: *spec_id }
}
TableUpdate::SetSnapshotRef {
ref_name,
reference,
} => TableUpdate::SetSnapshotRef {
ref_name: ref_name.clone(),
reference: reference.clone(),
},
TableUpdate::RemoveSnapshots { snapshot_ids } => TableUpdate::RemoveSnapshots {
snapshot_ids: snapshot_ids.clone(),
},
TableUpdate::SetLocation { location } => TableUpdate::SetLocation {
location: location.clone(),
},
TableUpdate::SetProperties { updates } => TableUpdate::SetProperties {
updates: updates.clone(),
},
TableUpdate::RemoveProperties { removals } => TableUpdate::RemoveProperties {
removals: removals.clone(),
},
})
.collect()
}
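
A note on this helper: the exhaustive `match` is verbose, but it makes the compiler flag this function whenever a new `TableUpdate` variant is added upstream, rather than silently dropping updates. Should `TableUpdate` ever derive `Clone` upstream, the helper reduces to a one-liner (hypothetical simplification, not part of this PR):

```rust
// Valid only once TableUpdate implements Clone; the manual per-variant
// match above then becomes unnecessary.
fn clone_updates(updates: &[TableUpdate]) -> Vec<TableUpdate> {
    updates.to_vec()
}
```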
crates/iceberg-catalog/src/implementations/postgres/table.rs (2 changes: 2 additions & 0 deletions)

@@ -688,6 +688,7 @@ fn apply_commits(commits: Vec<CommitContext>) -> Result<Vec<CommitTableResponseE
let metadata_location = context
.storage_profile
.metadata_location(&previous_location, &metadata_id);
let previous_table_metadata = context.metadata.clone();
let mut builder = TableMetadataAggregate::new_from_metadata(context.metadata);
for update in context.updates {
match &update {
@@ -728,6 +729,7 @@
storage_secret_ident: context.storage_secret_ident,
namespace_id: context.namespace_id,
},
previous_table_metadata,
});
}

crates/iceberg-catalog/src/service/catalog.rs (1 change: 1 addition & 0 deletions)

@@ -71,6 +71,7 @@ pub struct GetStorageConfigResult {
pub struct CommitTableResponseExt {
pub commit_response: CommitTableResponse,
pub storage_config: GetStorageConfigResult,
pub previous_table_metadata: TableMetadata,
}

#[async_trait::async_trait]
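
The new `previous_table_metadata` field threads the table's pre-commit state out of the storage layer so that contract verifiers can compare the requested updates against what the table looked like before the commit, as done in `tables.rs` above.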