diff --git a/Cargo.lock b/Cargo.lock
index 7517d2b..622a5d8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1675,7 +1675,7 @@ dependencies = [
 [[package]]
 name = "iceberg"
 version = "0.3.0"
-source = "git+https://github.com/splitgraph/iceberg-rust?rev=59e113b4650cfcd52331d3c179154a2cd7f88b6f#59e113b4650cfcd52331d3c179154a2cd7f88b6f"
+source = "git+https://github.com/splitgraph/iceberg-rust?rev=e7008f39975ee2f09bc81a74d4ec5c9a3089580d#e7008f39975ee2f09bc81a74d4ec5c9a3089580d"
 dependencies = [
  "anyhow",
  "apache-avro",
diff --git a/Cargo.toml b/Cargo.toml
index 44ab265..abc5437 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,7 +16,7 @@ deltalake = { version = "0.22" }
 env_logger = "0.11.1"
 fastrand = "2.2.0"
 futures = "0.3"
-iceberg = { git = "https://github.com/splitgraph/iceberg-rust", rev = "59e113b4650cfcd52331d3c179154a2cd7f88b6f" }
+iceberg = { git = "https://github.com/splitgraph/iceberg-rust", rev = "e7008f39975ee2f09bc81a74d4ec5c9a3089580d" }
 log = "0.4"
 native-tls = "0.2.11"
 object_store = { version = "0.11", features = ["aws"] }
diff --git a/src/iceberg_destination.rs b/src/iceberg_destination.rs
index 48993b6..d8a62a5 100644
--- a/src/iceberg_destination.rs
+++ b/src/iceberg_destination.rs
@@ -1,3 +1,4 @@
+use core::str;
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
@@ -98,6 +99,7 @@ pub async fn record_batches_to_iceberg(
     record_batch_stream: impl TryStream<Ok = RecordBatch, Error = DataLoadingError>,
     arrow_schema: SchemaRef,
     target_url: Url,
+    overwrite: bool,
 ) -> Result<(), DataLoadingError> {
     pin_mut!(record_batch_stream);

@@ -108,27 +110,74 @@ pub async fn record_batches_to_iceberg(
     )?);

     let version_hint_location = format!("{}/metadata/version-hint.text", target_url);
-    if file_io.new_input(&version_hint_location)?.exists().await? {
-        return Err(DataLoadingError::IcebergError(iceberg::Error::new(
-            iceberg::ErrorKind::FeatureUnsupported,
-            "Iceberg table already exists. Writing to an existing table is not implemented",
-        )));
+    let version_hint_input = file_io.new_input(&version_hint_location)?;
+    let old_version_hint: Option<u64> = if version_hint_input.exists().await? {
+        if !overwrite {
+            return Err(DataLoadingError::IoError(std::io::Error::other(
+                "Table exists. Pass the overwrite flag to lakehouse-loader to overwrite data",
+            )));
+        }
+        let x = version_hint_input.read().await?;
+        let y: String = String::from_utf8(x.to_vec()).map_err(|_| {
+            DataLoadingError::IcebergError(iceberg::Error::new(
+                iceberg::ErrorKind::DataInvalid,
+                "Could not parse UTF-8 in version-hint.text",
+            ))
+        })?;
+        let z = y.trim().parse::<u64>().map_err(|_| {
+            DataLoadingError::IcebergError(iceberg::Error::new(
+                iceberg::ErrorKind::DataInvalid,
+                "Could not parse integer version in version-hint.text",
+            ))
+        })?;
+        Some(z)
+    } else {
+        None
     };

-    let metadata_v0 = create_metadata_v0(&iceberg_schema, target_url.to_string())?;
-    let metadata_v0_location = format!("{}/metadata/v0.metadata.json", target_url);
-
-    file_io
-        .new_output(&metadata_v0_location)?
-        .write(serde_json::to_vec(&metadata_v0).unwrap().into())
-        .await?;
-    info!("Wrote v0 metadata: {:?}", metadata_v0_location);
+    let (old_metadata, old_metadata_location) = match old_version_hint {
+        Some(version_hint) => {
+            let old_metadata_location =
+                format!("{}/metadata/v{}.metadata.json", target_url, version_hint);
+            let old_metadata_bytes = file_io.new_input(&old_metadata_location)?.read().await?;
+            let old_metadata_string = str::from_utf8(&old_metadata_bytes).map_err(|_| {
+                DataLoadingError::IcebergError(iceberg::Error::new(
+                    iceberg::ErrorKind::DataInvalid,
+                    "Could not parse UTF-8 in old metadata file",
+                ))
+            })?;
+            let old_metadata =
+                serde_json::from_str::<TableMetadata>(old_metadata_string).map_err(|_| {
+                    DataLoadingError::IcebergError(iceberg::Error::new(
+                        iceberg::ErrorKind::DataInvalid,
+                        "Could not parse old metadata file",
+                    ))
+                })?;
+            if old_metadata.current_schema() != &iceberg_schema {
+                return Err(DataLoadingError::IcebergError(iceberg::Error::new(
+                    iceberg::ErrorKind::FeatureUnsupported,
+                    "Schema changes not supported",
+                )));
+            }
+            (old_metadata, old_metadata_location)
+        }
+        None => {
+            let metadata_v0 = create_metadata_v0(&iceberg_schema, target_url.to_string())?;
+            let metadata_v0_location = format!("{}/metadata/v0.metadata.json", target_url);
+            file_io
+                .new_output(&metadata_v0_location)?
+                .write_exclusive(serde_json::to_vec(&metadata_v0).unwrap().into())
+                .await?;
+            info!("Wrote v0 metadata: {:?}", metadata_v0_location);
+            (metadata_v0, metadata_v0_location)
+        }
+    };

     let file_writer_builder = ParquetWriterBuilder::new(
         WriterProperties::builder().build(),
         iceberg_schema.clone(),
         file_io.clone(),
-        DefaultLocationGenerator::new(metadata_v0.clone()).unwrap(),
+        DefaultLocationGenerator::new(old_metadata.clone()).unwrap(),
         DefaultFileNameGenerator::new(
             "part".to_string(),
             Some(Uuid::new_v4().to_string()),
@@ -219,18 +268,25 @@ pub async fn record_batches_to_iceberg(
         })
         .build();

-    let metadata_v1 = create_metadata_v1(&metadata_v0, metadata_v0_location, snapshot)?;
-    let metadata_v1_location = format!("{}/metadata/v1.metadata.json", target_url);
+    let new_metadata = create_metadata_v1(&old_metadata, old_metadata_location, snapshot)?;
+    let new_version_hint = match old_version_hint {
+        Some(x) => x + 1,
+        None => 1,
+    };
+    let new_metadata_location = format!(
+        "{}/metadata/v{}.metadata.json",
+        target_url, new_version_hint
+    );

     file_io
-        .new_output(&metadata_v1_location)?
-        .write(serde_json::to_vec(&metadata_v1).unwrap().into())
+        .new_output(&new_metadata_location)?
+        .write_exclusive(serde_json::to_vec(&new_metadata).unwrap().into())
         .await?;
-    info!("Wrote v1 metadata: {:?}", metadata_v1_location);
+    info!("Wrote new metadata: {:?}", new_metadata_location);

     file_io
         .new_output(&version_hint_location)?
-        .write("1".to_string().into())
+        .write(new_version_hint.to_string().into())
         .await?;
     info!("Wrote version hint: {:?}", version_hint_location);

diff --git a/src/lib.rs b/src/lib.rs
index f0f78fc..6b64ff8 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -53,6 +53,8 @@ enum Commands {
     ParquetToIceberg {
         source_file: String,
         target_url: Url,
+        #[clap(long, short, action)]
+        overwrite: bool,
     },
     #[command(arg_required_else_help = true)]
     PgToIceberg {
@@ -60,6 +62,8 @@ enum Commands {
         target_url: Url,
         #[clap(long, short, action, help("SQL text to extract the data"))]
         query: String,
+        #[clap(long, short, action)]
+        overwrite: bool,
         #[clap(
             long,
             short,
@@ -111,6 +115,7 @@ pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
         Commands::ParquetToIceberg {
             source_file,
             target_url,
+            overwrite,
         } => {
             let file = tokio::fs::File::open(source_file).await?;
             let record_batch_reader = ParquetRecordBatchStreamBuilder::new(file)
@@ -123,6 +128,7 @@ pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
                 record_batch_reader.map_err(DataLoadingError::ParquetError),
                 schema,
                 target_url,
+                overwrite,
             )
             .await
         }
@@ -130,6 +136,7 @@ pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
             connection_string,
             target_url,
             query,
+            overwrite,
             batch_size,
         } => {
             let mut source = PgArrowSource::new(connection_string.as_ref(), &query, batch_size)
@@ -138,7 +145,8 @@ pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
             let arrow_schema = source.get_arrow_schema();
             let record_batch_stream = source.get_record_batch_stream();
             info!("Rowset schema: {}", arrow_schema);
-            record_batches_to_iceberg(record_batch_stream, arrow_schema, target_url).await
+            record_batches_to_iceberg(record_batch_stream, arrow_schema, target_url, overwrite)
+                .await
         }
     }
     // TODO
diff --git a/tests/basic_integration.rs b/tests/basic_integration.rs
index 9b7f902..3f6da6f 100644
--- a/tests/basic_integration.rs
+++ b/tests/basic_integration.rs
@@ -97,8 +97,37 @@ async fn test_pg_to_iceberg() {
     assert_eq!(&paths[4].to_string(), "iceberg/metadata/v1.metadata.json");
     assert_eq!(&paths[5].to_string(), "iceberg/metadata/version-hint.text");

-    // WHEN we try to write to an existing table
+    // WHEN we try to write to an existing table without passing the overwrite flag
     // THEN the command errors out
+    let args = vec![
+        "lakehouse-loader",
+        "pg-to-iceberg",
+        "postgres://test-user:test-password@localhost:5432/test-db",
+        "-q",
+        "select cint4, cint8 + 1 cint8, ctext, cbool from t1 order by id",
+        target_url,
+    ];
+    match do_main(Cli::parse_from(args.clone())).await {
+        Err(DataLoadingError::IoError(e)) => {
+            assert!(e.kind() == std::io::ErrorKind::Other);
+        }
+        Err(e) => {
+            panic!("Unexpected error type: {:?}", e);
+        }
+        Ok(_) => panic!("Expected command to fail but it succeeded"),
+    };
+
+    // WHEN we try to write to an existing table with a different schema
+    // THEN the command errors out
+    let args = vec![
+        "lakehouse-loader",
+        "pg-to-iceberg",
+        "postgres://test-user:test-password@localhost:5432/test-db",
+        "-q",
+        "select cint4, cint8 cint8_newname, ctext, cbool from t1 order by id",
+        target_url,
+        "--overwrite",
+    ];
     match do_main(Cli::parse_from(args.clone())).await {
         Err(DataLoadingError::IcebergError(e)) => {
             assert!(e.kind() == iceberg::ErrorKind::FeatureUnsupported);
@@ -108,6 +137,19 @@ async fn test_pg_to_iceberg() {
         }
         Err(e) => {
             panic!("Unexpected error type: {:?}", e);
         }
         Ok(_) => panic!("Expected command to fail but it succeeded"),
     };
+
+    // WHEN we try to write to an existing table with the same schema
+    // THEN the command succeeds
+    let args = vec![
+        "lakehouse-loader",
+        "pg-to-iceberg",
+        "postgres://test-user:test-password@localhost:5432/test-db",
+        "-q",
+        "select cint4, cint8 + 1 cint8, ctext, cbool from t1 order by id",
+        target_url,
+        "--overwrite",
+    ];
+    assert!(do_main(Cli::parse_from(args.clone())).await.is_ok());
 }

 #[tokio::test]