Create Iceberg Table from pyarrow Schema with no IDs #278

Closed · sungwy opened this issue Jan 18, 2024 · 11 comments

sungwy commented Jan 18, 2024

Feature Request / Improvement

I see three ways a user would want to create an Iceberg table:

  1. Completely manual - by specifying the schema, field by field
  2. By inferring the schema from an existing strongly-typed file or pyarrow table
  3. By copying the schema of an existing iceberg table (migration)

The create_table function currently takes a pyiceberg.Schema as input. The existing visitors support patterns (1) and (3), but not (2).
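
For reference, a minimal sketch of what a user has in hand for pattern (2), assuming a local file data/example.parquet (a placeholder name): pyarrow recovers a fully typed schema from the file, but without any Iceberg field-id metadata.

    import pyarrow.parquet as pq

    # Fully typed pyarrow schema, but with no field-id metadata on the columns.
    arrow_schema = pq.read_schema("data/example.parquet")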

This is because creating a pyiceberg.Schema is currently supported in only two ways:

  1. From a pyarrow schema with valid field-id metadata (see the sketch after this list)
  2. Using a NameMapping, which has field-ids. Currently, the only way to create a NameMapping is to construct it field-id by field-id, or to use a utility function on an existing Iceberg Schema.
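
For contrast, a minimal sketch of (1): a pyarrow schema whose fields already carry the PARQUET:field_id metadata key that the existing conversion reads.

    import pyarrow as pa

    # Each field carries the PARQUET:field_id metadata key that pyiceberg's
    # existing pyarrow-to-Iceberg conversion uses to recover field IDs.
    schema_with_ids = pa.schema([
        pa.field("id", pa.int64(), nullable=False, metadata={"PARQUET:field_id": "1"}),
        pa.field("name", pa.string(), metadata={"PARQUET:field_id": "2"}),
    ])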

Therefore, we need to update an existing visitor, or create a new one, to support generating a pyiceberg.Schema from a pyarrow Schema with no IDs.

On #219 the following approaches have been discussed so far:

  1. Update _ConvertToIceberg to create a pyiceberg.Schema from a pyarrow schema by assigning "-1" field_ids, and then use _SetFreshIDs to assign ordered fresh IDs. This idea unfortunately does not work, because _SetFreshIDs requires distinct IDs to track each column and assign new ones.
  2. Create a new visitor, _CreateMappingFromPyArrowSchema, that creates a name mapping from a PyArrow schema and assigns fresh IDs where they are missing. This is different from the existing _CreateMapping visitor, which is a pyiceberg Schema visitor.
  3. Use a separate visitor, _ConvertToIcebergWithFreshIds, which assigns fresh IDs based on the order in which the fields appear in the pyarrow schema.

While entertaining different ideas to reduce code duplication in the new visitor, we need to keep in mind that assigning fresh IDs works best in pre-order traversal; this is how _SetFreshIDs works today. All of the existing schema visitors discussed above that construct the NameMapping or pyiceberg Schema traverse in post-order.
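
To make the traversal-order point concrete, here is a minimal, self-contained sketch of pre-order ID assignment over a pyarrow schema (a hypothetical illustration, not the pyiceberg visitor API; assign_ids_preorder is an invented name, and it handles struct nesting only, for brevity):

    import pyarrow as pa

    def assign_ids_preorder(schema: pa.Schema) -> dict:
        counter = 0
        ids = {}

        def visit(field: pa.Field, prefix: str) -> None:
            nonlocal counter
            counter += 1
            # A parent gets its ID before its children are visited: pre-order.
            ids[prefix + field.name] = counter
            if pa.types.is_struct(field.type):
                for i in range(field.type.num_fields):
                    visit(field.type.field(i), prefix + field.name + ".")

        for top_level in schema:
            visit(top_level, "")
        return ids

    # assign_ids_preorder(pa.schema({"a": pa.int32(), "s": pa.struct({"x": pa.int64()})}))
    # -> {"a": 1, "s": 2, "s.x": 3}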


Fokko commented Jan 19, 2024

To add some more context: as also mentioned in the earlier conversation, I don't think assigning fresh IDs is safe: #219 (comment)

On the Java side, this is also being deprecated: apache/iceberg#9324


sungwy commented Jan 19, 2024

Thank you for adding the context @Fokko :)

Just so that we make sure new readers aren’t confused, do you think it’s fair to say that we are talking about ‘assigning fresh IDs’ in two separate ways?

  1. Auto assigning fresh IDs in column order when reading iceberg table files
  2. Assigning fresh IDs for the purpose of new Iceberg table creation based on an arrow table that does not have field_ids

The discussions so far, and the PR above, have alluded to the fact that (1) is dangerous and will not be supported in Python or Java. (2), however, is necessary for applications to use PyIceberg in a similar way to how Spark is used: Spark DataFrames can currently be used to create new iceberg tables without field_id metadata.

In that sense, it will be helpful to design the function in a way that prevents users from introducing bad behavior through (1) and limits the scope of this visitor and function to its intended usage, (2).


Fokko commented Jan 19, 2024

What do you think of the following approach:

I think the problem here is that we don't have an API like Spark's where we can nicely hide things. I'm tempted to allow creating a table from a PyArrow table, create_table_from_table(df: pa.Table). That mixes PyArrow into the main API, but it keeps us from exposing these internals to the user (which wouldn't be super user-friendly in general).

This would require:

  • We can add this to the Catalog, where we can generalize it for all the catalogs.
  • We can create a name mapping from a pa.Table, and then use that name-mapping to assign new IDs. I think the outcome will be the same as with the pre-order visitor, but we don't do it by position, we do it by name. We'll reuse the name-mapping that you already added 🙌 (a sketch follows below)
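
A minimal sketch of that second bullet, assuming the MappedField and NameMapping models from pyiceberg.table.name_mapping; it handles flat (non-nested) schemas only, and name_mapping_from_pyarrow is an invented name:

    import pyarrow as pa

    from pyiceberg.table.name_mapping import MappedField, NameMapping

    def name_mapping_from_pyarrow(schema: pa.Schema) -> NameMapping:
        # Fresh, position-ordered IDs; matching later happens by name.
        return NameMapping(
            [MappedField(field_id=i + 1, names=[field.name]) for i, field in enumerate(schema)]
        )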

@sungwy
Copy link
Collaborator Author

sungwy commented Jan 19, 2024

That sounds good @Fokko

I think having a _CreateMappingFromPyArrowSchema pre-order visitor does a good job of separating out the two concerns above.

I think the outcome will be the same as the pre-order visitor, but we don’t do it by position, but by name.

I think this bit about not doing it by position is catching me a bit off guard because I’m not convinced that we can assign ids without relying on the position when generating the name mapping. Just to make sure we are on the same page, this new Visitor will:

  1. Map field_ids from the PyArrow Schema where a field_id exists
  2. Have a boolean flag, _assign_fresh_ids, that ignores existing field_ids (or an automatic fallback to assign IDs when field_ids don't exist) and assigns field IDs by position (sketched below)

And then, we will use the name mapping generated from the pyarrow schema to assign field ids by name and create a new Iceberg Schema.
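
A hypothetical sketch of the two behaviors in the list above, in the spirit of name_mapping_from_pyarrow from the earlier comment (mapping_with_optional_fresh_ids is an invented name; flat schemas only; "PARQUET:field_id" is the metadata key the existing pyarrow conversion reads):

    import pyarrow as pa

    from pyiceberg.table.name_mapping import MappedField, NameMapping

    def mapping_with_optional_fresh_ids(schema: pa.Schema, assign_fresh_ids: bool = False) -> NameMapping:
        fields = []
        for pos, field in enumerate(schema):
            raw = (field.metadata or {}).get(b"PARQUET:field_id")
            # (1) reuse an existing field_id; (2) fall back to position-based fresh IDs
            field_id = pos + 1 if assign_fresh_ids or raw is None else int(raw)
            fields.append(MappedField(field_id=field_id, names=[field.name]))
        return NameMapping(fields)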

Does that approach sound consistent with your current thought?


HonahX commented Jan 21, 2024

Thanks for summarizing the approaches and explaining the concerns.

I’m not convinced that we can assign ids without relying on the position when generating the name mapping.

Same question as @syun64 mentioned. My understanding is that _CreateMappingFromPyArrowSchema will be very similar to assign_fresh_schema_ids, which incrementally assigns new IDs by position as we visit the given schema.

I have another related question. If we

  1. create a name-mapping from pa.Table
  2. use the name-mapping to generate a new iceberg schema
  3. use the new iceberg schema to create the table.

what do we do with the name-mapping created in step 1 after the table is created? Do we just discard it, or put it in schema.name-mapping.default? If the latter, I think we need either to update new_table_metadata to not assign fresh IDs when a name-mapping is present, or to update _SetFreshIDs to respect the name-mapping if given. I would appreciate any thoughts on this matter!


sungwy commented Jan 22, 2024

what do we do with the name-mapping created in step 1 after the table is created? Do we just discard it, or put it in schema.name-mapping.default? If the latter, I think we need either to update new_table_metadata to not assign fresh IDs when a name-mapping is present, or to update _SetFreshIDs to respect the name-mapping if given. I would appreciate any thoughts on this matter!

Great question @HonahX. My understanding is that putting a name mapping into schema.name-mapping.default isn't done automatically by any operation; it requires the user to actually insert the name mapping JSON as a table property on the iceberg table.
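
A sketch of that manual step, assuming tbl is a pyiceberg Table, mapping is a NameMapping, and the Transaction API shown here; "schema.name-mapping.default" is the standard Iceberg property key:

    # Persist the mapping JSON as a table property so readers can resolve
    # columns by name when files lack field-id metadata.
    tbl.transaction().set_properties(
        **{"schema.name-mapping.default": mapping.model_dump_json()}
    ).commit_transaction()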

I think that, regardless of whether this visitor creates a name mapping (which in turn will be used to create an iceberg schema) or an iceberg schema directly, it will need the ability to incrementally assign new IDs by position, because we are trying to create a new iceberg schema based on an arrow schema that does not have the field_id metadata.

Imagine we are trying to grab a 100-column parquet file from a vendor and create an Iceberg table based on it, and it doesn't have PARQUET:FIELD_ID metadata on its columns. Currently, there's no way to create this iceberg table and ingest this data without manually coding and labelling each and every column using the Iceberg schema types to create an Iceberg schema.
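
Sketched, the manual path that the 100-column case makes impractical (the column names and types here are placeholders):

    from pyiceberg.schema import Schema
    from pyiceberg.types import LongType, NestedField, StringType

    manual_schema = Schema(
        NestedField(field_id=1, name="id", field_type=LongType(), required=False),
        NestedField(field_id=2, name="name", field_type=StringType(), required=False),
        # ... repeated by hand for every remaining column
    )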

@anupam-saini

Hello, I would like to put up a PR as per the discussion above if no one has started working already. Please let me know if this is fine. Also, @syun64 and I work together hence I can get up to speed quickly with the discussion.


Fokko commented Jan 23, 2024

Alright, I went to the source and talked with @danielcweeks and @rdblue. It looks like we made things more complicated than actually needed.

So when reading and writing Parquet, we need to make sure that the IDs are aligned properly. When we are working with runtime data (pa.Tables), we match everything up based on names.

I also discussed with Dan adding Arrow types to the create_table statement, and he liked the idea, whereas I was a bit reluctant. But thinking about it, I think it makes sense, since it will allow us to create Iceberg tables from a dataframe:

    catalog = load_catalog()
    catalog.create_table('some.table', df=df)

We can just convert the schema, and assign fresh IDs.

And then:

    # It will wire up the schema by name
    tbl.overwrite(df)
    # Should be quite easy with union by name:
    tbl.append(df, merge_schema=True)
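
As an illustration of the "union by name" idea behind merge_schema=True, here is a sketch using pyarrow's unify_schemas (an illustration, not the pyiceberg implementation):

    import pyarrow as pa

    existing = pa.schema({"id": pa.int64(), "name": pa.string()})
    incoming = pa.schema({"id": pa.int64(), "email": pa.string()})
    merged = pa.unify_schemas([existing, incoming])  # fields: id, name, email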

We don't want to keep the IDs around when we have the Arrow table. Think of the situation where you read from one table and then write to another table: you don't want to re-use the IDs.

Sorry for making this bigger than it actually was 🙏

@sungwy
Copy link
Collaborator Author

sungwy commented Jan 23, 2024

That makes sense @Fokko.

Just to make sure we are on the same page, does the following approach align with your thoughts?

We are proposing to update the create_table API to:

    def create_table(
        self,
        identifier: Union[str, Identifier],
        schema: Union[Schema, pa.Schema],
        location: Optional[str] = None,
        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
        sort_order: SortOrder = UNSORTED_SORT_ORDER,
        properties: Properties = EMPTY_DICT,
    ) -> Table:
        ...
        if isinstance(schema, pa.Schema):
            schema = pre_order_visit_pyarrow(schema, _ConvertToIcebergWithFreshIds())
        ...
        # existing code
We will call the function like:

    table: pa.Table
    catalog = load_catalog()
    catalog.create_table('some.table', schema=table.schema)

And use the previously proposed visitor, https://github.com/syun64/iceberg-python/blob/preorder-fresh-schema/pyiceberg/io/pyarrow.py#L994, since new_table_metadata has to take an Iceberg Schema with assigned field IDs as input?


Fokko commented Jan 25, 2024

@syun64 Yes, that sounds like a reasonable proposal to me. One thing to mention: we would also like to be able to use PyIceberg without Arrow, and we can do this by making the type annotation lazy:

    def create_table(
        self,
        identifier: Union[str, Identifier],
        schema: Union[Schema, "pa.Schema"],
        location: Optional[str] = None,
        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
        sort_order: SortOrder = UNSORTED_SORT_ORDER,
        properties: Properties = EMPTY_DICT,
    ) -> Table:
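
For completeness, the standard pattern behind the lazy annotation: guard the import with typing.TYPE_CHECKING so pyarrow is only imported by type checkers, never at runtime.

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        import pyarrow as pa  # resolved only during static type checking

    # "pa.Schema" (as a string) can then appear in annotations without
    # requiring pyarrow to be installed at runtime.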


sungwy commented Jan 25, 2024

@syun64 Yes, that sounds like a reasonable proposal to me. One thing to mention: we would also like to be able to use PyIceberg without Arrow, and we can do this by making the type annotation lazy:

    def create_table(
        self,
        identifier: Union[str, Identifier],
        schema: Union[Schema, "pa.Schema"],
        location: Optional[str] = None,
        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
        sort_order: SortOrder = UNSORTED_SORT_ORDER,
        properties: Properties = EMPTY_DICT,
    ) -> Table:

Oh! That's very neat. Thank you for the suggestion :)
