Skip to content

Commit

Permalink
Prepare for Iceberg Sync (#766)
Browse files Browse the repository at this point in the history
  • Loading branch information
SergeiPatiakin authored Dec 19, 2024
1 parent 1b5ffbc commit cb8605b
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 121 deletions.
57 changes: 31 additions & 26 deletions src/frontend/flight/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use url::Url;
use crate::context::SeafowlContext;
use crate::sync::schema::SyncSchema;
use crate::sync::writer::SeafowlDataSyncWriter;
use crate::sync::{SyncError, SyncResult};
use crate::sync::{LakehouseSyncTarget, SyncError, SyncResult};

lazy_static! {
pub static ref SEAFOWL_SQL_DATA: SqlInfoData = {
Expand Down Expand Up @@ -163,32 +163,37 @@ impl SeafowlFlightHandler {
});
}

if cmd.format != TableFormat::Delta as i32 {
return Err(SyncError::NotImplemented);
}

let log_store = match cmd.store {
None => self
.context
.get_internal_object_store()?
.get_log_store(&cmd.path),
Some(store_loc) => {
self.context
.metastore
.object_stores
.get_log_store_for_table(
Url::parse(&store_loc.location).map_err(|e| {
DataFusionError::Execution(format!(
"Couldn't parse sync location: {e}"
))
})?,
store_loc.options,
cmd.path,
)
.await?
let (sync_target, url) = match cmd.format() {
TableFormat::Delta => {
let log_store = match cmd.store {
None => self
.context
.get_internal_object_store()?
.get_log_store(&cmd.path),
Some(store_loc) => {
self.context
.metastore
.object_stores
.get_log_store_for_table(
Url::parse(&store_loc.location).map_err(|e| {
DataFusionError::Execution(format!(
"Couldn't parse sync location: {e}"
))
})?,
store_loc.options,
cmd.path,
)
.await?
}
};
let url = log_store.root_uri();
(LakehouseSyncTarget::Delta(log_store), url)
}
TableFormat::Iceberg => {
return Err(SyncError::NotImplemented);
}
};
let url = log_store.root_uri();

let num_batches = batches.len();

debug!("Processing data change with {num_rows} rows, {num_batches} batches, descriptor {sync_schema}, url {url} from origin {:?} at position {:?}",
Expand All @@ -202,7 +207,7 @@ impl SeafowlFlightHandler {
{
Ok(mut sync_writer) => {
sync_writer.enqueue_sync(
log_store,
sync_target,
cmd.sequence_number,
cmd.origin.clone(),
sync_schema,
Expand Down
12 changes: 12 additions & 0 deletions src/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::sync::writer::SeafowlDataSyncWriter;
use deltalake::logstore::LogStore;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -56,6 +57,17 @@ pub(super) struct SyncCommitInfo {
new_tx: bool,
}

/// Sync target describing an Apache Iceberg table destination.
///
/// Introduced as part of the Iceberg sync preparation: the flight handler
/// currently rejects `TableFormat::Iceberg` with `SyncError::NotImplemented`,
/// so for now this type is a placeholder carried by
/// `LakehouseSyncTarget::Iceberg`.
#[derive(Clone, Debug)]
pub struct IcebergSyncTarget {
    // Location of the Iceberg table — presumably the table root or metadata
    // URL; NOTE(review): confirm exact semantics once Iceberg sync lands.
    url: String,
}

/// Destination table format for an enqueued data sync, passed to
/// `SeafowlDataSyncWriter::enqueue_sync` in place of a bare log store.
///
/// - `Delta` wraps the Delta Lake `LogStore` used to commit the sync; the
///   flight handler derives the reported table URL from its `root_uri()`.
/// - `Iceberg` wraps an `IcebergSyncTarget`; the flight handler currently
///   returns `SyncError::NotImplemented` for this format.
#[derive(Clone, Debug)]
pub enum LakehouseSyncTarget {
    Delta(Arc<dyn LogStore>),
    Iceberg(IcebergSyncTarget),
}

impl SyncCommitInfo {
pub(super) fn new(
origin: impl Into<Origin>,
Expand Down
4 changes: 2 additions & 2 deletions src/sync/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl SeafowlSyncPlanner {

// Construct a plan for flushing the pending syncs to the provided table.
// Return the plan and the files that are re-written by it (to be removed from the table state).
pub(super) async fn plan_syncs(
pub(super) async fn plan_delta_syncs(
&self,
syncs: &[DataSyncItem],
table: &DeltaTable,
Expand Down Expand Up @@ -963,7 +963,7 @@ mod tests {
.await?;
let table = ctx.try_get_delta_table("test_table").await?;

let (plan, _) = planner.plan_syncs(&[sync_item], &table).await?;
let (plan, _) = planner.plan_delta_syncs(&[sync_item], &table).await?;

let mut actual_plan = get_plan_string(&plan);
actual_plan.iter_mut().for_each(|node| {
Expand Down
Loading

0 comments on commit cb8605b

Please sign in to comment.