Skip to content

Commit

Permalink
Allow immaterial schema differences
Browse files Browse the repository at this point in the history
  • Loading branch information
SergeiPatiakin committed Dec 12, 2024
1 parent 73cecee commit d90700c
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ env_logger = "0.11.1"
fastrand = "2.2.0"
futures = "0.3"
iceberg = { git = "https://github.com/splitgraph/iceberg-rust", rev = "e7008f39975ee2f09bc81a74d4ec5c9a3089580d" }
itertools = "0.13.0"
log = "0.4"
native-tls = "0.2.11"
object_store = { version = "0.11", features = ["aws"] }
Expand Down
235 changes: 221 additions & 14 deletions src/iceberg_destination.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use core::str;
use itertools::izip;
use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;
Expand Down Expand Up @@ -63,7 +64,7 @@ fn create_empty_metadata(
}

// Clone an arrow schema, assigning sequential field IDs starting from 1
fn assign_field_ids(arrow_schema: Arc<Schema>) -> Schema {
fn assign_field_ids(arrow_schema: &Arc<Schema>) -> Schema {
let mut field_id_counter = 1;
let new_fields: Vec<Field> = arrow_schema
.fields
Expand All @@ -83,6 +84,36 @@ fn assign_field_ids(arrow_schema: Arc<Schema>) -> Schema {
Schema::new_with_metadata(new_fields, arrow_schema.metadata.clone())
}

/// Check that record batches with `new_arrow_schema` may be appended to a table
/// whose current schema is `existing_iceberg_schema`.
///
/// Fields are compared positionally rather than by name, so renamed columns are
/// treated as compatible ("immaterial" differences). For each position the field
/// types must match exactly, and a field that is required (non-nullable) in the
/// existing schema must also be required in the new one — writing non-nullable
/// data into a nullable field is allowed, but not the reverse.
///
/// Returns `Ok(())` when the schemas are compatible, a
/// `DataLoadingError::BadInputError` describing the first mismatch otherwise,
/// or an iceberg conversion error if the arrow schema cannot be converted.
fn is_schema_aligned(
    new_arrow_schema: &Arc<Schema>,
    existing_iceberg_schema: &Arc<iceberg::spec::Schema>,
) -> Result<(), DataLoadingError> {
    let old_iceberg_struct = existing_iceberg_schema.as_struct();
    let old_iceberg_fields = old_iceberg_struct.fields();

    // Convert the incoming arrow schema to iceberg terms so both sides are
    // compared with the same type system. Field IDs are assigned sequentially,
    // mirroring how the table schema was originally created.
    let new_arrow_schema_with_field_ids = assign_field_ids(new_arrow_schema);
    let new_iceberg_schema = Arc::new(iceberg::arrow::arrow_schema_to_schema(
        &new_arrow_schema_with_field_ids,
    )?);
    let new_iceberg_struct = new_iceberg_schema.as_struct();
    let new_iceberg_fields = new_iceberg_struct.fields();

    if old_iceberg_fields.len() != new_iceberg_fields.len() {
        return Err(DataLoadingError::BadInputError(format!("New data is incompatible with existing schema. Old schema has {} fields but new schema has {} fields", old_iceberg_fields.len(), new_iceberg_fields.len())));
    }
    // Positional comparison: stdlib zip + enumerate replaces the previous
    // `izip!(0.., ..)`, which re-implemented `enumerate` via a macro.
    for (i, (old_iceberg_field, new_iceberg_field)) in old_iceberg_fields
        .iter()
        .zip(new_iceberg_fields.iter())
        .enumerate()
    {
        // Nullability may only stay the same or tighten, never weaken.
        if old_iceberg_field.required && !new_iceberg_field.required {
            return Err(DataLoadingError::BadInputError(format!("New data is incompatible with existing schema. Field {} ({}) is required in old schema but not required in new schema", i, old_iceberg_field.name)));
        }
        if old_iceberg_field.field_type != new_iceberg_field.field_type {
            return Err(DataLoadingError::BadInputError(format!("New data is incompatible with existing schema. Field {} ({}) has data type {:?} in old schema but {:?} in new schema", i, old_iceberg_field.name, old_iceberg_field.field_type, new_iceberg_field.field_type)));
        }
    }
    Ok(())
}

// Create a new TableMetadata object by updating the current snapshot of an existing TableMetadata
fn update_metadata_snapshot(
previous_metadata: &TableMetadata,
Expand Down Expand Up @@ -139,10 +170,6 @@ pub async fn record_batches_to_iceberg(
pin_mut!(record_batch_stream);

let file_io = create_file_io(target_url.to_string())?;
let arrow_schema_with_ids = assign_field_ids(arrow_schema.clone());
let iceberg_schema = Arc::new(iceberg::arrow::arrow_schema_to_schema(
&arrow_schema_with_ids,
)?);

let version_hint_location = format!("{}/metadata/version-hint.text", target_url);
let version_hint_input = file_io.new_input(&version_hint_location)?;
Expand Down Expand Up @@ -170,7 +197,7 @@ pub async fn record_batches_to_iceberg(
} else {
None
};
let (previous_metadata, previous_metadata_location) = match old_version_hint {
let (previous_metadata, previous_metadata_location, iceberg_schema) = match old_version_hint {
Some(version_hint) => {
let old_metadata_location =
format!("{}/metadata/v{}.metadata.json", target_url, version_hint);
Expand All @@ -188,17 +215,21 @@ pub async fn record_batches_to_iceberg(
"Could not parse old metadata file",
))
})?;
if old_metadata.current_schema() != &iceberg_schema {
return Err(DataLoadingError::IcebergError(iceberg::Error::new(
iceberg::ErrorKind::FeatureUnsupported,
"Schema changes not supported",
)));
}
(old_metadata, Some(old_metadata_location))
let old_iceberg_schema = old_metadata.current_schema();
is_schema_aligned(&arrow_schema, old_iceberg_schema)?;
(
old_metadata.clone(),
Some(old_metadata_location),
old_iceberg_schema.clone(),
)
}
None => {
let arrow_schema_with_ids = assign_field_ids(&arrow_schema);
let iceberg_schema = Arc::new(iceberg::arrow::arrow_schema_to_schema(
&arrow_schema_with_ids,
)?);
let empty_metadata = create_empty_metadata(&iceberg_schema, target_url.to_string())?;
(empty_metadata, None)
(empty_metadata, None, iceberg_schema)
}
};

Expand Down Expand Up @@ -344,3 +375,179 @@ pub async fn record_batches_to_iceberg(

Ok(())
}

#[cfg(test)]
mod tests {
    use std::{collections::HashMap, sync::Arc};

    use arrow_schema::{DataType, Field, Schema};
    use iceberg::spec::{NestedField, PrimitiveType, Type};

    use crate::iceberg_destination::is_schema_aligned;

    // Wrap a list of arrow fields into the `Arc<Schema>` shape expected by
    // `is_schema_aligned`, with empty schema metadata.
    fn arrow_schema(fields: Vec<Field>) -> Arc<Schema> {
        Arc::new(Schema::new_with_metadata(fields, HashMap::new()))
    }

    // Build an `Arc<iceberg::spec::Schema>` from prepared nested fields.
    fn iceberg_schema(fields: Vec<Arc<NestedField>>) -> Arc<iceberg::spec::Schema> {
        Arc::new(
            iceberg::spec::Schema::builder()
                .with_fields(fields)
                .build()
                .unwrap(),
        )
    }

    #[test]
    fn test_is_schema_aligned_positive() {
        let new_schema = arrow_schema(vec![
            Field::new("a", DataType::Utf8, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Boolean, false),
        ]);
        let existing_schema = iceberg_schema(vec![
            NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::optional(3, "c", Type::Primitive(PrimitiveType::Boolean)).into(),
        ]);

        assert!(is_schema_aligned(&new_schema, &existing_schema).is_ok());
    }

    // Renamed columns are an immaterial difference: comparison is positional.
    #[test]
    fn test_is_schema_aligned_positive_renamed() {
        let new_schema = arrow_schema(vec![
            Field::new("x", DataType::Utf8, false),
            Field::new("y", DataType::Int32, false),
            Field::new("z", DataType::Boolean, false),
        ]);
        let existing_schema = iceberg_schema(vec![
            NestedField::required(1, "a", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean)).into(),
        ]);

        assert!(is_schema_aligned(&new_schema, &existing_schema).is_ok());
    }

    // OK to insert a non-nullable value into a nullable field
    #[test]
    fn test_is_schema_aligned_positive_nonnullable() {
        let new_schema = arrow_schema(vec![
            Field::new("a", DataType::Utf8, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Boolean, false),
        ]);
        let existing_schema = iceberg_schema(vec![
            NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::optional(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::optional(3, "c", Type::Primitive(PrimitiveType::Boolean)).into(),
        ]);

        assert!(is_schema_aligned(&new_schema, &existing_schema).is_ok());
    }

    // A field count mismatch (extra trailing field) must be rejected.
    #[test]
    fn test_is_schema_aligned_negative_added_field() {
        let new_schema = arrow_schema(vec![
            Field::new("a", DataType::Utf8, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Boolean, false),
            Field::new("d", DataType::Boolean, false), // Added field
        ]);
        let existing_schema = iceberg_schema(vec![
            NestedField::required(1, "a", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean)).into(),
        ]);

        assert!(is_schema_aligned(&new_schema, &existing_schema).is_err());
    }

    // A type mismatch at any position must be rejected.
    #[test]
    fn test_is_schema_aligned_negative_different_type() {
        let new_schema = arrow_schema(vec![
            Field::new("a", DataType::Utf8, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false), // Mismatched type
        ]);
        let existing_schema = iceberg_schema(vec![
            NestedField::required(1, "a", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean)).into(),
        ]);

        assert!(is_schema_aligned(&new_schema, &existing_schema).is_err());
    }

    // Same fields in a different order must be rejected: comparison is positional.
    #[test]
    fn test_is_schema_aligned_negative_reordered() {
        let new_schema = arrow_schema(vec![
            Field::new("b", DataType::Int32, false),
            Field::new("a", DataType::Utf8, false),
            Field::new("c", DataType::Boolean, false),
        ]);
        let existing_schema = iceberg_schema(vec![
            NestedField::required(1, "a", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean)).into(),
        ]);

        assert!(is_schema_aligned(&new_schema, &existing_schema).is_err());
    }

    // Not allowed to insert a nullable value into a non-nullable field
    #[test]
    fn test_is_schema_aligned_negative_nullable() {
        let new_schema = arrow_schema(vec![
            Field::new("a", DataType::Utf8, true), // Nullable
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Boolean, false),
        ]);
        let existing_schema = iceberg_schema(vec![
            NestedField::required(1, "a", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean)).into(),
        ]);

        assert!(is_schema_aligned(&new_schema, &existing_schema).is_err());
    }
}
13 changes: 6 additions & 7 deletions tests/basic_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,35 +140,34 @@ async fn test_pg_to_iceberg() {
Ok(_) => panic!("Expected command to fail but it succeeded"),
};

// WHEN we try to write to an existing table with a different schema
println!("DBG TestStart");
// WHEN we try to write to an existing table with an incompatible schema
// THEN the command errors out
let args = vec![
"lakehouse-loader",
"pg-to-iceberg",
"postgres://test-user:test-password@localhost:5432/test-db",
"-q",
"select cint4, cint8 cint8_newname, ctext, cbool from t1 order by id",
"select cint4, cint8::text cint8_casted, ctext, cbool from t1 order by id",
target_url,
"--overwrite",
];
match do_main(Cli::parse_from(args.clone())).await {
Err(DataLoadingError::IcebergError(e)) => {
assert!(e.kind() == iceberg::ErrorKind::FeatureUnsupported);
}
Err(DataLoadingError::BadInputError(_)) => {}
Err(e) => {
panic!("Unexpected error type: {:?}", e);
}
Ok(_) => panic!("Expected command to fail but it succeeded"),
};

// WHEN we try to write to an existing table with the same schema
// WHEN we try to write to an existing table with a compatible schema
// THEN the command succeeds
let args = vec![
"lakehouse-loader",
"pg-to-iceberg",
"postgres://test-user:test-password@localhost:5432/test-db",
"-q",
"select cint4, cint8 + 1 cint8, ctext, cbool from t1 order by id",
"select cint4, cint8 + 1 cint8_renamed, ctext, cbool from t1 order by id",
target_url,
"--overwrite",
];
Expand Down

0 comments on commit d90700c

Please sign in to comment.