
Commit: wip
twuebi committed Jun 11, 2024
1 parent 01414ee commit b586b67
Showing 9 changed files with 486 additions and 16 deletions.
92 changes: 91 additions & 1 deletion crates/iceberg-catalog/src/catalog/tables.rs
@@ -370,6 +370,8 @@ impl<C: Catalog, A: AuthZHandler, S: SecretStore>
// serialize body before moving it
let body = maybe_body_to_json(&request);

let updates = copy_updates(&updates);

let transaction_request = CommitTransactionRequest {
table_changes: vec![request],
};
@@ -398,7 +400,12 @@ impl<C: Catalog, A: AuthZHandler, S: SecretStore>
.r#type("NoResultFromCommitTableTransaction".to_string())
.build(),
)?;

state
.v1_state
.table_check
.check(&updates, table_id, &result.previous_table_metadata)
.await?
.into_result()?;
// We don't commit the transaction yet; first we need to write the metadata file.
let storage_secret = if let Some(secret_id) = &result.storage_config.storage_secret_ident {
Some(
@@ -493,6 +500,12 @@ impl<C: Catalog, A: AuthZHandler, S: SecretStore>
C::drop_table(&warehouse_id, &table_id, transaction.transaction()).await?;

// ToDo: Delete metadata files
state
.v1_state
.table_check
.check_drop(table_id)
.await?
.into_result()?;

transaction.commit().await?;

@@ -756,11 +769,13 @@ impl<C: Catalog, A: AuthZHandler, S: SecretStore>
// serialize request body before moving it here
let mut events = vec![];
let mut event_table_ids: Vec<(TableIdent, TableIdentUuid)> = vec![];
let mut updates = vec![];
for req in &request.table_changes {
if let Some(id) = &req.identifier {
if let Some(uuid) = table_ids.get(id) {
events.push(maybe_body_to_json(&request));
event_table_ids.push((id.clone(), *uuid));
updates.push(copy_updates(&req.updates));
}
}
}
@@ -819,6 +834,19 @@ impl<C: Catalog, A: AuthZHandler, S: SecretStore>
));
}

for ((update, (_, table_uuid)), response) in updates
.into_iter()
.zip(&event_table_ids)
.zip(&commit_response)
{
state
.v1_state
.table_check
.check(&update, *table_uuid, &response.previous_table_metadata)
.await?
.into_result()?;
}

futures::future::try_join_all(write_futures).await?;

transaction.commit().await?;
@@ -929,3 +957,65 @@ fn maybe_body_to_json(request: impl Serialize) -> serde_json::Value {
serde_json::Value::Null
}
}

// TableUpdate does not implement Clone, but all of its fields do, so this helper clones it variant by variant.
fn copy_updates(updates: &[TableUpdate]) -> Vec<TableUpdate> {
updates
.iter()
.map(|u| match u {
TableUpdate::AddSnapshot { snapshot } => TableUpdate::AddSnapshot {
snapshot: snapshot.clone(),
},
TableUpdate::AssignUuid { uuid } => TableUpdate::AssignUuid { uuid: uuid.clone() },
TableUpdate::AddSortOrder { sort_order } => TableUpdate::AddSortOrder {
sort_order: sort_order.clone(),
},
TableUpdate::AddSpec { spec } => TableUpdate::AddSpec { spec: spec.clone() },
TableUpdate::AddSchema {
schema,
last_column_id,
} => TableUpdate::AddSchema {
schema: schema.clone(),
last_column_id: last_column_id.clone(),
},
TableUpdate::UpgradeFormatVersion { format_version } => {
TableUpdate::UpgradeFormatVersion {
format_version: format_version.clone(),
}
}
TableUpdate::SetCurrentSchema { schema_id } => TableUpdate::SetCurrentSchema {
schema_id: schema_id.clone(),
},
TableUpdate::SetDefaultSortOrder { sort_order_id } => {
TableUpdate::SetDefaultSortOrder {
sort_order_id: *sort_order_id,
}
}
TableUpdate::RemoveSnapshotRef { ref_name } => TableUpdate::RemoveSnapshotRef {
ref_name: ref_name.clone(),
},
TableUpdate::SetDefaultSpec { spec_id } => {
TableUpdate::SetDefaultSpec { spec_id: *spec_id }
}
TableUpdate::SetSnapshotRef {
ref_name,
reference,
} => TableUpdate::SetSnapshotRef {
ref_name: ref_name.clone(),
reference: reference.clone(),
},
TableUpdate::RemoveSnapshots { snapshot_ids } => TableUpdate::RemoveSnapshots {
snapshot_ids: snapshot_ids.clone(),
},
TableUpdate::SetLocation { location } => TableUpdate::SetLocation {
location: location.clone(),
},
TableUpdate::SetProperties { updates } => TableUpdate::SetProperties {
updates: updates.clone(),
},
TableUpdate::RemoveProperties { removals } => TableUpdate::RemoveProperties {
removals: removals.clone(),
},
})
.collect()
}
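
The new module crates/iceberg-catalog/src/service/table_change_check.rs is not rendered in this view, so only its call sites are visible. Judging from check(&updates, table_id, &result.previous_table_metadata).await?.into_result()? and check_drop(table_id).await?.into_result()? above, the interface plausibly resembles the sketch below. Apart from the names TableChangeCheckers, check, check_drop, and into_result, everything in it (the trait name, the CheckResult type, the error type, and the stand-in type aliases) is an assumption for illustration, not the committed code.

// Hypothetical sketch only; inferred from the call sites, not taken from the actual module.
use std::sync::Arc;

use async_trait::async_trait;

// Stand-ins so the sketch is self-contained; the real code would use the crate's
// TableIdentUuid together with iceberg's TableUpdate and TableMetadata types.
pub type TableIdentUuid = uuid::Uuid;
pub struct TableMetadata;
pub enum TableUpdate {}

/// Result of a single check; into_result converts a rejection into an error
/// so handlers can propagate it with `?`.
pub enum CheckResult {
    Allow,
    Deny { reason: String },
}

impl CheckResult {
    pub fn into_result(self) -> Result<(), String> {
        match self {
            CheckResult::Allow => Ok(()),
            CheckResult::Deny { reason } => Err(reason),
        }
    }
}

#[async_trait]
pub trait TableChangeCheck: Send + Sync {
    async fn check(
        &self,
        updates: &[TableUpdate],
        table_id: TableIdentUuid,
        previous_metadata: &TableMetadata,
    ) -> Result<CheckResult, String>;

    async fn check_drop(&self, table_id: TableIdentUuid) -> Result<CheckResult, String>;
}

/// Fans each change out to every registered checker and stops at the first denial.
#[derive(Clone)]
pub struct TableChangeCheckers {
    checkers: Vec<Arc<dyn TableChangeCheck>>,
}

impl TableChangeCheckers {
    pub fn new(checkers: Vec<Arc<dyn TableChangeCheck>>) -> Self {
        Self { checkers }
    }

    pub async fn check(
        &self,
        updates: &[TableUpdate],
        table_id: TableIdentUuid,
        previous_metadata: &TableMetadata,
    ) -> Result<CheckResult, String> {
        for checker in &self.checkers {
            if let CheckResult::Deny { reason } =
                checker.check(updates, table_id, previous_metadata).await?
            {
                return Ok(CheckResult::Deny { reason });
            }
        }
        Ok(CheckResult::Allow)
    }

    pub async fn check_drop(&self, table_id: TableIdentUuid) -> Result<CheckResult, String> {
        for checker in &self.checkers {
            if let CheckResult::Deny { reason } = checker.check_drop(table_id).await? {
                return Ok(CheckResult::Deny { reason });
            }
        }
        Ok(CheckResult::Allow)
    }
}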
2 changes: 2 additions & 0 deletions crates/iceberg-catalog/src/implementations/postgres/table.rs
@@ -688,6 +688,7 @@ fn apply_commits(commits: Vec<CommitContext>) -> Result<Vec<CommitTableResponseE
let metadata_location = context
.storage_profile
.metadata_location(&previous_location, &metadata_id);
let previous_table_metadata = context.metadata.clone();
let mut builder = TableMetadataAggregate::new_from_metadata(context.metadata);
for update in context.updates {
match &update {
@@ -728,6 +729,7 @@
storage_secret_ident: context.storage_secret_ident,
namespace_id: context.namespace_id,
},
previous_table_metadata,
});
}

1 change: 1 addition & 0 deletions crates/iceberg-catalog/src/service/catalog.rs
@@ -71,6 +71,7 @@ pub struct GetStorageConfigResult {
pub struct CommitTableResponseExt {
pub commit_response: CommitTableResponse,
pub storage_config: GetStorageConfigResult,
pub previous_table_metadata: TableMetadata,
}

#[async_trait::async_trait]
3 changes: 3 additions & 0 deletions crates/iceberg-catalog/src/service/mod.rs
@@ -6,6 +6,7 @@ pub mod event_publisher;
pub mod router;
pub mod secrets;
pub mod storage;
pub mod table_change_check;

pub use catalog::{
Catalog, CommitTableResponseExt, CreateTableResult, GetStorageConfigResult,
@@ -20,6 +21,7 @@ use iceberg::NamespaceIdent;
use std::str::FromStr;

use crate::service::event_publisher::CloudEventsPublisher;
use crate::service::table_change_check::TableChangeCheckers;
pub use secrets::{SecretIdent, SecretStore};

use self::auth::AuthZHandler;
@@ -60,6 +62,7 @@ pub struct State<A: AuthZHandler, C: Catalog, S: SecretStore> {
pub catalog: C::State,
pub secrets: S::State,
pub publisher: CloudEventsPublisher,
pub table_check: TableChangeCheckers,
}

impl<A: AuthZHandler, C: Catalog, S: SecretStore> ServiceState for State<A, C, S> {}
5 changes: 5 additions & 0 deletions crates/iceberg-catalog/src/service/router.rs
@@ -1,8 +1,10 @@
use crate::service::event_publisher::CloudEventsPublisher;
use crate::tracing::{MakeRequestUuid7, RestMakeSpan};
use std::sync::Arc;

use crate::api::management::ApiServer;
use crate::api::{iceberg::v1::new_v1_full_router, shutdown_signal, ApiContext};
use crate::service::table_change_check::TableChangeCheckers;
use axum::{routing::get, Router};
use tower::ServiceBuilder;
use tower_http::{
@@ -66,6 +68,9 @@ pub fn new_full_router<
catalog: catalog_state,
secrets: secrets_state,
publisher,
table_check: TableChangeCheckers::new(vec![Arc::new(
crate::service::table_change_check::DenyAllChecker,
)]),
},
})
}
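
TableChangeCheckers::new takes a Vec of Arc'd checkers, so several checks can be registered and evaluated together; here the router wires in a single DenyAllChecker which, going by its name, presumably rejects every change. A minimal implementation consistent with that reading, and with the sketch shown after tables.rs above, could look like this; it is an illustration, not the committed module.

// Hypothetical sketch; the real DenyAllChecker lives in the module that is not rendered here.
// Relies on the TableChangeCheck trait and CheckResult type from the sketch above.
pub struct DenyAllChecker;

#[async_trait]
impl TableChangeCheck for DenyAllChecker {
    async fn check(
        &self,
        _updates: &[TableUpdate],
        _table_id: TableIdentUuid,
        _previous_metadata: &TableMetadata,
    ) -> Result<CheckResult, String> {
        // Unconditionally deny; a conservative default while the feature is work in progress.
        Ok(CheckResult::Deny { reason: "all table changes are denied".to_string() })
    }

    async fn check_drop(&self, _table_id: TableIdentUuid) -> Result<CheckResult, String> {
        Ok(CheckResult::Deny { reason: "all table drops are denied".to_string() })
    }
}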