diff --git a/src/iceberg_destination.rs b/src/iceberg_destination.rs
index 75a74df..9d59b48 100644
--- a/src/iceberg_destination.rs
+++ b/src/iceberg_destination.rs
@@ -203,7 +203,7 @@ pub async fn record_batches_to_iceberg(
         .collect();
 
     let snapshot_id = fastrand::i64(..);
-    let sequence_number = 1;
+    let sequence_number = previous_metadata.last_sequence_number() + 1;
 
     let manifest_file_path = format!("{}/metadata/manifest-{}.avro", target_url, Uuid::new_v4());
     let manifest_file_output = file_io.new_output(manifest_file_path)?;
diff --git a/tests/basic_integration.rs b/tests/basic_integration.rs
index f21cfb8..8f8ac34 100644
--- a/tests/basic_integration.rs
+++ b/tests/basic_integration.rs
@@ -1,3 +1,5 @@
+use std::vec;
+
 use arrow::array::{
     Array, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array,
     Int16Array, Int32Array, Int64Array, Int8Array, StringArray, TimestampMicrosecondArray,
@@ -5,6 +7,7 @@ use arrow::array::{
 use arrow::datatypes::DataType;
 use clap::Parser;
 use futures::{StreamExt, TryStreamExt};
+use iceberg::spec::TableMetadata;
 use lakehouse_loader::delta_destination::object_store_keys_from_env;
 use lakehouse_loader::error::DataLoadingError;
 use lakehouse_loader::pg_arrow_source::PgArrowSource;
@@ -57,6 +60,10 @@ async fn test_pg_to_delta_e2e() {
     assert!(paths[2].to_string().ends_with("-c000.snappy.parquet"));
 }
 
+const DATA_FILEPATH_PATTERN: &str = r"^iceberg/data/part-00000-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.parquet$";
+const MANIFEST_FILEPATH_PATTERN: &str = r"^iceberg/metadata/manifest-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.avro$";
+const MANIFEST_LIST_FILEPATH_PATTERN: &str = r"^iceberg/metadata/manifest-list-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.avro$";
+
 #[tokio::test]
 async fn test_pg_to_iceberg() {
     let target_url = "s3://lhl-test-bucket/iceberg";
@@ -78,6 +85,7 @@ async fn test_pg_to_iceberg() {
     let (store, path) =
         object_store::parse_url_opts(&Url::parse(target_url).unwrap(), config).unwrap();
 
+    // THEN iceberg data and metadata files are written
     let mut paths = store
         .list(Some(&path))
         .map_ok(|m| m.location)
@@ -85,16 +93,32 @@
         .try_collect::<Vec<_>>()
         .await
         .unwrap();
-
     paths.sort();
-
-    // THEN iceberg data and metadata files are written
     assert_eq!(paths.len(), 5);
-    assert!(Regex::new(r"^iceberg/data/part-00000-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.parquet$").unwrap().is_match(paths[0].as_ref()));
-    assert!(Regex::new(r"^iceberg/metadata/manifest-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.avro$").unwrap().is_match(paths[1].as_ref()));
-    assert!(Regex::new(r"^iceberg/metadata/manifest-list-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.avro$").unwrap().is_match(paths[2].as_ref()));
+    assert!(Regex::new(DATA_FILEPATH_PATTERN)
+        .unwrap()
+        .is_match(paths[0].as_ref()));
+    assert!(Regex::new(MANIFEST_FILEPATH_PATTERN)
+        .unwrap()
+        .is_match(paths[1].as_ref()));
+    assert!(Regex::new(MANIFEST_LIST_FILEPATH_PATTERN)
+        .unwrap()
+        .is_match(paths[2].as_ref()));
     assert_eq!(&paths[3].to_string(), "iceberg/metadata/v0.metadata.json");
     assert_eq!(&paths[4].to_string(), "iceberg/metadata/version-hint.text");
+    // THEN iceberg metadata can be parsed
+    let metadata_bytes = store.get(&paths[3]).await.unwrap().bytes().await.unwrap();
+    let metadata_str = core::str::from_utf8(&metadata_bytes).unwrap();
+    let metadata = serde_json::from_str::<TableMetadata>(metadata_str).unwrap();
+    // THEN metadata contains a single snapshot with sequence number 1
+    assert_eq!(metadata.last_sequence_number(), 1);
+    assert_eq!(
+        metadata
+            .snapshots()
+            .map(|s| s.sequence_number())
+            .collect::<Vec<_>>(),
+        vec![1]
+    );
 
     // WHEN we try to write to an existing table without passing the overwrite flag
     // THEN the command errors out
@@ -149,6 +173,50 @@ async fn test_pg_to_iceberg() {
         "--overwrite",
     ];
     assert!(do_main(Cli::parse_from(args.clone())).await.is_ok());
+
+    // THEN iceberg data and metadata files are written
+    let mut paths = store
+        .list(Some(&path))
+        .map_ok(|m| m.location)
+        .boxed()
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+    paths.sort();
+    assert_eq!(paths.len(), 9);
+    assert!(Regex::new(DATA_FILEPATH_PATTERN)
+        .unwrap()
+        .is_match(paths[0].as_ref()));
+    assert!(Regex::new(DATA_FILEPATH_PATTERN)
+        .unwrap()
+        .is_match(paths[1].as_ref()));
+    assert!(Regex::new(MANIFEST_FILEPATH_PATTERN)
+        .unwrap()
+        .is_match(paths[2].as_ref()));
+    assert!(Regex::new(MANIFEST_FILEPATH_PATTERN)
+        .unwrap()
+        .is_match(paths[3].as_ref()));
+    assert!(Regex::new(MANIFEST_LIST_FILEPATH_PATTERN)
+        .unwrap()
+        .is_match(paths[4].as_ref()));
+    assert!(Regex::new(MANIFEST_LIST_FILEPATH_PATTERN)
+        .unwrap()
+        .is_match(paths[5].as_ref()));
+    assert_eq!(&paths[6].to_string(), "iceberg/metadata/v0.metadata.json");
+    assert_eq!(&paths[7].to_string(), "iceberg/metadata/v1.metadata.json");
+    assert_eq!(&paths[8].to_string(), "iceberg/metadata/version-hint.text");
+    // THEN iceberg metadata can be parsed
+    let metadata_bytes = store.get(&paths[7]).await.unwrap().bytes().await.unwrap();
+    let metadata_str = core::str::from_utf8(&metadata_bytes).unwrap();
+    let metadata = serde_json::from_str::<TableMetadata>(metadata_str).unwrap();
+    // THEN metadata contains two snapshots with sequence numbers 1 and 2
+    assert_eq!(metadata.last_sequence_number(), 2);
+    let mut sequence_numbers = metadata
+        .snapshots()
+        .map(|s| s.sequence_number())
+        .collect::<Vec<_>>();
+    sequence_numbers.sort();
+    assert_eq!(sequence_numbers, vec![1, 2]);
 }
 
 #[tokio::test]
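
Note: the substantive fix is the first hunk. The snapshot's `sequence_number` is now derived from the previous table metadata instead of being hardcoded to `1`, so a second write (e.g. via `--overwrite`) advances the sequence rather than repeating it; the test changes assert exactly that via the parsed `TableMetadata`. Below is a minimal sketch of the sequencing rule, using illustrative stand-in types (`Metadata`, `commit`) rather than the `iceberg` crate's actual API:

```rust
// Stand-in for the fields of table metadata that matter here; not the
// iceberg crate's `TableMetadata`.
#[derive(Default)]
struct Metadata {
    last_sequence_number: i64,
    snapshot_sequence_numbers: Vec<i64>,
}

// Each commit assigns `last_sequence_number + 1` to the new snapshot,
// mirroring `previous_metadata.last_sequence_number() + 1` in the patch.
// The old hardcoded `1` would repeat on every overwrite.
fn commit(previous: &Metadata) -> Metadata {
    let seq = previous.last_sequence_number + 1;
    let mut snapshots = previous.snapshot_sequence_numbers.clone();
    snapshots.push(seq);
    Metadata {
        last_sequence_number: seq,
        snapshot_sequence_numbers: snapshots,
    }
}

fn main() {
    let v0 = commit(&Metadata::default()); // initial load -> v0.metadata.json
    let v1 = commit(&v0); // --overwrite run -> v1.metadata.json
    assert_eq!(v0.snapshot_sequence_numbers, vec![1]);
    assert_eq!(v1.snapshot_sequence_numbers, vec![1, 2]);
    assert_eq!(v1.last_sequence_number, 2);
}
```

Monotonically increasing sequence numbers matter because Iceberg uses them to order changes across commits; reusing `1` for a later snapshot would make the overwrite indistinguishable in ordering from the original write.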