Partition Evolution #245

Merged (1 commit) on Feb 28, 2024

@amogh-jahagirdar amogh-jahagirdar commented Dec 29, 2023

Fixes #193

@amogh-jahagirdar
Contributor Author

There's still a lot of cleanup required, I need to add unit tests, and I'm encountering some bugs. But I'm putting up this draft since the core pieces are here.

if not source_name:
    raise ValueError(f"Could not find column with id {field.source_id}")

transform = field.transform
Contributor

Nice to have: Do we also want to have a single dispatch to map these types?

Contributor

Can you create an issue for this?
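
For reference, the single-dispatch idea raised above could look roughly like the sketch below. This is only an illustration of `functools.singledispatch`: the transform classes here are simplified stand-ins (not the real pyiceberg types), and `describe_transform` is a hypothetical helper name that does not appear in this PR.

```python
from dataclasses import dataclass
from functools import singledispatch


# Hypothetical, simplified stand-ins for transform classes (assumption).
@dataclass(frozen=True)
class IdentityTransform:
    pass


@dataclass(frozen=True)
class BucketTransform:
    num_buckets: int


@dataclass(frozen=True)
class TruncateTransform:
    width: int


@singledispatch
def describe_transform(transform: object, source_name: str) -> str:
    """Fallback used when no handler is registered for the transform type."""
    raise ValueError(f"Unsupported transform: {transform!r}")


@describe_transform.register(IdentityTransform)
def _(transform: IdentityTransform, source_name: str) -> str:
    return source_name


@describe_transform.register(BucketTransform)
def _(transform: BucketTransform, source_name: str) -> str:
    return f"bucket({source_name}, {transform.num_buckets})"


@describe_transform.register(TruncateTransform)
def _(transform: TruncateTransform, source_name: str) -> str:
    return f"truncate({source_name}, {transform.width})"
```

Each transform type gets its own small handler, so supporting a new transform means registering one more function instead of growing an if/elif chain.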

Comment on lines 922 to 929
def last_partition_id(self) -> Optional[int]:
    """Return the highest assigned partition field ID across all specs for the table or None if the table is unpartitioned and there are no specs."""
    if len(self.specs()) == 1 and self.spec().is_unpartitioned():
        return None
    return self.metadata.last_partition_id
Contributor Author

@Fokko @HonahX I added this API to the Table since we'll need it in the implementation and we don't want to access TableMetadata directly. Let me know what you think.

Contributor

I think we probably should update

@property
def last_assigned_field_id(self) -> int:
    if self.fields:
        return max(pf.field_id for pf in self.fields)
    return PARTITION_FIELD_ID_START

to return PARTITION_FIELD_ID_START - 1 for unpartitioned spec. Then we can return the last_partition_id from metadata directly because the metadata should have last_partition_id=999 for unpartitioned table.

Java implementation uses PARTITION_FIELD_ID_START - 1 for unpartitioned spec:
https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L344-L345
https://github.com/apache/iceberg/blob/9921937d8285dec9a19fd16b0cd82d451a8aca9e/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L319-L321

I checked locally that unpartitioned tables created by spark-iceberg-runtime have last_partition_id=999, while those created by pyiceberg have last_partition_id=1000.

Contributor Author

@amogh-jahagirdar, Feb 5, 2024

I was thinking about that, but I wasn't sure about breaking API behavior, which is why I added a new API. If we have the flexibility to change the API here, we should do that. I think we can, because it's arguably incorrect to return 1000 for an unpartitioned table, so it's really a fix.
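
A minimal sketch of the proposed change to `last_assigned_field_id`, using simplified stand-in classes rather than the real pyiceberg `PartitionSpec` and `PartitionField` (the `Spec` and `Field` names are assumptions made for illustration):

```python
from dataclasses import dataclass
from typing import Tuple

PARTITION_FIELD_ID_START = 1000  # ID intended for the first partition field


@dataclass(frozen=True)
class Field:  # hypothetical stand-in for PartitionField
    field_id: int
    name: str


@dataclass(frozen=True)
class Spec:  # hypothetical stand-in for PartitionSpec
    fields: Tuple[Field, ...] = ()

    @property
    def last_assigned_field_id(self) -> int:
        if self.fields:
            return max(f.field_id for f in self.fields)
        # Unpartitioned spec: report one below the start so that
        # "last assigned + 1" yields PARTITION_FIELD_ID_START.
        return PARTITION_FIELD_ID_START - 1
```

With this definition, callers can always compute the next ID as `last_assigned_field_id + 1` without special-casing the unpartitioned spec.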

    update.remove_field("day_ts").remove_field("bucketed_id")
with table_v2.update_spec() as update:
    update.add_field("str", TruncateTransform(2), "truncated_str")
_validate_new_partition_fields(table_v2, 1002, 2, PartitionField(3, 1002, TruncateTransform(2), "truncated_str"))
Contributor Author

@amogh-jahagirdar, Feb 2, 2024

This test shows why assigning new field IDs based on the last field ID across all specs is important to avoid collisions. In this case, if we just used the latest spec, then after remove_field is done for the original partitions, the latest spec would just be the unpartitioned spec. Then when we add the new truncated_str partition field, we would create a field ID of 1000, which is not what we want (it would collide with the original field ID 1000 of the bucket transform on ID).
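
The collision described above can be reproduced with plain data. The sketch below models the spec history as a dict of spec ID to field IDs. That layout and both helper names are hypothetical, and the assumption that day_ts held ID 1001 is inferred from the test expecting 1002 for the new field.

```python
from typing import Dict, List

PARTITION_FIELD_ID_START = 1000

# spec_id -> partition field IDs in that spec (simplified, hypothetical history)
specs: Dict[int, List[int]] = {
    0: [1000, 1001],  # original spec: bucketed_id (1000) and, assumed, day_ts (1001)
    1: [],            # after removing both fields: the unpartitioned spec
}


def next_id_from_latest_spec(specs: Dict[int, List[int]]) -> int:
    """Naive approach: only look at the most recent spec."""
    latest = specs[max(specs)]
    return max(latest, default=PARTITION_FIELD_ID_START - 1) + 1


def next_id_across_all_specs(specs: Dict[int, List[int]]) -> int:
    """Approach from the comment above: look at every spec ever defined."""
    all_ids = [field_id for ids in specs.values() for field_id in ids]
    return max(all_ids, default=PARTITION_FIELD_ID_START - 1) + 1


print(next_id_from_latest_spec(specs))  # 1000, which reuses the bucket field's ID
print(next_id_across_all_specs(specs))  # 1002, which matches the test's expectation
```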

@amogh-jahagirdar amogh-jahagirdar force-pushed the partition-evolution branch 2 times, most recently from 12fd0f0 to 351ec9b Compare February 2, 2024 14:46
@amogh-jahagirdar amogh-jahagirdar force-pushed the partition-evolution branch 2 times, most recently from 615bd93 to 7fbcc22 Compare February 2, 2024 15:43
@amogh-jahagirdar amogh-jahagirdar force-pushed the partition-evolution branch 2 times, most recently from 89e3bcb to 3e7e183 Compare February 6, 2024 06:07
@@ -308,7 +308,8 @@ def construct_partition_specs(cls, data: Dict[str, Any]) -> Dict[str, Any]:
             data[PARTITION_SPECS] = [{"field-id": 0, "fields": ()}]

         data[LAST_PARTITION_ID] = max(
-            [field.get(FIELD_ID) for spec in data[PARTITION_SPECS] for field in spec[FIELDS]], default=PARTITION_FIELD_ID_START
+            [field.get(FIELD_ID) for spec in data[PARTITION_SPECS] for field in spec[FIELDS]],
+            default=PARTITION_FIELD_ID_START - 1,
Contributor Author

This needs to be updated so that when no spec has any partition fields, we return 999. It's insufficient to just update the PartitionSpec#last_assigned_field_id method. I do believe this is spec compliant since the spec doesn't explicitly say what values these IDs should be. This is also what Spark does when one creates an unpartitioned table. The spec does say that in v1, IDs were assigned starting at 1000, which is still followed. So I think we're covered. @Fokko @HonahX

Contributor

Ah, I see. So the next one will be 1000. It feels a bit like a workaround; let me check.
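
To make the effect of the changed default concrete, here is a small sketch that derives last-partition-id from a raw metadata dict. The keys `partition-specs`, `fields`, and `field-id` follow the Iceberg table metadata JSON. The function name `compute_last_partition_id` is hypothetical and this is not the actual pyiceberg validator.

```python
from typing import Any, Dict

PARTITION_FIELD_ID_START = 1000


def compute_last_partition_id(metadata: Dict[str, Any]) -> int:
    """Highest partition field ID across all specs, or 999 if there are none."""
    field_ids = [
        field["field-id"]
        for spec in metadata["partition-specs"]
        for field in spec["fields"]
    ]
    # max() falls back to the default only when field_ids is empty,
    # i.e. when no spec has ever defined a partition field.
    return max(field_ids, default=PARTITION_FIELD_ID_START - 1)


unpartitioned = {"partition-specs": [{"spec-id": 0, "fields": []}]}
print(compute_last_partition_id(unpartitioned) + 1)  # 1000: the next field ID
```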

@@ -277,7 +277,7 @@ def test_create_table(table_schema_simple: Schema, hive_database: HiveDatabase,
             )
         ],
         current_schema_id=0,
-        last_partition_id=1000,
+        last_partition_id=999,
Contributor

I think this one is a bit odd since it is a V2 table. I had to dig into the code a bit myself as well. I noticed that the last_partition_id is optional in the metadata. What do you think of the following solution: amogh-jahagirdar#1

Contributor

I checked a V2 unpartitioned table created by spark-iceberg-runtime, and the last_partition_id stored in the metadata is 999.
[Screenshot 2024-02-21 at 16 27 58]
Therefore I suggested updating last_partition_id() in pyiceberg to align with the Java implementation.

In general, I think 999 is spec compliant since it is for UnpartitionedSpec, where there is no existing partition field. It implies that 1000 will be the ID of the first valid partition field and thus aligns with the spec. Does this sound reasonable? Appreciate your thoughts on this!

Contributor Author

@amogh-jahagirdar, Feb 26, 2024

@Fokko I took a look and integrated the changes, and after going back and forth I think I'd like to keep the changes as is.
My rationale is that even after those changes, more workarounds are needed to make sure new IDs start at 1000 (after taking the changes directly, field IDs for non-REST catalogs will start at 1001 without any further changes).

Now, technically it does not seem to be a hard requirement that IDs start at 1000 for v2 tables. Even for v1, starting at 1000 does not seem to be a requirement, but rather just how the Java library was implemented. The spec says:

"In v1, partition field IDs were not tracked, but were assigned sequentially starting at 1000 in the reference implementation."

I read this as "this was how we originally implemented it in Java, but it is not really required so long as IDs are unique and increasing for new fields."

All that said, I'd advocate for just following the practice of starting at 1000 for both v1 and v2 because it's the established model and avoids any confusion.

On returning 999 for unpartitioned spec:

As @HonahX alluded to, I think that makes for a logical API: considering we want the first field to start at 1000, the last assigned field ID of the unpartitioned spec (no fields) should be one less than that. I don't think we want to return 1000 in that case. This is also what Spark sets for the unpartitioned spec, and it is spec compliant (since the spec doesn't mandate any particular IDs).

I could see the argument for returning None for last_assigned_partition_id from this API, but that just shifts the responsibility to other locations to make sure IDs are set correctly according to our scheme. That ends up being messier, imo, than just defining the API to return 999 if the only spec is the unpartitioned spec (which is the value that would be set anyway).

I also think it makes sense to set last-assigned-partition-id for both V1 and V2. Even though it's optional for V1, we set the other optional fields for v1 metadata, so it seems a bit odd to make this particular case an exception.

Contributor

Sounds reasonable to me, and as long as it is in line with the spec, I'm okay with it 👍



@pytest.mark.integration
@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
Contributor Author

I've parameterized all the tests for testing both hive/rest.

@Fokko Fokko merged commit 6a34421 into apache:main Feb 28, 2024
6 checks passed
Contributor

Fokko commented Feb 28, 2024

Thanks @amogh-jahagirdar for working on this, and sorry for the long wait for the review. Thanks @HonahX for the review 🙌

himadripal pushed a commit to himadripal/iceberg-python that referenced this pull request Mar 1, 2024
Successfully merging this pull request may close these issues.

Add partition evolution
3 participants