Skip to content

Commit

Permalink
Fix sequence number generation
Browse files Browse the repository at this point in the history
  • Loading branch information
SergeiPatiakin committed Dec 5, 2024
1 parent 748356a commit 11dc554
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/iceberg_destination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ pub async fn record_batches_to_iceberg(
.collect();

let snapshot_id = fastrand::i64(..);
let sequence_number = 1;
let sequence_number = previous_metadata.last_sequence_number() + 1;

let manifest_file_path = format!("{}/metadata/manifest-{}.avro", target_url, Uuid::new_v4());
let manifest_file_output = file_io.new_output(manifest_file_path)?;
Expand Down
81 changes: 75 additions & 6 deletions tests/basic_integration.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::vec;

use arrow::array::{
Array, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array,
Int16Array, Int32Array, Int64Array, Int8Array, StringArray, TimestampMicrosecondArray,
};
use arrow::datatypes::DataType;
use clap::Parser;
use futures::{StreamExt, TryStreamExt};
use iceberg::spec::TableMetadata;
use lakehouse_loader::delta_destination::object_store_keys_from_env;
use lakehouse_loader::error::DataLoadingError;
use lakehouse_loader::pg_arrow_source::PgArrowSource;
Expand Down Expand Up @@ -57,6 +60,10 @@ async fn test_pg_to_delta_e2e() {
assert!(paths[2].to_string().ends_with("-c000.snappy.parquet"));
}

const DATA_FILEPATH_PATTERN: &str = r"^iceberg/data/part-00000-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.parquet$";
const MANIFEST_FILEPATH_PATTERN: &str = r"^iceberg/metadata/manifest-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.avro$";
const MANIFEST_LIST_FILEPATH_PATTERN: &str = r"^iceberg/metadata/manifest-list-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.avro$";

#[tokio::test]
async fn test_pg_to_iceberg() {
let target_url = "s3://lhl-test-bucket/iceberg";
Expand All @@ -78,23 +85,40 @@ async fn test_pg_to_iceberg() {
let (store, path) =
object_store::parse_url_opts(&Url::parse(target_url).unwrap(), config).unwrap();

// THEN iceberg data and metadata files are written
let mut paths = store
.list(Some(&path))
.map_ok(|m| m.location)
.boxed()
.try_collect::<Vec<Path>>()
.await
.unwrap();

paths.sort();

// THEN iceberg data and metadata files are written
assert_eq!(paths.len(), 5);
assert!(Regex::new(r"^iceberg/data/part-00000-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.parquet$").unwrap().is_match(paths[0].as_ref()));
assert!(Regex::new(r"^iceberg/metadata/manifest-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.avro$").unwrap().is_match(paths[1].as_ref()));
assert!(Regex::new(r"^iceberg/metadata/manifest-list-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.avro$").unwrap().is_match(paths[2].as_ref()));
assert!(Regex::new(DATA_FILEPATH_PATTERN)
.unwrap()
.is_match(paths[0].as_ref()));
assert!(Regex::new(MANIFEST_FILEPATH_PATTERN)
.unwrap()
.is_match(paths[1].as_ref()));
assert!(Regex::new(MANIFEST_LIST_FILEPATH_PATTERN)
.unwrap()
.is_match(paths[2].as_ref()));
assert_eq!(&paths[3].to_string(), "iceberg/metadata/v0.metadata.json");
assert_eq!(&paths[4].to_string(), "iceberg/metadata/version-hint.text");
// THEN iceberg metadata can be parsed
let metadata_bytes = store.get(&paths[3]).await.unwrap().bytes().await.unwrap();
let metadata_str = core::str::from_utf8(&metadata_bytes).unwrap();
let metadata = serde_json::from_str::<TableMetadata>(metadata_str).unwrap();
// THEN metadata contains a single snapshot with sequence number 1
assert_eq!(metadata.last_sequence_number(), 1);
assert_eq!(
metadata
.snapshots()
.map(|s| s.sequence_number())
.collect::<Vec<_>>(),
vec![1]
);

// WHEN we try to write to an existing table without passing the overwrite flag
// THEN the command errors out
Expand Down Expand Up @@ -149,6 +173,51 @@ async fn test_pg_to_iceberg() {
"--overwrite",
];
assert!(do_main(Cli::parse_from(args.clone())).await.is_ok());

// THEN iceberg data and metadata files are written
let mut paths = store
.list(Some(&path))
.map_ok(|m| m.location)
.boxed()
.try_collect::<Vec<Path>>()
.await
.unwrap();
paths.sort();
assert_eq!(paths.len(), 9);
assert!(Regex::new(DATA_FILEPATH_PATTERN)
.unwrap()
.is_match(paths[0].as_ref()));
assert!(Regex::new(DATA_FILEPATH_PATTERN)
.unwrap()
.is_match(paths[1].as_ref()));
assert!(Regex::new(MANIFEST_FILEPATH_PATTERN)
.unwrap()
.is_match(paths[2].as_ref()));
assert!(Regex::new(MANIFEST_FILEPATH_PATTERN)
.unwrap()
.is_match(paths[3].as_ref()));
assert!(Regex::new(MANIFEST_LIST_FILEPATH_PATTERN)
.unwrap()
.is_match(paths[4].as_ref()));
assert!(Regex::new(MANIFEST_LIST_FILEPATH_PATTERN)
.unwrap()
.is_match(paths[5].as_ref()));
assert_eq!(&paths[6].to_string(), "iceberg/metadata/v0.metadata.json");
assert_eq!(&paths[7].to_string(), "iceberg/metadata/v1.metadata.json");
assert_eq!(&paths[8].to_string(), "iceberg/metadata/version-hint.text");
// THEN iceberg metadata can be parsed
let metadata_bytes = store.get(&paths[7]).await.unwrap().bytes().await.unwrap();
let metadata_str = core::str::from_utf8(&metadata_bytes).unwrap();
let metadata = serde_json::from_str::<TableMetadata>(metadata_str).unwrap();
// THEN metadata contains two snapshots with sequence numbers 1 and 2
assert_eq!(metadata.last_sequence_number(), 2);
assert_eq!(
metadata
.snapshots()
.map(|s| s.sequence_number())
.collect::<Vec<_>>(),
vec![2, 1]
);
}

#[tokio::test]
Expand Down

0 comments on commit 11dc554

Please sign in to comment.