feat(python): Add DataFrame.write_iceberg #15018

Open
wants to merge 8 commits into main
Conversation

@kevinjqliu commented Mar 12, 2024

Resolves #14610

This PR introduces the write_iceberg function to the DataFrame API. write_iceberg requires a pyiceberg.table.Table object and a mode (either append or overwrite).

Note that partitioned writes are currently not supported (blocked on iceberg-python #208).

Constructing the Iceberg catalog and Table object is left to the caller. The test (test_write_iceberg) provides an example of creating an in-memory catalog and an Iceberg Table object.
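
For reference, the new method essentially boils down to the following. This is a rough sketch based on the description above and the diff discussed below; the exact signature in the PR may differ.

from typing import Literal

import pyiceberg.table

def write_iceberg(
    self,
    table: pyiceberg.table.Table,
    mode: Literal["append", "overwrite"],
) -> None:
    """Write this DataFrame to an existing Iceberg table via PyIceberg."""
    data = self.to_arrow()  # PyIceberg's write methods consume pyarrow Tables
    if mode == "append":
        table.append(data)
    else:
        table.overwrite(data)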

Example

import polars as pl
df = pl.DataFrame(
    {
        "foo": [1, 2, 3, 4, 5],
        "bar": [6, 7, 8, 9, 10],
        "ham": ["a", "b", "c", "d", "e"],
    }
)

table_path = "/tmp/path/to/iceberg-table/"

# create catalog and table
from pyiceberg.catalog.sql import SqlCatalog
catalog = SqlCatalog(
    "default", uri="sqlite:///:memory:", warehouse=f"file://{table_path}"
)
catalog.create_namespace("foo")
table = catalog.create_table(
    "foo.bar",
    schema=df.to_arrow().schema,
)

df.write_iceberg(table, mode="overwrite")
print(table.location())

# read back the iceberg table 
pl.scan_iceberg(table).collect()

Files created

(.venv) ➜  py-polars git:(kevinjqliu/iceberg-write) tree /tmp/path/to/iceberg-table/                  
/tmp/path/to/iceberg-table/
└── default.db
    └── table
        ├── data
        │   └── 00000-0-3a2c5080-0e03-4dc5-a21a-73b8423ae181.parquet
        └── metadata
            ├── 00000-4ed15e3e-cafb-436c-b0ec-3bd08e6079ab.metadata.json
            ├── 00001-47503a3a-c2d9-4024-8605-28e92b6448c0.metadata.json
            ├── 3cd50fc2-83c6-4e5d-a68a-c34b56418631-m0.avro
            └── snap-3013071076850462756-0-3cd50fc2-83c6-4e5d-a68a-c34b56418631.avro

4 directories, 5 files


codecov bot commented Mar 12, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 79.89%. Comparing base (dddf0b7) to head (d2f64d2).
Report is 1 commit behind head on main.

Additional details and impacted files
@@           Coverage Diff           @@
##             main   #15018   +/-   ##
=======================================
  Coverage   79.88%   79.89%           
=======================================
  Files        1513     1513           
  Lines      203466   203471    +5     
  Branches     2892     2893    +1     
=======================================
+ Hits       162546   162555    +9     
+ Misses      40372    40369    -3     
+ Partials      548      547    -1     


@kevinliu-stripe

Waiting for the pyiceberg 0.6.1 release, which should be out soon and will fix the schema issue described above.

@mkarbo commented May 3, 2024

@kevinliu-stripe FYI, 0.6.1 has been released now :)

https://github.com/apache/iceberg-python/releases/tag/pyiceberg-0.6.1

@kevinjqliu marked this pull request as ready for review on July 2, 2024 19:04
@kevinjqliu changed the title from "[WIP] Write support for Iceberg" to "Write support for Iceberg" on Jul 2, 2024
@stinodego (Member) left a comment

Thanks for the PR. A few comments.

@@ -3834,6 +3835,32 @@ def unpack_table_name(name: str) -> tuple[str | None, str | None, str]:
        msg = f"unrecognised connection type {connection!r}"
        raise TypeError(msg)

    def write_iceberg(
        self,
        table: pyiceberg.table.Table,
Member

Why does this method take a Table and not a Pathlike, like other write functions?

Author

Writing to an Iceberg table requires a pyiceberg.table.Table object, which in turn requires an Iceberg catalog.
I think it's best to delegate creating the catalog and table to code outside this function and just pass in the table object.

The original implementation took a str/PathLike representing the "warehouse location" for the Iceberg table. In the Iceberg world, writing to a table usually requires an update to the catalog.
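
To illustrate: a pyiceberg.table.Table is always obtained from a catalog, so a path alone is not enough. A minimal sketch, reusing the SqlCatalog from the example above; in practice this would be whatever catalog the table is already registered in (REST, Glue, Hive, SQL, ...):

from pyiceberg.catalog.sql import SqlCatalog

# The catalog tracks the table's metadata; loading a table goes through it.
catalog = SqlCatalog(
    "default", uri="sqlite:///:memory:", warehouse="file:///tmp/warehouse"
)
table = catalog.load_table("foo.bar")

# Catalog/table setup stays outside write_iceberg; only the Table is passed in.
df.write_iceberg(table, mode="append")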

Comment on lines +3857 to +4080
data = self.to_arrow()

if mode == "append":
    table.append(data)
else:
    table.overwrite(data)
Member

This logic is so simple that it does not warrant its own method, in my opinion.

Author

The main goal is to implement the top-level write_iceberg function for the DataFrame API. The actual implementation code can be moved elsewhere.

Member

The point is that, if users already have an Iceberg table object, they can just write table.append(df.to_arrow()). It's even shorter than df.write_iceberg(table, mode="append"). So there is not much added value to a write_iceberg method.

If there is some complex logic required to set up the iceberg table, or if to_arrow is not sufficient to handle all data types correctly, we can consider adding a write_iceberg to save all users some implementation hassle. But right now it doesn't seem warranted.

Author

@kevinjqliu Jul 4, 2024

That is true. PyIceberg is well integrated with Arrow. With an Arrow table and a PyIceberg table object, one can just invoke the PyIceberg write functions .append/.overwrite.

Given the above, is there still value in implementing a simple write_iceberg method?

Looking at the write_delta method, aside from the merge functionality, it just passes the data as Arrow into the write_deltalake function.
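
In other words, for users who already hold a Table object the two spellings are equivalent; the method mainly buys API parity with write_delta. A rough illustration, not code from the PR:

# Without write_iceberg: convert to Arrow and call PyIceberg directly.
table.append(df.to_arrow())

# With write_iceberg: the same operation behind a polars method, mirroring write_delta.
df.write_iceberg(table, mode="append")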

Author

"If there is some complex logic required to set up the iceberg table"

There's a scenario where a user might want to write an Iceberg table to a location in blob storage. In that case, the write_iceberg function can take care of creating an in-memory catalog and an Iceberg table object before writing.

That was the initial version of this PR
0801012
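
Roughly, that earlier path-based variant would look like the following. This is a reconstruction for illustration only, not the exact code from that commit; the namespace and table identifier are placeholders.

from pyiceberg.catalog.sql import SqlCatalog

def write_iceberg_to_path(df, table_path: str) -> None:
    """Hypothetical helper: write a DataFrame to an Iceberg table at a plain path."""
    # Create a throwaway in-memory catalog pointing at the target warehouse location.
    catalog = SqlCatalog(
        "default", uri="sqlite:///:memory:", warehouse=f"file://{table_path}"
    )
    catalog.create_namespace("default")
    data = df.to_arrow()
    table = catalog.create_table("default.table", schema=data.schema)
    table.overwrite(data)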


Seems like a write_iceberg would be worth it just by virtue of having full support from polars / parity with delta.

@stinodego changed the title from "Write support for Iceberg" to "feat(python): Add DataFrame.write_iceberg" on Jul 4, 2024
@github-actions bot added the enhancement (New feature or an improvement of an existing feature) and python (Related to Python Polars) labels on Jul 4, 2024
Comment on lines 174 to 180
df = pl.DataFrame(
    {
        "foo": [1, 2, 3, 4, 5],
        "bar": [6, 7, 8, 9, 10],
        "ham": ["a", "b", "c", "d", "e"],
    }
)
Member

This needs much more extensive testing. Try using the df fixture (it has a mix of many data types), or define your own dataframe with lots of different data types.

Author

Thanks! I'll take a look at this fixture.
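
For concreteness, a round-trip test along those lines might look as follows. This is only a sketch: the df fixture and catalog setup mirror the PR description, and dtypes PyIceberg cannot handle yet (see below) would need to be excluded or cast.

from pathlib import Path

import polars as pl
from polars.testing import assert_frame_equal
from pyiceberg.catalog.sql import SqlCatalog

def test_write_iceberg_roundtrip(df: pl.DataFrame, tmp_path: Path) -> None:
    # In-memory catalog backed by a temporary warehouse directory.
    catalog = SqlCatalog(
        "default", uri="sqlite:///:memory:", warehouse=f"file://{tmp_path}"
    )
    catalog.create_namespace("foo")
    table = catalog.create_table("foo.bar", schema=df.to_arrow().schema)

    df.write_iceberg(table, mode="overwrite")
    actual = pl.scan_iceberg(table).collect()
    # Dtype round-trip caveats (Categorical/Enum/Time) may need relaxed checks.
    assert_frame_equal(df, actual)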

@stinodego (Member) left a comment

See comments. Needs testing on a wider range of data types before this can be merged.

@glesperance

This is great. Excited to try this out

@mkarbo commented Aug 20, 2024

Excited to try this one out!

@kevinjqliu (Author)

Hey @stinodego, thanks for the previous review. Could you take another look at this PR?

I've changed the test to use the df test fixture.
Currently, pyiceberg does not support time64[ns], nor does it preserve the Enum or Categorical dtypes on read-back. Hopefully, we can add those incrementally.
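
As a stop-gap, callers could cast the affected columns before writing. An illustrative sketch, not part of this PR (Enum columns would need a similar cast):

import polars as pl
import polars.selectors as cs

# Cast dtypes PyIceberg cannot round-trip yet into ones it can before writing.
df_compat = df.with_columns(
    cs.categorical().cast(pl.String),  # Categorical comes back as plain strings anyway
    cs.time().cast(pl.String),         # Iceberg has no nanosecond-precision time type
)
df_compat.write_iceberg(table, mode="append")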

@mariotaddeucci

Hey @kevinjqliu, pyiceberg now supports partitioned writes and hidden partitioning.

The overwrite method also supports an overwrite filter parameter, similar to delete_where on the Delta writer.
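
If that gets wired through later, the PyIceberg side would look roughly like this (assuming the overwrite_filter argument described above; the expression is just an example):

from pyiceberg.expressions import EqualTo

# Hypothetical future extension, not in this PR: only rows matching the filter
# are replaced, analogous to delete_where on the Delta writer.
table.overwrite(df.to_arrow(), overwrite_filter=EqualTo("ham", "a"))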

@kevinjqliu (Author)

@mariotaddeucci thanks for the pointer. I want to get this simple implementation in ASAP and add partitioned writes later.

@andreasbolstad

Hey, really looking forward to this feature. Any reason progress has stopped / merging is blocked? @stinodego
Not to rush anyone, just bumping this PR as I don't want it to go stale.

Labels: enhancement (New feature or an improvement of an existing feature), python (Related to Python Polars)
Successfully merging this pull request may close these issues: Write support for Apache Iceberg
7 participants