Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Reassign field ids for schema #615

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions crates/iceberg/src/spec/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,12 @@ impl NestedField {
self.write_default = Some(value);
self
}

/// Set the id of the field.
pub(crate) fn with_id(mut self, id: i32) -> Self {
self.id = id;
self
}
}

impl fmt::Display for NestedField {
Expand Down
312 changes: 309 additions & 3 deletions crates/iceberg/src/spec/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::{ensure_data_valid, Error, ErrorKind};
pub type SchemaId = i32;
/// Reference to [`Schema`].
pub type SchemaRef = Arc<Schema>;
const DEFAULT_SCHEMA_ID: SchemaId = 0;
pub(crate) const DEFAULT_SCHEMA_ID: SchemaId = 0;

/// Defines schema in iceberg.
#[derive(Debug, Serialize, Deserialize, Clone)]
Expand Down Expand Up @@ -77,6 +77,7 @@ pub struct SchemaBuilder {
fields: Vec<NestedFieldRef>,
alias_to_id: BiHashMap<String, i32>,
identifier_field_ids: HashSet<i32>,
reassign_field_ids_from: Option<i32>,
}

impl SchemaBuilder {
Expand All @@ -86,6 +87,16 @@ impl SchemaBuilder {
self
}

/// Reassign all field-ids (nested) on build.
/// If `start_from` is provided, it will start reassigning from that id (inclusive).
/// If not provided, it will start from 0.
///
/// All specified aliases and identifier fields will be updated to the new field-ids.
pub fn with_reassigned_field_ids(mut self, start_from: Option<i32>) -> Self {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please elaborate on the circumstances under which the user wants to set this? I used to think users always need to start with 0.

self.reassign_field_ids_from = Some(start_from.unwrap_or(0));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we always set Some(v) here, how about just using i32 for reassign_field_ids_from with default value 0?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typically you would pass in `last-column-id` from the table metadata. In the case of a CREATE TABLE this will be zero; in the case of a REPLACE TABLE it will be the table's current `last-column-id`, so that all columns get new IDs that are guaranteed not to have been used before.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Xuanwo my only doubt with a plain `i32` is that people might pass `1` simply because a value is required — and it's shorter to type than `Default::default()`.
The Rust tests are a good example that this can happen: many of them number fields starting from 1, so I had to rewrite them to be compatible with the new builder.

I am OK with the i32 as well, just wanted to bring up that a None is probably more verbose.

self
}

/// Set schema id.
pub fn with_schema_id(mut self, schema_id: i32) -> Self {
self.schema_id = schema_id;
Expand All @@ -105,13 +116,24 @@ impl SchemaBuilder {
}

/// Builds the schema.
pub fn build(self) -> Result<Schema> {
let highest_field_id = self.fields.iter().map(|f| f.id).max().unwrap_or(0);
pub fn build(mut self) -> Result<Schema> {
let mut highest_field_id = None;
if let Some(start_from) = self.reassign_field_ids_from {
let mut id_reassigner = ReassignFieldIds::new(start_from);
self.fields = id_reassigner.reassign_field_ids(self.fields);
highest_field_id = Some(id_reassigner.next_field_id - 1);

self.identifier_field_ids =
id_reassigner.apply_to_identifier_fields(self.identifier_field_ids)?;
self.alias_to_id = id_reassigner.apply_to_aliases(self.alias_to_id)?;
}

let field_id_to_accessor = self.build_accessors();

let r#struct = StructType::new(self.fields);
let id_to_field = index_by_id(&r#struct)?;
let highest_field_id =
highest_field_id.unwrap_or(id_to_field.keys().max().cloned().unwrap_or(0));

Self::validate_identifier_ids(
&r#struct,
Expand Down Expand Up @@ -266,6 +288,7 @@ impl Schema {
fields: vec![],
identifier_field_ids: HashSet::default(),
alias_to_id: BiHashMap::default(),
reassign_field_ids_from: None,
}
}

Expand All @@ -276,6 +299,7 @@ impl Schema {
fields: self.r#struct.fields().to_vec(),
alias_to_id: self.alias_to_id,
identifier_field_ids: self.identifier_field_ids,
reassign_field_ids_from: None,
}
}

Expand Down Expand Up @@ -944,6 +968,122 @@ impl SchemaVisitor for PruneColumn {
}
}

struct ReassignFieldIds {
next_field_id: i32,
old_to_new_id: HashMap<i32, i32>,
}

// We are not using the visitor here, as post order traversal is not desired.
// Instead we want to re-assign all fields on one level first before diving deeper.
impl ReassignFieldIds {
fn new(start_from: i32) -> Self {
Self {
next_field_id: start_from,
old_to_new_id: HashMap::new(),
}
}

fn reassign_field_ids(&mut self, fields: Vec<NestedFieldRef>) -> Vec<NestedFieldRef> {
// Visit fields on the same level first
let outer_fields = fields
.into_iter()
.map(|field| {
self.old_to_new_id.insert(field.id, self.next_field_id);
let new_field = Arc::unwrap_or_clone(field).with_id(self.next_field_id);
self.next_field_id += 1;
Arc::new(new_field)
})
.collect::<Vec<_>>();

// Now visit nested fields
outer_fields
.into_iter()
.map(|field| {
if field.field_type.is_primitive() {
field
} else {
let mut new_field = Arc::unwrap_or_clone(field);
*new_field.field_type = self.reassign_ids_visit_type(*new_field.field_type);
Arc::new(new_field)
}
})
.collect()
}

fn reassign_ids_visit_type(&mut self, field_type: Type) -> Type {
match field_type {
Type::Primitive(s) => Type::Primitive(s),
Type::Struct(s) => {
let new_fields = self.reassign_field_ids(s.fields().to_vec());
Type::Struct(StructType::new(new_fields))
}
Type::List(l) => {
self.old_to_new_id
.insert(l.element_field.id, self.next_field_id);
let mut element_field = Arc::unwrap_or_clone(l.element_field);
element_field.id = self.next_field_id;
self.next_field_id += 1;
*element_field.field_type = self.reassign_ids_visit_type(*element_field.field_type);
Type::List(ListType {
element_field: Arc::new(element_field),
})
}
Type::Map(m) => {
self.old_to_new_id
.insert(m.key_field.id, self.next_field_id);
let mut key_field = Arc::unwrap_or_clone(m.key_field);
key_field.id = self.next_field_id;
self.next_field_id += 1;
*key_field.field_type = self.reassign_ids_visit_type(*key_field.field_type);

self.old_to_new_id
.insert(m.value_field.id, self.next_field_id);
let mut value_field = Arc::unwrap_or_clone(m.value_field);
value_field.id = self.next_field_id;
self.next_field_id += 1;
*value_field.field_type = self.reassign_ids_visit_type(*value_field.field_type);

Type::Map(MapType {
key_field: Arc::new(key_field),
value_field: Arc::new(value_field),
})
}
}
}

fn apply_to_identifier_fields(&self, field_ids: HashSet<i32>) -> Result<HashSet<i32>> {
field_ids
.into_iter()
.map(|id| {
self.old_to_new_id.get(&id).copied().ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("Identifier Field ID {} not found", id),
)
})
})
.collect()
}

fn apply_to_aliases(&self, alias: BiHashMap<String, i32>) -> Result<BiHashMap<String, i32>> {
alias
.into_iter()
.map(|(name, id)| {
self.old_to_new_id
.get(&id)
.copied()
.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("Field with id {} for alias {} not found", id, name),
)
})
.map(|new_id| (name, new_id))
})
.collect()
}
}

pub(super) mod _serde {
/// This is a helper module that defines types to help with serialization/deserialization.
/// For deserialization the input first gets read into either the [SchemaV1] or [SchemaV2] struct
Expand Down Expand Up @@ -1063,6 +1203,8 @@ pub(super) mod _serde {
mod tests {
use std::collections::{HashMap, HashSet};

use bimap::BiHashMap;

use super::DEFAULT_SCHEMA_ID;
use crate::spec::datatypes::Type::{List, Map, Primitive, Struct};
use crate::spec::datatypes::{
Expand Down Expand Up @@ -1335,6 +1477,12 @@ table {
assert_eq!(original_schema, schema);
}

#[test]
fn test_highest_field_id() {
    // The nested fixture schema is expected to report 17 as its largest field id.
    let highest = table_schema_nested().highest_field_id();
    assert_eq!(17, highest);
}

#[test]
fn test_schema_index_by_name() {
let expected_name_to_id = HashMap::from(
Expand Down Expand Up @@ -2229,4 +2377,162 @@ table {
assert!(result.is_ok());
assert_eq!(result.unwrap(), Type::Struct(schema.as_struct().clone()));
}

#[test]
fn test_reassign_ids() {
    // Flat schema whose ids (5, 3, 4) are neither ordered nor zero-based. The identifier
    // field and the alias both point at old id 3 ("bar").
    let original_alias = BiHashMap::from_iter(vec![("bar_alias".to_string(), 3)]);
    let original_fields = vec![
        NestedField::optional(5, "foo", Type::Primitive(PrimitiveType::String)).into(),
        NestedField::required(3, "bar", Type::Primitive(PrimitiveType::Int)).into(),
        NestedField::optional(4, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
    ];
    let original = Schema::builder()
        .with_schema_id(1)
        .with_identifier_field_ids(vec![3])
        .with_alias(original_alias)
        .with_fields(original_fields)
        .build()
        .unwrap();

    let reassigned_schema = original
        .into_builder()
        .with_reassigned_field_ids(Some(0))
        .build()
        .unwrap();

    // After reassignment the ids follow declaration order (0, 1, 2); the identifier
    // field and the alias must have moved along with "bar" to id 1.
    let expected_alias = BiHashMap::from_iter(vec![("bar_alias".to_string(), 1)]);
    let expected_fields = vec![
        NestedField::optional(0, "foo", Type::Primitive(PrimitiveType::String)).into(),
        NestedField::required(1, "bar", Type::Primitive(PrimitiveType::Int)).into(),
        NestedField::optional(2, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
    ];
    let expected = Schema::builder()
        .with_schema_id(1)
        .with_identifier_field_ids(vec![1])
        .with_alias(expected_alias)
        .with_fields(expected_fields)
        .build()
        .unwrap();

    pretty_assertions::assert_eq!(expected, reassigned_schema);
    assert_eq!(reassigned_schema.highest_field_id(), 2);
}

// Reassigns the ids of the nested fixture schema starting at 0 and compares the result
// against a hand-written expected schema.
#[test]
fn test_reassigned_ids_nested() {
let schema = table_schema_nested();
let reassigned_schema = schema
.into_builder()
// Alias registered against old id 2; the expected schema below carries it on new id 1 ("bar").
.with_alias(BiHashMap::from_iter(vec![("bar_alias".to_string(), 2)]))
.with_reassigned_field_ids(Some(0))
.build()
.unwrap();

// Expected numbering: the seven top-level fields take ids 0..=6 first; ids 7..=16 are
// then handed to the nested fields, walking the top-level fields in declaration order.
let expected = Schema::builder()
.with_schema_id(1)
.with_identifier_field_ids(vec![1])
.with_alias(BiHashMap::from_iter(vec![("bar_alias".to_string(), 1)]))
.with_fields(vec![
NestedField::optional(0, "foo", Type::Primitive(PrimitiveType::String)).into(),
NestedField::required(1, "bar", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(2, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
NestedField::required(
3,
"qux",
Type::List(ListType {
// First nested id: the list element of "qux".
element_field: NestedField::list_element(
7,
Type::Primitive(PrimitiveType::String),
true,
)
.into(),
}),
)
.into(),
NestedField::required(
4,
"quux",
// Map of maps: outer key 8, outer value 9, inner key 10, inner value 11.
Type::Map(MapType {
key_field: NestedField::map_key_element(
8,
Type::Primitive(PrimitiveType::String),
)
.into(),
value_field: NestedField::map_value_element(
9,
Type::Map(MapType {
key_field: NestedField::map_key_element(
10,
Type::Primitive(PrimitiveType::String),
)
.into(),
value_field: NestedField::map_value_element(
11,
Type::Primitive(PrimitiveType::Int),
true,
)
.into(),
}),
true,
)
.into(),
}),
)
.into(),
NestedField::required(
5,
"location",
// List of structs: element 12, then the struct's fields 13 and 14.
Type::List(ListType {
element_field: NestedField::list_element(
12,
Type::Struct(StructType::new(vec![
NestedField::optional(
13,
"latitude",
Type::Primitive(PrimitiveType::Float),
)
.into(),
NestedField::optional(
14,
"longitude",
Type::Primitive(PrimitiveType::Float),
)
.into(),
])),
true,
)
.into(),
}),
)
.into(),
NestedField::optional(
6,
"person",
// Fields of the last top-level struct receive the final ids, 15 and 16.
Type::Struct(StructType::new(vec![
NestedField::optional(15, "name", Type::Primitive(PrimitiveType::String))
.into(),
NestedField::required(16, "age", Type::Primitive(PrimitiveType::Int))
.into(),
])),
)
.into(),
])
.build()
.unwrap();

pretty_assertions::assert_eq!(expected, reassigned_schema);
// The highest id and the id lookups must reflect the new numbering.
assert_eq!(reassigned_schema.highest_field_id(), 16);
assert_eq!(reassigned_schema.field_by_id(6).unwrap().name, "person");
assert_eq!(reassigned_schema.field_by_id(16).unwrap().name, "age");
}

#[test]
fn test_reassign_ids_empty_schema() {
    // Reassigning ids on a schema without any fields must leave it equal to the input.
    let empty = Schema::builder().with_schema_id(1).build().unwrap();
    let builder = empty
        .clone()
        .into_builder()
        .with_reassigned_field_ids(Some(0));
    let reassigned = builder.build().unwrap();

    assert_eq!(empty, reassigned);
    // NOTE(review): this checks the highest id of the original schema only; the
    // reassigned schema's highest id is not asserted here.
    assert_eq!(empty.highest_field_id(), 0);
}
}
Loading