
Python: Support writer protocol V2 in write_deltalake #575

Closed
1 of 2 tasks
wjones127 opened this issue Mar 23, 2022 · 3 comments · Fixed by #834
Labels
enhancement (New feature or request)

Comments

@wjones127
Collaborator

wjones127 commented Mar 23, 2022

Description

Most Delta tables now use writer protocol V2 by default, so our writer isn't yet compatible with tables created by most other systems.

Two features are needed for writer protocol V2:

- Enforce the delta.appendOnly table property (reject non-append writes to append-only tables); a rough sketch follows below.
- Enforce column invariants defined in the table schema.
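
A rough sketch of what the appendOnly half could look like (purely illustrative; check_append_only and the mode argument mirror write_deltalake's existing parameters but are not final names):

def check_append_only(configuration: dict, mode: str) -> None:
    # delta.appendOnly is stored as a string table property ("true"/"false").
    append_only = configuration.get("delta.appendOnly", "false").lower() == "true"
    if append_only and mode != "append":
        raise ValueError(
            f"Table is append-only (delta.appendOnly=true); mode='{mode}' is not allowed."
        )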

Related Issue(s)
Umbrella issue: #542

wjones127 added the enhancement (New feature or request) label on Mar 23, 2022
@PadenZach
Contributor

@GraemeCliffe-inspirato Are you still interested in taking the append-only part of this issue? If not, I may be able to look into it. If so, I'll hold off and see if there are other good first issues for me on this :)

@WarSame
Contributor

WarSame commented Apr 18, 2022

@PadenZach I'm still interested in the appendOnly section! Thank you for checking! I've just now gotten the project building and the tests running, so I'm just starting on the issue.

@wjones127
Collaborator Author

My initial guess at what needs to be done for invariants is something like this:

from typing import Iterator
from pyarrow import RecordBatch

# ...inside of write_deltalake
def iter_batches(data, invariants) -> Iterator[RecordBatch]:
    for batch in data:
        # Check every invariant expression before letting the batch through.
        for sql_clause in invariants:
            res = execute(sql_clause, batch)  # execute() is an open question, see below
            if not res:
                raise ValueError(f"Invariant violated: {sql_clause}")
        yield batch

invariants = configuration["delta.invariants"]
data = convertToRecordBatchReader(data)

batch_iter = iter_batches(data, invariants)
# ... pass batch_iter to write_dataset

The hard part is that we don't have a SQL parser in the deltalake package, so I'm not sure how that execute() function would work. One option is to enable the datafusion feature of delta-rs in the Python bindings (which I suspect we'll do eventually anyway) and then implement execute() in Rust using DataFusion. Moving the record batch temporarily into Rust with zero copy should be possible (python-datafusion does this), but it might take a little glue code.
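
As a stopgap, execute() could also be prototyped in pure Python with the datafusion package. This is only a sketch under the assumption that registering each batch as a small in-memory table is acceptable; the exact API (SessionContext vs. ExecutionContext, register_record_batches) varies between datafusion-python releases:

import pyarrow as pa
from datafusion import SessionContext  # older releases expose ExecutionContext instead

def execute(sql_clause: str, batch: pa.RecordBatch) -> bool:
    """Return True if every row in `batch` satisfies the invariant expression."""
    ctx = SessionContext()
    # Register the batch as a single-partition table named "batch".
    ctx.register_record_batches("batch", [[batch]])
    result = ctx.sql(
        f"SELECT COUNT(*) AS violations FROM batch WHERE NOT ({sql_clause})"
    ).collect()
    return result[0].column(0)[0].as_py() == 0

Spinning up a context per batch is obviously wasteful; the Rust/DataFusion route described above avoids that and keeps the SQL handling in one place.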

It's probably worth researching what typical invariants are used and allowed by existing engines. The spec is very vague, but it's likely the Spark implementation has a limited set of column types and operations we need to care about.

wjones127 added a commit that referenced this issue Sep 28, 2022
# Description

Adds support for retrieving invariants from the Delta schema, plus a
`DeltaDataChecker` struct that uses DataFusion to check them and report
useful errors.

This also hooks it up to the Python bindings, allowing
`write_deltalake()` to support Writer Protocol V2.
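
For illustration, the intended end-user experience once this is in (a sketch; the table URI and data are placeholders):

import pyarrow as pa
from deltalake import write_deltalake

data = pa.table({"x": [4, 5, 6]})
# Against a table at writer protocol V2, appends are checked against any
# column invariants and rejected with a descriptive error if they violate them.
write_deltalake("path/to/table", data, mode="append")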

I looked briefly at the Rust writer, but then realized we don't want to
introduce a dependency on DataFusion. We should discuss how we want to
design that API. I suspect we'll turn DeltaDataChecker into a trait, so
we can have a DataFusion one available but also allow other engines to
implement it themselves if they don't wish to use DataFusion.

# Related Issue(s)

- closes #592
- closes #575

# Documentation


https://github.com/delta-io/delta/blob/master/PROTOCOL.md#column-invariants
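
For quick reference, the linked protocol section stores an invariant as JSON under the delta.invariants key of a column's metadata. A small sketch (using a pyarrow field purely for illustration; in a real table this lives in the schemaString of the Delta log's metaData action):

import json
import pyarrow as pa

# The invariant expression "x > 3" serialized the way PROTOCOL.md describes.
invariant = json.dumps({"expression": {"expression": "x > 3"}})
field = pa.field("x", pa.int64(), metadata={"delta.invariants": invariant})
print(pa.schema([field]).field("x").metadata)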