
create_table with a PyArrow Schema #305

Merged · 9 commits · Jan 30, 2024

Conversation

@sungwy (Collaborator) commented Jan 25, 2024

This addresses: #278

Added UP037 (Remove quotes from type annotation) to the ruff configuration in order to support forward references (PEP 484).

As discussed in the issue, we are proposing to update the create_table API to:

    def create_table(
        self,
        identifier: Union[str, Identifier],
        schema: Union[Schema, "pa.Schema"],
        location: Optional[str] = None,
        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
        sort_order: SortOrder = UNSORTED_SORT_ORDER,
        properties: Properties = EMPTY_DICT,
    ) -> Table:
        ...
        if isinstance(schema, pa.Schema):
            schema = pre_order_visit_pyarrow(schema, _ConvertToIcebergWithFreshIds())
        ...
        # existing code

We will call the function like:

table: pa.Table
catalog = load_catalog()
catalog.create_table('some.table', schema=table.schema)

And use the previously proposed visitor (https://github.com/syun64/iceberg-python/blob/preorder-fresh-schema/pyiceberg/io/pyarrow.py#L994), since new_table_metadata has to take an Iceberg Schema with assigned field IDs as input.
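For a fuller picture, a hedged end-to-end sketch of what the proposed API enables; the catalog name, namespace, and field names below are illustrative, and only the fact that create_table accepts a pa.Schema comes from this PR:

    import pyarrow as pa
    from pyiceberg.catalog import load_catalog

    # A plain PyArrow schema with no Iceberg field-ID metadata attached.
    arrow_schema = pa.schema([
        pa.field("city", pa.string(), nullable=False),
        pa.field("lat", pa.float64()),
        pa.field("long", pa.float64()),
    ])

    catalog = load_catalog("default")  # illustrative catalog name
    table = catalog.create_table("docs_example.cities", schema=arrow_schema)
    print(table.schema())  # Iceberg schema with freshly assigned field IDs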

@@ -906,6 +986,76 @@ def after_map_value(self, element: pa.Field) -> None:
self._field_names.pop()


class _ConvertToIcebergWithFreshIds(PreOrderPyArrowSchemaVisitor[Union[IcebergType, Schema]]):
Contributor

Ouch, I still think we don't want to duplicate this whole visitor. Maybe incorporate the suggestion @HonahX mentioned earlier.

Collaborator Author

:) I remember @HonahX's suggestion, but I'm not sure we were able to find a way to avoid the code duplication. In either case, we need to create a new pre-order visitor that assigns IDs; currently, all visitors are post-order.

Please let me know if I missed anything we've discussed!

Contributor

Ah, I see. At least we could call the primitive() of the other visitor, so we don't have to duplicate the mapping between Arrow and Iceberg primitives, because that's the part that concerns me the most.
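A minimal sketch of the delegation being suggested here, assuming the existing post-order visitor is _ConvertToIceberg and the new pre-order visitor simply forwards its primitive() calls; the wiring is illustrative, not the final implementation:

    class _ConvertToIcebergWithFreshIds(PreOrderPyArrowSchemaVisitor[Union[IcebergType, Schema]]):
        """Pre-order visitor that assigns fresh field IDs while converting."""

        def __init__(self) -> None:
            # Reuse the existing post-order visitor purely for its primitive mapping,
            # so the Arrow-to-Iceberg primitive table is not duplicated.
            self._primitives = _ConvertToIceberg()

        def primitive(self, primitive: pa.DataType) -> IcebergType:
            return self._primitives.primitive(primitive)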

Collaborator Author

Sounds great. Let me refactor that component :)

@HonahX (Contributor) commented Jan 25, 2024

Since everything is now behind the Catalog.create_table API, does the new visitor still need to be pre-order? new_table_metadata will re-assign IDs via a pre-order visitor later in the create_table workflow:

    def new_table_metadata(
        schema: Schema,
        partition_spec: PartitionSpec,
        sort_order: SortOrder,
        location: str,
        properties: Properties = EMPTY_DICT,
        table_uuid: Optional[uuid.UUID] = None,
    ) -> TableMetadata:
        fresh_schema = assign_fresh_schema_ids(schema)
        fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, schema, fresh_schema)
        fresh_sort_order = assign_fresh_sort_order_ids(sort_order, schema, fresh_schema)

Collaborator Author

These are all good suggestions @HonahX . Thank you for taking the time to write out the detailed response 😄 .

I'm not too keen on assigning 'random' IDs in post-order in _ConvertToIceberg or _ConvertToIcebergWithFreshIds, because that makes it harder to write out the correct 'expected' outcome for the visitor's test case when it assigns fresh, unique IDs. Although the result is deterministic, I'd rather not leave behind logic that exists only for an intermediate purpose and is hard to understand.

I also don't know the exact reason and context here. Similarly, the Java side does something more than a simple pre-order traversal; it looks like the Java implementation can be traced back to the very beginning of Iceberg.

If there's no additional context, I'm taking the liberty of entertaining the idea of refactoring the _SetFreshIDs visitor. Its existing functionality is just to:

  1. assign IDs in pre-order
  2. track the mapping from old field_id to reserved_id, just to ensure that the old identifier_fields are mapped to the new ones at the end

If functionality (2) is the only reason we require field IDs to be unique, how do you folks feel about refactoring _SetFreshIDs to remove that constraint by using a set to keep track of identifier_field_ids when we _get_and_increment (assign) new IDs, assigning all field IDs as '-1' using _ConvertToIceberg/_ConvertToIcebergFreshIDs, and then using the refactored _SetFreshIDs visitor to assign the IDs in order? For example:

class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]):
    """Traverses the schema and assigns monotonically increasing ids."""

    identifier_field_ids: Set[int]

    def __init__(self, next_id_func: Optional[Callable[[], int]] = None) -> None:
        self.identifier_field_ids = set()
        counter = itertools.count(1)
        self.next_id_func = next_id_func if next_id_func is not None else lambda: next(counter)

    def _get_and_increment(self, current_id: int) -> int:
        new_id = self.next_id_func()

        # keep identifier_field_ids pointing at the freshly assigned IDs
        if current_id in self.identifier_field_ids:
            self.identifier_field_ids.remove(current_id)
            self.identifier_field_ids.add(new_id)
        return new_id

@HonahX (Contributor) commented Jan 29, 2024

Thanks @syun64 for sharing the comprehensive analysis!

assign IDs in pre-order

Just to make sure we are on the same page: my understanding is that _SetFreshIds is a little different from a normal pre-order visitor, because it increments the counter for each field and then visits the field's children. This also makes it rely on reserved_ids to store the new IDs for fields on the same level. We want to maintain this behavior when refactoring _SetFreshIds; when refactoring, we could probably do it like the Java implementation.

assign all field_ids as '-1' using _ConvertToIceberg/_ConvertToIcebergFreshIDs? And then use the refactored _SetFreshIDs visitor to assign the IDs in order?

Overall, this approach sounds reasonable to me if we can find an easy way to refactor the _SetFreshIds. @Fokko, I'd appreciate your perspective on refactoring _SetFreshIds. Do you see any issues with this approach?
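To make that ordering concrete, a rough illustration of how the counter-before-children behaviour differs from a plain depth-first pre-order assignment; the nested schema is hypothetical and shown only as comments:

    # struct< a: struct< x: int, y: int >, b: int >
    #
    # counter incremented per field before descending into children
    # (the behaviour described above for _SetFreshIDs / Java's AssignFreshIds):
    #   a -> 1, b -> 2, then descend: x -> 3, y -> 4
    #
    # a plain depth-first pre-order assignment would instead give:
    #   a -> 1, x -> 2, y -> 3, b -> 4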

Collaborator Author

Thank you for your response! I'll give this a try and see how it turns out

Contributor

I also noticed that the Python implementation differs from the Java side. I don't know the historical reason for this; I couldn't find anything on the original PR (apache/iceberg#5627) about why it was done that way. I'm okay with aligning this with the Java implementation.

Overall, this approach sounds reasonable to me if we can find an easy way to refactor the _SetFreshIds. @Fokko, I'd appreciate your perspective on refactoring _SetFreshIds. Do you see any issues with this approach?

That's an okay approach, as long as the visitor to do this is hidden inside the package. We should not expose setting -1 field IDs to the outside.

What I like about the current implementation is that the visitor can be used on its own. Converting a field with all -1 IDs doesn't provide much value on its own.

I would love to get this in with the 0.6.0 release to simplify the creation of tables.

Collaborator Author

I would love to get this in with the 0.6.0 release to simplify the creation of tables.

Sounds good 👍 I'll try my best to bring this review to a consensus in the next few days. We already have a working version, so we just need to agree on the final details 😄

That's an okay approach, as long as the visitor to do this is hidden inside the package. We should not expose setting -1 field IDs to the outside.

What I like about the current implementation is that the visitor can be used on its own. Converting a field with all -1 IDs doesn't provide much value on its own.

I'm 🆗 with both approaches, but I have a preference for the '-1' approach, and here's my current thinking.

Both approaches will work, and both introduce an intermediate state of the Iceberg Schema that should never be exposed. But what I like about the '-1' approach is that the intermediate Iceberg Schema is very visibly wrong, which we could use to check the validity of the schema before the erroneous intermediate state materializes into a lasting result. For example, we could introduce a sanity check before writing the individual files, or before making a schema update, that verifies that no two IDs are the same and that all IDs are greater than 0.
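A minimal sketch of the kind of sanity check being described here; the helper name is hypothetical and not part of this PR:

    from typing import List

    def _check_field_ids(field_ids: List[int]) -> None:
        """Reject an intermediate schema that still carries placeholder or duplicate field IDs."""
        if any(field_id <= 0 for field_id in field_ids):
            raise ValueError("Schema still contains unassigned (<= 0) field IDs")
        if len(set(field_ids)) != len(field_ids):
            raise ValueError("Schema contains duplicate field IDs")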

Hence, the fact that the intermediate Iceberg Schema would look "seemingly" correct under the other approach actually feels more like a downside to me...

I'm curious to hear if that argument resonates with you or the rest of the group. Hopefully it will be the last discussion item before we settle on the final approach 🎉🎈

@Fokko (Contributor) commented Jan 25, 2024

Could you also add a section to the documentation? It would be a pity if people aren't able to find this feature :)

@kevinjqliu kevinjqliu mentioned this pull request Jan 27, 2024
@Fokko Fokko added this to the PyIceberg 0.6.0 release milestone Jan 27, 2024
@Fokko Fokko changed the title create_table with PyArrow Schema create_table with a PyArrow Schema Jan 27, 2024
Resolved review threads: mkdocs/docs/api.md (×2, outdated), pyiceberg/catalog/__init__.py, tests/catalog/test_base.py (outdated)
def test_pyarrow_schema_to_schema_fresh_ids_simple_schema(
    pyarrow_schema_simple_without_ids: pa.Schema, iceberg_schema_simple: Schema
) -> None:
    assert pre_order_visit_pyarrow(pyarrow_schema_simple_without_ids, _ConvertToIcebergWithFreshIds()) == iceberg_schema_simple
Contributor

Is there a need here to check for schema_id equality? Schema.__eq__ does not check schema_id; schema_id should be 0 here.

Collaborator Author

That's a sharp observation @kevinjqliu :) However, I don't think there's a need to check for schema_id equality because a new schema is generated with INITIAL_SCHEMA_ID in both _SetFreshIDs and _ConvertToIcebergWithFreshIds. There's an ongoing discussion on this PR that might affect whether or not we keep this visitor, so I'll leave this suggestion untouched until we reach a consensus there:
#305 (comment)

sungwy and others added 4 commits January 27, 2024 14:35
Co-authored-by: Kevin Liu <kevinjqliu@users.noreply.github.com>
Co-authored-by: Kevin Liu <kevinjqliu@users.noreply.github.com>
assert expected == catalog._convert_schema_if_needed(schema)


def test_create_table_pyarrow_schema(catalog: InMemoryCatalog, pyarrow_schema_simple_without_ids: pa.Schema) -> None:
@HonahX (Contributor) commented Jan 28, 2024

How about combining this with the test above, so that we only need to verify that the created table contains the expected schema:

table = catalog.create_table(identifier..., schema=pyarrow_schema)
assert table.schema() == expected_schema

I think it is better to test the API directly instead of the internal method when possible. This might also resolve your concern about creating the expected outcome for the 'random' IDs assigned by whichever approach we finally choose to implement _convert_schema_if_needed. WDYT?

("iceberg_schema_simple", "iceberg_schema_simple"),
("iceberg_schema_nested", "iceberg_schema_nested"),
("pyarrow_schema_nested_without_ids", "iceberg_schema_nested"),
],
@HonahX (Contributor) commented Jan 28, 2024

Is it possible to take advantage of pytest_lazyfixture? For example:

@pytest.mark.parametrize(
    'catalog',
    [
        lazy_fixture('catalog_memory'),
        lazy_fixture('catalog_sqlite'),
    ],
)
def test_create_table_default_sort_order(catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None:

Collaborator Author

Thank you for the suggestion! :)

@@ -761,6 +761,32 @@ def primitive(self, primitive: pa.DataType) -> T:
"""Visit a primitive type."""


class PreOrderPyArrowSchemaVisitor(Generic[T], ABC):
Contributor

Suggested change:
- class PreOrderPyArrowSchemaVisitor(Generic[T], ABC):
+ class _PreOrderPyArrowSchemaVisitor(Generic[T], ABC):

Collaborator Author

Good catch - I missed this one. I'll remove this visitor, as it's no longer in use.

@@ -906,6 +932,21 @@ def after_map_value(self, element: pa.Field) -> None:
self._field_names.pop()


class _ConvertToIcebergWithNoIds(_ConvertToIceberg):
Contributor

Style suggestion, feel free to ignore:

Suggested change:
- class _ConvertToIcebergWithNoIds(_ConvertToIceberg):
+ class _ConvertToIcebergWithoutIDs(_ConvertToIceberg):

Collaborator Author

Thank you for the suggestion :)
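For context, the renamed visitor ends up very small. A sketch of its shape, assuming the base _ConvertToIceberg visitor resolves field IDs through a _field_id hook (which appears to be the case in pyiceberg/io/pyarrow.py):

    class _ConvertToIcebergWithoutIDs(_ConvertToIceberg):
        """Converts a PyArrow schema to an Iceberg schema with every field ID set to -1.

        The placeholder IDs are expected to be replaced with fresh, monotonically
        increasing IDs by _SetFreshIDs (via new_table_metadata) before the schema
        is ever persisted.
        """

        def _field_id(self, field: pa.Field) -> int:
            return -1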

@@ -1221,50 +1221,57 @@ def assign_fresh_schema_ids(schema_or_type: Union[Schema, IcebergType], next_id:
class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]):
"""Traverses the schema and assigns monotonically increasing ids."""

reserved_ids: Dict[int, int]
old_id_to_new_id: Dict[int, int]
Contributor

I think at some point we just want to align this with Java. But let's do that in a separate PR: https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/AssignFreshIds.java

Collaborator Author

That makes sense - I'm having an aha moment here, and just wanted to summarize my thoughts...

I think we could fold the work of passing the baseSchema as an input argument to the visitor into REPLACE TABLE support, since that's the operation that requires fetching the original IDs from the baseSchema. CREATE TABLE, the only operation we've supported so far, doesn't need to compare any IDs against a baseSchema. In the Java code, the idFor function reuses the existing ID from the baseSchema by searching for the field by name; if it doesn't exist, it assigns a new ID from the counter.

Another difference I see with AssignFreshIds in the Java code is that it doesn't handle the reassignment of the identifier_field_ids within the visitor. Instead, it returns the schema object and looks up the new identifier field IDs by name. Is there a reason we decided to step away from this approach in the Python implementation?
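A rough Python sketch of the idFor behaviour described above; the function name and dict-based lookup are illustrative, not the actual Java or PyIceberg API:

    import itertools
    from typing import Dict, Iterator, Optional

    def id_for(full_name: str, base_ids_by_name: Dict[str, int], counter: Iterator[int]) -> int:
        """Reuse the ID of a column that already exists in the base schema (looked up
        by full name); otherwise take the next value from the fresh-ID counter."""
        existing: Optional[int] = base_ids_by_name.get(full_name)
        return existing if existing is not None else next(counter)

    # For CREATE TABLE there is no base schema, so every field gets a fresh ID:
    counter = itertools.count(1)
    assert id_for("id", {}, counter) == 1
    assert id_for("data", {}, counter) == 2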

Contributor

CREATE TABLE, which is the only operation we've supported so far doesn't need to compare any IDs against a baseSchema. In the java code, idFor function uses the existing ID from the baseSchema by searching it by name if it exists. If it doesn't exist, it assigns the new ID from the counter.

Exactly, we could just pass in an empty schema (to avoid all the null-checks).

Instead, it returns the schema object and searches the new identifier field IDs by name. Is there a reason we decided to step away from this approach in the Python implementation?

I don't think there was an explicit choice made there. The lookup by column name is correct, we do the same when evolving the schema:

_identifier_field_names: Set[str]

@sungwy (Collaborator Author) commented Jan 29, 2024

Ah, interesting! Does this mean we could remove this from _SetFreshIDs, and instead handle the identifier_field_names update in the corresponding operation that requires updating identifier_field_names to the new IDs? That would simplify this visitor even more 😄

Right now, only create_table uses this visitor, and it doesn't require identifier_field_names to be updated (it'll always be an empty list for a new table).

If UpdateSchema is already doing this within the corresponding class that wraps the transaction, I think we could do the same in the other operations that need it (like ReplaceTable) instead of keeping this feature within _SetFreshIDs.

@sungwy (Collaborator Author) commented Jan 30, 2024

I tried this out, and I think we might have to leave this logic in for this iteration.

In PyIceberg, a user can create a table using an Iceberg Schema instead of a PyArrow schema. This means the user can choose to include identifier_field_ids in the Schema they define, and the current create_table and assign_fresh_schema_ids calls don't have the logic to handle this outside of _SetFreshIDs.

I think it might make sense to consolidate the way we update the identifier_field_ids on create_table, update_schema, replace_table and other similar operations in a future PR; a rough sketch of the name-based approach follows.
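A sketch of that Java-style consolidation, assuming Schema exposes name-based lookups such as find_column_name and find_field so the visitor itself no longer needs to track identifier IDs; illustrative only, not part of this PR:

    from typing import List
    from pyiceberg.schema import Schema

    def _refresh_identifier_field_ids(old_schema: Schema, fresh_schema: Schema) -> List[int]:
        """Re-resolve identifier fields on the freshly numbered schema by column name,
        instead of tracking the old-to-new ID mapping inside the visitor."""
        names = [old_schema.find_column_name(field_id) for field_id in old_schema.identifier_field_ids]
        return [fresh_schema.find_field(name).field_id for name in names if name is not None]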

Comment on lines +1247 to +1253

    NestedField(
        field_id=field_id,
        name=field.name,
        field_type=field_type(),
        required=field.required,
        doc=field.doc,
    )
Contributor

I think it is cleaner to keep this part in the field method, since it creates a field; as it stands, field() no longer returns a field but a type.

Collaborator Author

Yes, I realized that too...

I think the refactored behavior actually mirrors the Java code more closely now: it takes the Type and then creates the NestedField from the result of the field-level Callable inside the struct function call, so that IDs are assigned first to the fields and then to their children.

I think preserving this approach lets us avoid introducing a new constraint that field IDs must be unique, and limits that requirement to the task of refreshing the identifier_field_ids, which needs to look fields up by ID as well as by name.

@Fokko (Contributor) left a comment

I think this one is good to go. Would love to get @HonahX's opinion before moving this forward.

There is one discrepancy with the Java implementation that we want to fix, but maybe better to do that in a separate PR.

@HonahX (Contributor) left a comment

LGTM! I have one more comment about a test, but it should not block this PR.

properties=TEST_TABLE_PROPERTIES,
)
assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table

Contributor

I think it may be better to have another assertion here:

    assert table.schema() == iceberg_schema_simple

We could maybe also add fixture parametrization to include pyarrow_schema_nested.

This is not a big deal, so we can probably discuss/modify it in a follow-up PR. Currently, test_sql.py already covers this case:
https://github.com/apache/iceberg-python/pull/305/files#diff-fad2e9bc49f619f03bf0631fce12da1f524c786008dbdfc01a35f6caf19a7c01R161

Collaborator Author

Thank you for the great suggestion @HonahX .

I just tried to update the test in test_base.py, but it requires us to use new_table_metadata instead of the existing mechanism of creating a new TableMetadataV1 instance, which creates a new uuid and breaks the cli/test_console.py tests.

So I think it might be simpler to fix this once we get #289 merged in

Contributor

Thanks for trying this!

@sungwy (Collaborator Author) commented Jan 30, 2024

Looks good to merge from my side. I left my closing comments on all the open discussions. Thank you @Fokko @kevinjqliu and @HonahX for your reviews!

@HonahX (Contributor) commented Jan 30, 2024

Thanks @syun64 for the great work! Thanks @Fokko and @kevinjqliu for reviewing!

@HonahX HonahX merged commit 02e6430 into apache:main Jan 30, 2024
6 checks passed