Expose PyIceberg table as PyArrow Dataset #30

Open
Fokko opened this issue Oct 2, 2023 · 20 comments

Comments

@Fokko
Contributor

Fokko commented Oct 2, 2023

Feature Request / Improvement

Migrated from apache/iceberg#7598:

Hi, I've been looking at what we can do to make PyArrow Datasets extensible for various table formats and consumable by various compute engines (including DuckDB, Polars, DataFusion, Dask). I've written up my observations here: https://docs.google.com/document/d/1r56nt5Un2E7yPrZO9YPknBN4EDtptpx-tqOZReHvq1U/edit?usp=sharing

What this means for PyIceberg's API

Currently, integration with engines like DuckDB means filters and projections have to be specified up front, rather than pushed down from the query:

from pyiceberg.expressions import GreaterThanOrEqual

con = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_duckdb(table_name="distant_taxi_trips")

Ideally, we can export the table as a dataset, register it in DuckDB (or some other engine), and then filters and projections can be pushed down as the engine sees fit. The following would then perform equivalently to the above while being more user-friendly:

import duckdb

# Proposed API: to_pyarrow_dataset() is what this issue asks for.
dataset = table.to_pyarrow_dataset()
con = duckdb.connect()
con.register("distant_taxi_trips", dataset)
con.sql("""SELECT VendorID, tpep_pickup_datetime, tpep_dropoff_datetime
    FROM distant_taxi_trips
    WHERE trip_distance >= 10.0""")

Query engine

Other

@RahulDubey391

Hi @Fokko , I can have a look into the issue!

@stefnba

stefnba commented Jan 8, 2024

Hi, is there any update on this topic? Thanks.

@jwills

jwills commented Feb 1, 2024

Yo, just chiming in that we would love this for dbt-duckdb use cases-- thanks!

@jwills

jwills commented Feb 1, 2024

(If this is a thing I can add, please lmk- I can be surprisingly useful)

@Fokko
Contributor Author

Fokko commented Feb 1, 2024

Hey @jwills I think many folks are looking forward to this, so it would be great if you would be willing to spend time on getting this in 🙌

@jwills

jwills commented Feb 1, 2024

sg @Fokko, will dive in here

@Fokko
Contributor Author

Fokko commented Feb 1, 2024

Awesome, let me know if there are any questions. Happy to provide context

@jwills

jwills commented Feb 1, 2024

@Fokko okay I'm read in here; is the best approach atm something like this comment from the original issue?

I looked at the code in PyIceberg again and I remembered an idea I had that I never tested. Right now, the implementation eagerly loads a table for every file-level projection and concats them. Would it be possible instead to create a pyarrow dataset for every file and return a union dataset that combines them? I've never touched these lower level features of PyArrow datasets before so this idea is all based on hazy recollection of source code reading from long ago.

If this is something PyArrow supports today (unioning datasets with different projection plans that produce the same final schema, without materializing a table), then it could be the easiest way to achieve the "pyiceberg returns a dataset that is compatible with iceberg schema evolution", at least for copy-on-write workloads.

Are there any dragons here or downsides to attempting this that would make it not worthwhile to attempt?

@stefnba

stefnba commented Feb 1, 2024

Maybe this helps.

A PyArrow Dataset can be initiated from a list of file paths:

Create a FileSystemDataset from explicitly given files. The files must be located on the same filesystem given by the filesystem parameter. Note that in contrary of construction from a single file, passing URIs as paths is not allowed.

More info here.

To make it work, the corresponding cloud filesystem, e.g. pyarrow.fs.S3FileSystem has to be specified, see here.

@Fokko
Contributor Author

Fokko commented Feb 1, 2024

Hey @jwills Having a union dataset feels like a step in the right direction to me, however I don't think it will really help when it comes to performance.

Loading the files through PyArrow is very slow at the moment. The biggest issue there is that we aren't able to do the schema evolution in pure Arrow. That's why we materialize to a table, do all the changes needed to the schema, and then concat all the tables in the end. This is very costly to do in Python. The main issue here is that Arrow does not support fetching schemas or filtering by field IDs, which is the basis of Iceberg.

A cleaner option would be to have the Arrow dataset expose a protocol that we can implement. This was suggested a while ago, but they were very reluctant about this and wanted to do everything through Substrait.

@jwills

jwills commented Feb 1, 2024

@Fokko agreed, the union approach seems like a perf killer. Will noodle on this a bit more-- thanks for the context here!

@Fokko
Contributor Author

Fokko commented Feb 1, 2024

Just for context, don't know if it helps. I was recently playing with pushing the union of the tables into Arrow, including all the schema evolution. This would prevent PyIceberg from doing this itself, which is slow. The idea was to create an empty table with the requested schema, and then union all the Parquet files to it, using the new option in concat tables to automatically do schema evolution. The missing part there is that Arrow cannot re-order struct fields :(

@jwills

jwills commented Feb 1, 2024

That is helpful, thank you.

One other option I was considering on my side, given that I have access to https://github.com/duckdb/duckdb_iceberg : Using pyiceberg to fetch the metadata for an Iceberg table (like path and which snapshot to read) but then delegating the actual reading to the Iceberg scan operation built-in to DuckDB (which looks to me like it bypasses the arrow issues entirely.)

Do you have thoughts on that approach, or is it outside of your wheelhouse?

@Fokko
Contributor Author

Fokko commented Feb 1, 2024

I'm always in for creative solutions. I think that would work well; my colleague also did something similar: https://gist.github.com/kainoa21/f3d01c607fce2741cef22683048a22a3 which is a really nifty trick!

@wonnor-pro

Hi team, do we have an update on this? We are really excited about this feature.

@mfatihaktas

Just to note, we would also love this feature. It would allow us to support Iceberg read/write in Ibis.

@TiansuYu
Contributor

TiansuYu commented Aug 23, 2024

A PyArrow Dataset can be initiated from a list of file paths:

Create a FileSystemDataset from explicitly given files. The files must be located on the same filesystem given by the filesystem parameter. Note that in contrary of construction from a single file, passing URIs as paths is not allowed.

I have the opposite idea in mind: a PyArrow representation / dataset protocol should be something like a MemoryBuffer that offers a set of APIs available to every table implementation.

Other query engines can then load the dataset via this "InMemoryDataset" (that's kind of my mental model for Arrow) as an intermediary (not sure we want to expose this directly like this; it could be something that's hidden and low level).

@noklam

noklam commented Sep 25, 2024

Would love this feature, I am coming from ibis-project/ibis#7712 as well.

@kevinjqliu
Contributor

@corleyma

corleyma commented Oct 31, 2024

@kevinjqliu alas it's not as simple for iceberg because of the need to do field id-based projection to handle schema evolution.

Somewhat relatedly: from what I remember, and assuming nothing has changed, PyArrow Datasets can't have fragments with different input schemas. The PyArrow Dataset implementation assumes a dataset has a single schema and that it matches the incoming schema of the fragments, so you can't, for example, specify per-fragment projections.

I know @kevinjqliu understands this already, but for others who may want to pick up work on this in the future, here's what this means in practice:
Say you're reading an Iceberg table snapshot, and the schema of that snapshot should be the schema of the PyArrow dataset (the "output schema" of the dataset scan). Since schema changes in Iceberg are metadata-only operations, it's possible that your table has, for example, historical partitions written with an older schema that have not been rewritten. An Iceberg-compliant reader is supposed to project these old files to the new schema at read time. That means, in PyArrow Dataset terminology, you need fragments with potentially different schemas to be projected to the final schema of the dataset. The set of things this has to account for includes column renames (which would be handled by field-ID-based projection if PyArrow supported it) and type widening (for example, old fragments have column foo stored as an int, new fragments store foo as a long after a schema update, so the PyArrow dataset needs to cast foo to long when reading the old fragments).
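
The resolution logic described above can be sketched in plain Python. This is an illustrative model only, not PyIceberg's or PyArrow's implementation: the `Field`, `ColumnPlan`, and `plan_projection` names, the string type labels, and the example schemas are all hypothetical. It computes, for one data file, which source column feeds each output column and whether a cast is needed.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Field:
    field_id: int
    name: str
    type: str  # simplified type label, e.g. "int", "long", "string"


@dataclass(frozen=True)
class ColumnPlan:
    output_name: str
    source_name: Optional[str]  # None: column absent from the file, fill with nulls
    cast_to: Optional[str]      # None: the file's type already matches


# Type promotions treated as safe in this sketch.
ALLOWED_PROMOTIONS = {("int", "long"), ("float", "double")}


def plan_projection(file_schema: List[Field], table_schema: List[Field]) -> List[ColumnPlan]:
    """Match columns by field ID, not by name, and note the casts required."""
    by_id = {f.field_id: f for f in file_schema}
    plan = []
    for target in table_schema:
        source = by_id.get(target.field_id)
        if source is None:
            plan.append(ColumnPlan(target.name, None, None))
        elif source.type == target.type:
            plan.append(ColumnPlan(target.name, source.name, None))
        elif (source.type, target.type) in ALLOWED_PROMOTIONS:
            plan.append(ColumnPlan(target.name, source.name, target.type))
        else:
            raise ValueError(f"cannot read {source.type} as {target.type}")
    return plan


# An old file written before a rename (label -> name), a widening of foo,
# and the addition of a new column.
old_file = [Field(1, "foo", "int"), Field(2, "label", "string")]
current = [Field(1, "foo", "long"), Field(2, "name", "string"), Field(3, "added", "double")]
plan = plan_projection(old_file, current)
```

A plan like this differs from file to file, which is why a single dataset-wide schema is not enough.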

There was a discussion long ago about creating a PyArrow Dataset Python protocol. The most recent discussion that I am aware of proposes that third parties could implement scanners with their own logic around an interface that accepts projections and filters expressed as substrait predicates, and returns one or more streams of arrow data. Libraries that consume PyArrow datasets would be able to consume these other implementations using the PyCapsule protocol without needing any special integration logic.

The PyArrow dataset protocol hasn't been adopted, but it would indeed allow pyiceberg to define a PyArrow Dataset-compatible scanner that does whatever it wants. However, unless pyiceberg lifted much of the underlying implementation into iceberg-rust, a pure-python dataset implementation might not be able to achieve the performance folks want. In a world where pyiceberg continues to rely mostly on pyarrow for native C++ implementations of performance-critical logic, pyiceberg would probably benefit most from extensions to the dataset implementation in PyArrow allowing folks to specify per-fragment projections to the final dataset output schema. That way all the actual concurrent scanning logic could be farmed out to pyarrow's native implementations.

So... there are a few threads folks could pull on to try to make this happen. If the goal is compatibility with the wide variety of execution engines (duckdb, polars, etc) that support pyarrow datasets today, this may still be a path worth pursuing. The alternative is continuing the development of engine-specific integrations but I have not seen a lot of real development for e.g., the duckdb_iceberg extension, which is still very incomplete, and in general that might still result in a lot of duplicative work.

If you don't care about which query engine, but just want at least one in-process query engine in Python that can read Iceberg with great performance, I think following up with the effort to add FFI for TableProviders in datafusion and datafusion-python, and then helping out over in iceberg-rust to expose their IcebergTableProvider in Python (and making sure it has any of the features it needs to be performant) might actually be the quickest path forward.
