Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: correct partition-id to field-id in UnboundPartitionField #576

Merged
merged 4 commits into from
Aug 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/iceberg/src/expr/visitors/expression_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ mod tests {
.add_unbound_fields(vec![UnboundPartitionField::builder()
.source_id(1)
.name("a".to_string())
.partition_id(1)
.field_id(1)
.transform(Transform::Identity)
.build()])
.unwrap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1661,7 +1661,7 @@ mod test {
.add_unbound_fields(vec![UnboundPartitionField::builder()
.source_id(1)
.name("a".to_string())
.partition_id(1)
.field_id(1)
.transform(Transform::Identity)
.build()])
.unwrap()
Expand Down
6 changes: 3 additions & 3 deletions crates/iceberg/src/expr/visitors/inclusive_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ mod tests {
UnboundPartitionField::builder()
.source_id(1)
.name("a".to_string())
.partition_id(1)
.field_id(1)
.transform(Transform::Identity)
.build(),
)
Expand Down Expand Up @@ -386,7 +386,7 @@ mod tests {
UnboundPartitionField::builder()
.source_id(3)
.name("name_truncate".to_string())
.partition_id(3)
.field_id(3)
.transform(Transform::Truncate(4))
.build(),
)
Expand Down Expand Up @@ -426,7 +426,7 @@ mod tests {
UnboundPartitionField::builder()
.source_id(1)
.name("a_bucket[7]".to_string())
.partition_id(1)
.field_id(1)
.transform(Transform::Bucket(7))
.build(),
)
Expand Down
78 changes: 37 additions & 41 deletions crates/iceberg/src/spec/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ pub struct UnboundPartitionField {
/// A partition field id that is used to identify a partition field and is unique within a partition spec.
/// In v2 table metadata, it is unique across all partition specs.
#[builder(default, setter(strip_option))]
pub partition_id: Option<i32>,
pub field_id: Option<i32>,
/// A partition name.
pub name: String,
/// A transform that is applied to the source column to produce a partition value.
Expand Down Expand Up @@ -177,7 +177,7 @@ impl From<PartitionField> for UnboundPartitionField {
fn from(field: PartitionField) -> Self {
UnboundPartitionField {
source_id: field.source_id,
partition_id: Some(field.field_id),
field_id: Some(field.field_id),
name: field.name,
transform: field.transform,
}
Expand Down Expand Up @@ -224,7 +224,7 @@ impl UnboundPartitionSpecBuilder {
) -> Result<Self> {
let field = UnboundPartitionField {
source_id,
partition_id: None,
field_id: None,
name: target_name.to_string(),
transform: transformation,
};
Expand All @@ -246,8 +246,8 @@ impl UnboundPartitionSpecBuilder {
fn add_partition_field_internal(mut self, field: UnboundPartitionField) -> Result<Self> {
self.check_name_set_and_unique(&field.name)?;
self.check_for_redundant_partitions(field.source_id, &field.transform)?;
if let Some(partition_id) = field.partition_id {
self.check_partition_id_unique(partition_id)?;
if let Some(partition_field_id) = field.field_id {
self.check_partition_id_unique(partition_field_id)?;
}
self.fields.push(field);
Ok(self)
Expand Down Expand Up @@ -331,7 +331,7 @@ impl<'a> PartitionSpecBuilder<'a> {
.id;
let field = UnboundPartitionField {
source_id,
partition_id: None,
field_id: None,
name: target_name.into(),
transform,
};
Expand All @@ -341,15 +341,15 @@ impl<'a> PartitionSpecBuilder<'a> {

/// Add a new partition field to the partition spec.
///
/// If `partition_id` is set, it is used as the field id.
/// If partition field id is set, it is used as the field id.
/// Otherwise, a new `field_id` is assigned.
pub fn add_unbound_field(mut self, field: UnboundPartitionField) -> Result<Self> {
self.check_name_set_and_unique(&field.name)?;
self.check_for_redundant_partitions(field.source_id, &field.transform)?;
Self::check_name_does_not_collide_with_schema(&field, self.schema)?;
Self::check_transform_compatibility(&field, self.schema)?;
if let Some(partition_id) = field.partition_id {
self.check_partition_id_unique(partition_id)?;
if let Some(partition_field_id) = field.field_id {
self.check_partition_id_unique(partition_field_id)?;
}

// Non-fallible from here
Expand Down Expand Up @@ -387,7 +387,7 @@ impl<'a> PartitionSpecBuilder<'a> {
// we skip it.
let assigned_ids = fields
.iter()
.filter_map(|f| f.partition_id)
.filter_map(|f| f.field_id)
.collect::<std::collections::HashSet<_>>();

fn _check_add_1(prev: i32) -> Result<i32> {
Expand All @@ -401,9 +401,9 @@ impl<'a> PartitionSpecBuilder<'a> {

let mut bound_fields = Vec::with_capacity(fields.len());
for field in fields.into_iter() {
let partition_id = if let Some(partition_id) = field.partition_id {
last_assigned_field_id = std::cmp::max(last_assigned_field_id, partition_id);
partition_id
let partition_field_id = if let Some(partition_field_id) = field.field_id {
last_assigned_field_id = std::cmp::max(last_assigned_field_id, partition_field_id);
partition_field_id
} else {
last_assigned_field_id = _check_add_1(last_assigned_field_id)?;
while assigned_ids.contains(&last_assigned_field_id) {
Expand All @@ -414,7 +414,7 @@ impl<'a> PartitionSpecBuilder<'a> {

bound_fields.push(PartitionField {
source_id: field.source_id,
field_id: partition_id,
field_id: partition_field_id,
name: field.name,
transform: field.transform,
})
Expand Down Expand Up @@ -544,11 +544,7 @@ trait CorePartitionSpecValidator {

/// Check field / partition_id unique within the partition spec if set
fn check_partition_id_unique(&self, field_id: i32) -> Result<()> {
if self
.fields()
.iter()
.any(|f| f.partition_id == Some(field_id))
{
if self.fields().iter().any(|f| f.field_id == Some(field_id)) {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
Expand Down Expand Up @@ -698,17 +694,17 @@ mod tests {
"spec-id": 1,
"fields": [ {
"source-id": 4,
"partition-id": 1000,
"field-id": 1000,
"name": "ts_day",
"transform": "day"
}, {
"source-id": 1,
"partition-id": 1001,
"field-id": 1001,
"name": "id_bucket",
"transform": "bucket[16]"
}, {
"source-id": 2,
"partition-id": 1002,
"field-id": 1002,
"name": "id_truncate",
"transform": "truncate[4]"
} ]
Expand All @@ -719,17 +715,17 @@ mod tests {
assert_eq!(Some(1), partition_spec.spec_id);

assert_eq!(4, partition_spec.fields[0].source_id);
assert_eq!(Some(1000), partition_spec.fields[0].partition_id);
assert_eq!(Some(1000), partition_spec.fields[0].field_id);
assert_eq!("ts_day", partition_spec.fields[0].name);
assert_eq!(Transform::Day, partition_spec.fields[0].transform);

assert_eq!(1, partition_spec.fields[1].source_id);
assert_eq!(Some(1001), partition_spec.fields[1].partition_id);
assert_eq!(Some(1001), partition_spec.fields[1].field_id);
assert_eq!("id_bucket", partition_spec.fields[1].name);
assert_eq!(Transform::Bucket(16), partition_spec.fields[1].transform);

assert_eq!(2, partition_spec.fields[2].source_id);
assert_eq!(Some(1002), partition_spec.fields[2].partition_id);
assert_eq!(Some(1002), partition_spec.fields[2].field_id);
assert_eq!("id_truncate", partition_spec.fields[2].name);
assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform);

Expand All @@ -746,7 +742,7 @@ mod tests {
assert_eq!(None, partition_spec.spec_id);

assert_eq!(4, partition_spec.fields[0].source_id);
assert_eq!(None, partition_spec.fields[0].partition_id);
assert_eq!(None, partition_spec.fields[0].field_id);
assert_eq!("ts_day", partition_spec.fields[0].name);
assert_eq!(Transform::Day, partition_spec.fields[0].transform);
}
Expand Down Expand Up @@ -963,14 +959,14 @@ mod tests {
PartitionSpec::builder(&schema)
.add_unbound_field(UnboundPartitionField {
source_id: 1,
partition_id: Some(1000),
field_id: Some(1000),
name: "id".to_string(),
transform: Transform::Identity,
})
.unwrap()
.add_unbound_field(UnboundPartitionField {
source_id: 2,
partition_id: Some(1000),
field_id: Some(1000),
name: "id_bucket".to_string(),
transform: Transform::Bucket(16),
})
Expand Down Expand Up @@ -1004,22 +1000,22 @@ mod tests {
source_id: 1,
name: "id".to_string(),
transform: Transform::Identity,
partition_id: Some(1012),
field_id: Some(1012),
})
.unwrap()
.add_unbound_field(UnboundPartitionField {
source_id: 2,
name: "name_void".to_string(),
transform: Transform::Void,
partition_id: None,
field_id: None,
})
.unwrap()
// Should keep its ID even if its lower
.add_unbound_field(UnboundPartitionField {
source_id: 3,
name: "year".to_string(),
transform: Transform::Year,
partition_id: Some(1),
field_id: Some(1),
})
.unwrap()
.build()
Expand Down Expand Up @@ -1090,7 +1086,7 @@ mod tests {
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 1,
partition_id: None,
field_id: None,
name: "id".to_string(),
transform: Transform::Bucket(16),
})
Expand Down Expand Up @@ -1123,7 +1119,7 @@ mod tests {
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 1,
partition_id: None,
field_id: None,
name: "id".to_string(),
transform: Transform::Identity,
})
Expand All @@ -1136,7 +1132,7 @@ mod tests {
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 2,
partition_id: None,
field_id: None,
name: "id".to_string(),
transform: Transform::Identity,
})
Expand Down Expand Up @@ -1171,13 +1167,13 @@ mod tests {
.add_unbound_fields(vec![
UnboundPartitionField {
source_id: 1,
partition_id: None,
field_id: None,
name: "id_bucket".to_string(),
transform: Transform::Bucket(16),
},
UnboundPartitionField {
source_id: 2,
partition_id: None,
field_id: None,
name: "name".to_string(),
transform: Transform::Identity,
},
Expand All @@ -1192,13 +1188,13 @@ mod tests {
.add_unbound_fields(vec![
UnboundPartitionField {
source_id: 1,
partition_id: None,
field_id: None,
name: "id_bucket".to_string(),
transform: Transform::Bucket(16),
},
UnboundPartitionField {
source_id: 4,
partition_id: None,
field_id: None,
name: "name".to_string(),
transform: Transform::Identity,
},
Expand Down Expand Up @@ -1237,7 +1233,7 @@ mod tests {
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 1,
partition_id: None,
field_id: None,
name: "id_year".to_string(),
transform: Transform::Year,
})
Expand All @@ -1250,7 +1246,7 @@ mod tests {
.with_spec_id(1)
.add_partition_fields(vec![UnboundPartitionField {
source_id: 1,
partition_id: None,
field_id: None,
name: "id_bucket[16]".to_string(),
transform: Transform::Bucket(16),
}])
Expand All @@ -1261,7 +1257,7 @@ mod tests {
spec_id: Some(1),
fields: vec![UnboundPartitionField {
source_id: 1,
partition_id: None,
field_id: None,
name: "id_bucket[16]".to_string(),
transform: Transform::Bucket(16),
}]
Expand Down
6 changes: 3 additions & 3 deletions crates/iceberg/src/spec/table_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1293,7 +1293,7 @@ mod tests {
name: "x".to_string(),
transform: Transform::Identity,
source_id: 1,
partition_id: Some(1000),
field_id: Some(1000),
Copy link
Member

@Xuanwo Xuanwo Aug 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit surprised that the test against testdata/table_metadata/TableMetadataV2Valid.json passed before. Would you like to conduct a deep investigation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

Copy link
Contributor Author

@FANNG1 FANNG1 Aug 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When building PartitionSpec, UnboundPartitionField is transformed to PartitionField, in which partition_id is transformed field_id

            bound_fields.push(PartitionField {
                source_id: field.source_id,
                field_id: partition_id,
                name: field.name,
                transform: field.transform,
            })

https://github.com/apache/iceberg-rust/blob/main/crates/iceberg/src/spec/partition.rs#L415-L420

Copy link
Contributor Author

@FANNG1 FANNG1 Aug 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, I think add_partition_field is more general to use, seems add_unbound_field should only be used in test to specify partition field id, we should remove pub from add_unbound_field.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, would you like to submit a new issue for disucsion. I believe this PR is good to go.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, thanks

})
.unwrap()
.build()
Expand Down Expand Up @@ -1416,7 +1416,7 @@ mod tests {
name: "x".to_string(),
transform: Transform::Identity,
source_id: 1,
partition_id: Some(1000),
field_id: Some(1000),
})
.unwrap()
.build()
Expand Down Expand Up @@ -1496,7 +1496,7 @@ mod tests {
name: "x".to_string(),
transform: Transform::Identity,
source_id: 1,
partition_id: Some(1000),
field_id: Some(1000),
})
.unwrap()
.build()
Expand Down
Loading