Allow writing to existing table
SergeiPatiakin committed Dec 4, 2024
1 parent b7e8995 commit cd11c71
Showing 5 changed files with 132 additions and 24 deletions.
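The change adds an overwrite flag to both the parquet-to-iceberg and pg-to-iceberg subcommands and threads it down to record_batches_to_iceberg: without the flag, writing to an existing Iceberg table still fails; with it, the loader reads the current version-hint.text, refuses any schema change, and commits a new metadata version. Below is a minimal sketch of driving the loader programmatically with the flag, modelled on the integration test further down; the crate-level imports, the tokio main wrapper, and the local target URL are assumptions for illustration, not part of this diff.

// Sketch only: run the pg-to-iceberg path with --overwrite, the way the
// integration test invokes it. Connection string and target URL are
// placeholder fixtures.
use clap::Parser;
use lakehouse_loader::{do_main, Cli}; // assumed exports, as used by tests/basic_integration.rs

#[tokio::main]
async fn main() {
    let args = vec![
        "lakehouse-loader",
        "pg-to-iceberg",
        "postgres://test-user:test-password@localhost:5432/test-db",
        "-q",
        "select cint4, cint8 + 1 cint8, ctext, cbool from t1 order by id",
        "file:///tmp/iceberg-table", // hypothetical target URL
        "--overwrite",
    ];
    if let Err(e) = do_main(Cli::parse_from(args)).await {
        eprintln!("load failed: {:?}", e);
    }
}
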
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -16,7 +16,7 @@ deltalake = { version = "0.22" }
env_logger = "0.11.1"
fastrand = "2.2.0"
futures = "0.3"
iceberg = { git = "https://github.com/splitgraph/iceberg-rust", rev = "59e113b4650cfcd52331d3c179154a2cd7f88b6f" }
iceberg = { git = "https://github.com/splitgraph/iceberg-rust", rev = "9eaff9bd76a93243eea0360f1833b133f16104d6" }
log = "0.4"
native-tls = "0.2.11"
object_store = { version = "0.11", features = ["aws"] }
96 changes: 76 additions & 20 deletions src/iceberg_destination.rs
@@ -1,3 +1,4 @@
use core::str;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
@@ -98,6 +99,7 @@ pub async fn record_batches_to_iceberg(
record_batch_stream: impl TryStream<Item = Result<RecordBatch, DataLoadingError>>,
arrow_schema: SchemaRef,
target_url: Url,
overwrite: bool,
) -> Result<(), DataLoadingError> {
pin_mut!(record_batch_stream);

@@ -108,27 +110,74 @@
)?);

let version_hint_location = format!("{}/metadata/version-hint.text", target_url);
if file_io.new_input(&version_hint_location)?.exists().await? {
return Err(DataLoadingError::IcebergError(iceberg::Error::new(
iceberg::ErrorKind::FeatureUnsupported,
"Iceberg table already exists. Writing to an existing table is not implemented",
)));
let version_hint_input = file_io.new_input(&version_hint_location)?;
let old_version_hint: Option<u64> = if version_hint_input.exists().await? {
if !overwrite {
return Err(DataLoadingError::IoError(std::io::Error::other(
"Table exists. Pass the overwrite flag to lakehouse-loader to overwrite data",
)));
}
let x = version_hint_input.read().await?;
let y: String = String::from_utf8(x.to_vec()).map_err(|_| {
DataLoadingError::IcebergError(iceberg::Error::new(
iceberg::ErrorKind::DataInvalid,
"Could not parse UTF-8 in version-hint.text",
))
})?;
let z = y.trim().parse::<u64>().map_err(|_| {
DataLoadingError::IcebergError(iceberg::Error::new(
iceberg::ErrorKind::DataInvalid,
"Could not parse integer version in version-hint.text",
))
})?;
Some(z)
} else {
None
};

let metadata_v0 = create_metadata_v0(&iceberg_schema, target_url.to_string())?;
let metadata_v0_location = format!("{}/metadata/v0.metadata.json", target_url);

file_io
.new_output(&metadata_v0_location)?
.write(serde_json::to_vec(&metadata_v0).unwrap().into())
.await?;
info!("Wrote v0 metadata: {:?}", metadata_v0_location);
let (old_metadata, old_metadata_location) = match old_version_hint {
Some(version_hint) => {
let old_metadata_location =
format!("{}/metadata/v{}.metadata.json", target_url, version_hint);
let old_metadata_bytes = file_io.new_input(&old_metadata_location)?.read().await?;
let old_metadata_string = str::from_utf8(&old_metadata_bytes).map_err(|_| {
DataLoadingError::IcebergError(iceberg::Error::new(
iceberg::ErrorKind::DataInvalid,
"Could not parse UTF-8 in old metadata file",
))
})?;
let old_metadata =
serde_json::from_str::<TableMetadata>(old_metadata_string).map_err(|_| {
DataLoadingError::IcebergError(iceberg::Error::new(
iceberg::ErrorKind::DataInvalid,
"Could not parse old metadata file",
))
})?;
if old_metadata.current_schema() != &iceberg_schema {
return Err(DataLoadingError::IcebergError(iceberg::Error::new(
iceberg::ErrorKind::FeatureUnsupported,
"Schema changes not supported",
)));
}
(old_metadata, old_metadata_location)
}
None => {
let metadata_v0 = create_metadata_v0(&iceberg_schema, target_url.to_string())?;
let metadata_v0_location = format!("{}/metadata/v0.metadata.json", target_url);
file_io
.new_output(&metadata_v0_location)?
.write_exclusive(serde_json::to_vec(&metadata_v0).unwrap().into())
.await?;
info!("Wrote v0 metadata: {:?}", metadata_v0_location);
(metadata_v0, metadata_v0_location)
}
};

let file_writer_builder = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
iceberg_schema.clone(),
file_io.clone(),
DefaultLocationGenerator::new(metadata_v0.clone()).unwrap(),
DefaultLocationGenerator::new(old_metadata.clone()).unwrap(),
DefaultFileNameGenerator::new(
"part".to_string(),
Some(Uuid::new_v4().to_string()),
@@ -219,18 +268,25 @@ pub async fn record_batches_to_iceberg(
})
.build();

let metadata_v1 = create_metadata_v1(&metadata_v0, metadata_v0_location, snapshot)?;
let metadata_v1_location = format!("{}/metadata/v1.metadata.json", target_url);
let new_metadata = create_metadata_v1(&old_metadata, old_metadata_location, snapshot)?;
let new_version_hint = match old_version_hint {
Some(x) => x + 1,
None => 1,
};
let new_metadata_location = format!(
"{}/metadata/v{}.metadata.json",
target_url, new_version_hint
);

file_io
.new_output(&metadata_v1_location)?
.write(serde_json::to_vec(&metadata_v1).unwrap().into())
.new_output(&new_metadata_location)?
.write_exclusive(serde_json::to_vec(&new_metadata).unwrap().into())
.await?;
info!("Wrote v1 metadata: {:?}", metadata_v1_location);
info!("Wrote new metadata: {:?}", new_metadata_location);

file_io
.new_output(&version_hint_location)?
.write("1".to_string().into())
.write(new_version_hint.to_string().into())
.await?;
info!("Wrote version hint: {:?}", version_hint_location);

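To make the version numbering above concrete: on a fresh table there is no version-hint.text, so the loader bootstraps v0.metadata.json, commits the first snapshot as v1.metadata.json, and writes 1 into the hint; on an existing table whose hint is N it reads vN.metadata.json, rejects any schema change, writes v(N+1).metadata.json with write_exclusive, and bumps the hint to N+1. A small self-contained sketch of that naming rule follows; the helper name is hypothetical and not part of the crate.

// Hypothetical helper mirroring the metadata naming used above: given the
// parsed contents of version-hint.text (if any), compute the next hint value
// and the location of the metadata file that will be written.
fn next_metadata_location(target_url: &str, old_hint: Option<u64>) -> (u64, String) {
    let new_hint = match old_hint {
        Some(n) => n + 1, // existing table: advance the hint
        None => 1,        // fresh table: v0 is bootstrapped, the snapshot lands in v1
    };
    let location = format!("{}/metadata/v{}.metadata.json", target_url, new_hint);
    (new_hint, location)
}

// For example, after one prior load (hint = 1):
// next_metadata_location("s3://bucket/table", Some(1))
//   returns (2, "s3://bucket/table/metadata/v2.metadata.json")
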
10 changes: 9 additions & 1 deletion src/lib.rs
@@ -53,13 +53,17 @@ enum Commands {
ParquetToIceberg {
source_file: String,
target_url: Url,
#[clap(long, short, action)]
overwrite: bool,
},
#[command(arg_required_else_help = true)]
PgToIceberg {
connection_string: Url,
target_url: Url,
#[clap(long, short, action, help("SQL text to extract the data"))]
query: String,
#[clap(long, short, action)]
overwrite: bool,
#[clap(
long,
short,
@@ -111,6 +115,7 @@ pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
Commands::ParquetToIceberg {
source_file,
target_url,
overwrite,
} => {
let file = tokio::fs::File::open(source_file).await?;
let record_batch_reader = ParquetRecordBatchStreamBuilder::new(file)
@@ -123,13 +128,15 @@
record_batch_reader.map_err(DataLoadingError::ParquetError),
schema,
target_url,
overwrite,
)
.await
}
Commands::PgToIceberg {
connection_string,
target_url,
query,
overwrite,
batch_size,
} => {
let mut source = PgArrowSource::new(connection_string.as_ref(), &query, batch_size)
@@ -138,7 +145,8 @@
let arrow_schema = source.get_arrow_schema();
let record_batch_stream = source.get_record_batch_stream();
info!("Rowset schema: {}", arrow_schema);
record_batches_to_iceberg(record_batch_stream, arrow_schema, target_url).await
record_batches_to_iceberg(record_batch_stream, arrow_schema, target_url, overwrite)
.await
}
}
// TODO
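The two failure modes introduced by the flag surface as different error variants, which the tests below assert on: writing to an existing table without --overwrite returns DataLoadingError::IoError, while overwriting with a changed schema returns an Iceberg error of kind FeatureUnsupported. A hedged sketch of how a caller embedding the library might branch on them; the function and the messages are illustrative only.

// Illustrative only: map the two new failure modes to user-facing hints,
// following the error kinds asserted in tests/basic_integration.rs.
use lakehouse_loader::DataLoadingError; // assumed export, as used by the tests

fn explain_load_error(err: &DataLoadingError) -> &'static str {
    match err {
        DataLoadingError::IoError(_) => {
            "Target table already exists; re-run with --overwrite to replace its data"
        }
        DataLoadingError::IcebergError(e)
            if e.kind() == iceberg::ErrorKind::FeatureUnsupported =>
        {
            "Target table exists with a different schema; schema changes are not supported"
        }
        _ => "Data loading failed",
    }
}
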
46 changes: 45 additions & 1 deletion tests/basic_integration.rs
@@ -97,8 +97,38 @@ async fn test_pg_to_iceberg() {
assert_eq!(&paths[4].to_string(), "iceberg/metadata/v1.metadata.json");
assert_eq!(&paths[5].to_string(), "iceberg/metadata/version-hint.text");

// WHEN we try to write to an existing table
// WHEN we try to write to an existing table without passing the overwrite flag
// THEN the command errors out
let args = vec![
"lakehouse-loader",
"pg-to-iceberg",
"postgres://test-user:test-password@localhost:5432/test-db",
"-q",
// We have to cherry-pick fields as not all types are supported by iceberg
"select cint4, cint8 + 1 cint8, ctext, cbool from t1 order by id",
target_url,
];
match do_main(Cli::parse_from(args.clone())).await {
Err(DataLoadingError::IoError(e)) => {
assert!(e.kind() == std::io::ErrorKind::Other);
}
Err(e) => {
panic!("Unexpected error type: {:?}", e);
}
Ok(_) => panic!("Expected command to fail but it succeeded"),
};

// WHEN we try to write to an existing table with a different schema
// THEN the command errors out
let args = vec![
"lakehouse-loader",
"pg-to-iceberg",
"postgres://test-user:test-password@localhost:5432/test-db",
"-q",
"select cint4, cint8 cint8_newname, ctext, cbool from t1 order by id",
target_url,
"--overwrite",
];
match do_main(Cli::parse_from(args.clone())).await {
Err(DataLoadingError::IcebergError(e)) => {
assert!(e.kind() == iceberg::ErrorKind::FeatureUnsupported);
@@ -108,6 +138,20 @@
}
Ok(_) => panic!("Expected command to fail but it succeeded"),
};

// WHEN we try to write to an existing table with the same schema
// THEN the command succeeds
let args = vec![
"lakehouse-loader",
"pg-to-iceberg",
"postgres://test-user:test-password@localhost:5432/test-db",
"-q",
// We have to cherry-pick fields as not all types are supported by iceberg
"select cint4, cint8 + 1 cint8, ctext, cbool from t1 order by id",
target_url,
"--overwrite",
];
assert!(do_main(Cli::parse_from(args.clone())).await.is_ok());
}

#[tokio::test]
