Proposal to create logical plans for operations #2006

Open
Blajda opened this issue Dec 31, 2023 · 6 comments
Labels: binding/rust (Issues for the Rust crate), enhancement (New feature or request)

Comments

Blajda (Collaborator) commented Dec 31, 2023

Description

This proposes further work that I'd like to perform regarding the creation of reusable logical relations. It also helps with identifying the relations we would need for substrait.

Delta Find Files
Purpose: Identify files that contain records that satisfy a predicate.

This relation will generate a record batch stream with a single column called path. Each path maps to an Add action in the Delta table.
This relation will also maintain a list of files that satisfy the predicate, which can be passed sideways to relations downstream.
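
As a rough sketch, a relation like this could be expressed as a DataFusion user-defined logical node. The struct name, fields, and the shared `files_matched` side channel below are assumptions for illustration, not a finalized design:

```rust
use std::sync::{Arc, Mutex};

use datafusion::common::DFSchemaRef;
use datafusion::logical_expr::Expr;

/// Hypothetical logical node that scans the Delta log and emits a single
/// `path` column for every file whose statistics may satisfy the predicate.
/// It would implement DataFusion's user-defined logical node traits.
#[derive(Debug, Clone)]
pub struct DeltaFindFiles {
    /// Predicate used to prune files via Add-action statistics and partition values.
    pub predicate: Option<Expr>,
    /// Table version this relation is pinned to.
    pub version: i64,
    /// Output schema: a single Utf8 column named "path".
    pub schema: DFSchemaRef,
    /// Side channel: paths of matched files, shared sideways with downstream
    /// relations that need them (e.g. to build Remove actions).
    pub files_matched: Arc<Mutex<Vec<String>>>,
}
```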

Delta Scan
Purpose: Scan the Delta Table

Update DeltaScan to take an optional input stream that contains the paths of files to be scanned. This will enable DeltaScan to consume the output of DeltaFindFiles.
Currently, when using find files, we must wait for the entire operation to complete before building the scan. This change enables DeltaScan to start as soon as the first candidate file is identified.
I think this will require some significant work since it will involve refactoring the current DeltaScan implementation.
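
A minimal sketch of what the optional file-path input could look like on the scan node; the name `DeltaScanNode` and its fields are illustrative only:

```rust
use datafusion::logical_expr::LogicalPlan;

/// Hypothetical logical form of DeltaScan. When `file_source` is present it is
/// a DeltaFindFiles child plan and only the paths it yields are scanned, as
/// they arrive; when it is absent, the scan lists every file in the snapshot.
#[derive(Debug, Clone)]
pub struct DeltaScanNode {
    /// Table version the scan is pinned to.
    pub version: i64,
    /// Optional upstream relation producing the `path` column.
    pub file_source: Option<LogicalPlan>,
}
```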

Delta Write
Purpose: Write records to storage, conflict resolution, and commit creation

Takes a single input stream of data that matches the table's schema and creates Add actions for each new file.
Information can be passed sideways to include additional Delta actions in the commit, e.g. DeltaDelete can provide a stream of Remove actions.
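
A hedged sketch of the write relation, assuming the `Action` enum from `deltalake::kernel`; the struct, its fields, and the side-channel mechanism are assumptions:

```rust
use std::sync::{Arc, Mutex};

use datafusion::logical_expr::LogicalPlan;
use deltalake::kernel::Action;

/// Hypothetical DeltaWrite logical node: writes `input` to storage as new
/// parquet files (one Add action per file) and folds in any actions passed
/// sideways (e.g. Remove actions from DeltaDelete) before creating the commit.
#[derive(Debug, Clone)]
pub struct DeltaWriteNode {
    /// Single input stream of records matching the table schema.
    pub input: LogicalPlan,
    /// Additional actions supplied sideways by upstream relations, to be
    /// included in the same commit.
    pub extra_actions: Arc<Mutex<Vec<Action>>>,
}
```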

Delta Delete
Purpose: Delete Records from the table.

Given a predicate, delete records from the Delta table.
DeltaDelete can take an optional stream of records and will output the records that do NOT satisfy the predicate.
It will maintain a stream of Remove actions that can be passed sideways to other operations downstream.

The input stream is optional since there are cases where delete can determine which files to remove without needing a scan. An optimization phase can help determine when this is the case.
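
A corresponding sketch for the delete relation; again the struct and its fields are hypothetical, with `Remove` assumed to be the kernel action type:

```rust
use std::sync::{Arc, Mutex};

use datafusion::logical_expr::{Expr, LogicalPlan};
use deltalake::kernel::Remove;

/// Hypothetical DeltaDelete logical node. With an input stream it emits the
/// records that do NOT satisfy `predicate` so they can be rewritten by
/// DeltaWrite; without one, an optimization pass has decided that whole files
/// can be dropped from metadata alone. Either way it accumulates Remove
/// actions for the files it invalidates and shares them sideways.
#[derive(Debug, Clone)]
pub struct DeltaDeleteNode {
    pub predicate: Expr,
    /// Optional upstream scan; `None` when files can be removed metadata-only.
    pub input: Option<LogicalPlan>,
    /// Remove actions to be folded into the eventual commit.
    pub removes: Arc<Mutex<Vec<Remove>>>,
}
```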

Diagram

High-level diagram of how these relations will connect.

               ┌───────────────────────┐
               │   Delta Find Files    │
               │                       │
               │  Predicate:           │
           ┌───┤    Version:           │
           │   │                       │
           │   └──────────┬────────────┘
           │              │
           │              ▼
   Files   │   ┌───────────────────────┐
  Matched  │   │     Delta Scan        │
   List    │   │                       │
           │   │   Version:            │
           │   │                       │
           │   │                       │
           │   └──────────┬────────────┘
           │              │
           │              ▼
           │   ┌───────────────────────┐
           └──►│     Delta Delete      │
               │                       │
               │  Predicate:           │
           ┌───┤                       │
           │   └──────────┬────────────┘
  Remove   │              │
  Actions  │              ▼
           │   ┌───────────────────────┐
           │   │     Delta Write       │
           └──►│                       │
               │                       │
               │                       │
               └───────────────────────┘

Converting the ReplaceWhere operation to a logical view could look something like this:


               ┌───────────────────────┐
               │   Delta Find Files    │
               │                       │
               │  Predicate:           │
           ┌───┤    Version:           │
           │   │                       │
           │   └──────────┬────────────┘
           │              │
           │              ▼                     ┌────────────────────────────┐
   Files   │   ┌───────────────────────┐        │        Data  Source        │
  Matched  │   │     Delta Scan        │        │                            │
   List    │   │                       │        │                            │
           │   │   Version:            │        └────────────┬───────────────┘
           │   │                       │                     │
           │   │                       │                     ▼
           │   └──────────┬────────────┘        ┌────────────────────────────┐
           │              │                     │    Delta Constraint Check  │
           │              ▼                     │                            │
           │   ┌───────────────────────┐        └────────────┬───────────────┘
           └──►│     Delta Delete      │                     │
               │                       │                     │
               │  Predicate:           │                     │
           ┌───┤                       │                     │
           │   └──────────┬────────────┘                     │
  Remove   │              │                                  │
  Actions  │              └────────────────┐   ┌─────────────┘
           │                               ▼   ▼
           │       ┌──────────────────────────────────────────────────────────┐
           │       │                      Union                               │
           │       │                                                          │
           │       └─────────────────────────┬────────────────────────────────┘
           │                                 │
           │                                 ▼
           │                      ┌───────────────────────┐
           │                      │     Delta Write       │
           └──────────────────────┤                       │
                                  │                       │
                                  │                       │
                                  └───────────────────────┘

Use Case

Once we have logical plans for Update and Delete, we can expose new DataFusion SQL statements for them.
This may also help with reusing the Delete & Update logic for other logical plans.
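
For example, once a Delta-aware planner can lower DML onto these relations, something like the sketch below would become possible. DataFusion already parses DELETE/UPDATE statements into `LogicalPlan::Dml`; the table name here is hypothetical and plain DataFusion will not execute the statement today:

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

// Hedged sketch: assumes a Delta table has been registered as "my_delta_table"
// and that a Delta-aware planner lowers the resulting Dml plan onto
// DeltaFindFiles / DeltaDelete / DeltaWrite.
async fn delete_via_sql(ctx: &SessionContext) -> Result<()> {
    let df = ctx.sql("DELETE FROM my_delta_table WHERE id > 100").await?;
    df.collect().await?;
    Ok(())
}
```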

Related Issue(s)

@Blajda Blajda added the enhancement New feature or request label Dec 31, 2023
@Blajda Blajda changed the title Propsal to create logical plans for operations Proposal to create logical plans for operations Dec 31, 2023
roeap (Collaborator) commented Jan 1, 2024

Great writeup @Blajda - I am a great fan of going the logical plan route for all our operations.

In another repo, I have been experimenting a bit with the end-to-end flow starting with parsing delta-specific SQL and will upstream some of these changes soon. Reading through your proposals, it seems the "find files" plan would essentially contain the logic from kernel and the concept of a Snapshot. Some of the work I have been doing around moving the state management to Arrow RecordBatches touches some of that, I think.

Essentially the plan is to make our state less eager and hopefully improve processing performance along the way. The logic is as follows.

  • create a LogSegment - a list of commit and checkpoint files relevant for a given version.
  • On creation of a Snapshot, replay the log to find the Protocol and Metadata actions ASAP - discard everything else.
    • I recently discovered some updates in DataFusion around NDJSON reading that we should leverage.
  • On replay / scan - essentially whenever a file list is requested - "replay" the log (find relevant files based on log reconciliation rules / predicates).

I think this would play into the proposed DeltaFindFiles and DeltaScan operations and also support getting the described stream processing started.
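
A rough sketch of the state shapes this implies; the names `LogSegment`, `Snapshot`, and their fields are illustrative assumptions rather than the actual kernel types:

```rust
use std::sync::Arc;

use arrow_array::RecordBatch;
use object_store::path::Path;

/// List of commit and checkpoint files relevant for a given version.
#[derive(Debug, Clone)]
pub struct LogSegment {
    pub version: i64,
    /// Commit (.json) files newer than the last checkpoint, newest first.
    pub commit_files: Vec<Path>,
    /// Parquet checkpoint files for the base version, if any.
    pub checkpoint_files: Vec<Path>,
}

#[derive(Debug, Clone)]
pub struct Snapshot {
    pub log_segment: Arc<LogSegment>,
    /// Protocol and Metadata actions are located eagerly when the snapshot is
    /// created (one pass over the segment); everything else is discarded.
    pub protocol_and_metadata: RecordBatch,
}

impl Snapshot {
    /// "Replay" the segment only when a file list is requested: stream the
    /// log as record batches, apply log-reconciliation rules and an optional
    /// predicate, and yield the surviving files.
    pub fn files(&self) -> impl Iterator<Item = RecordBatch> {
        // Placeholder body; a real implementation would lazily read the
        // segment's commit and checkpoint files here.
        std::iter::empty()
    }
}
```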

As a "corollary" of this work we also have some progress on a much improved parquet reader the supports selective reading of leaf columns (also for nested structs) as well as more fine-granulary casting the schema - i.e. support schema evolution.

Good news is, recently we have been consolidating the APIs used from the current DeltaTableState to become a snapshot. With the removal of parquet2 support in #1995 we will also greatly simplify log parsing by using the existing arrow integration in the parquet crate, eliminating the need to maintain manual parsing.

Implicitly, your proposal also moves us to keep data as RecordBatches for much longer and avoid creating actual Add, Remove structs etc., which I strongly support as well :).

As you said, this will be a lot of work, but also generate a lot of impact!

@ion-elgreco ion-elgreco added the binding/rust Issues for the Rust crate label Jan 1, 2024
Blajda (Collaborator, Author) commented Jan 1, 2024

As a "corollary" of this work we also have some progress on a much improved parquet reader the supports selective reading of leaf columns (also for nested structs) as well as more fine-granulary casting the schema - i.e. support schema evolution.

Is this work available anywhere? To start, I want to update DeltaScan's implementation to manage parquet reading itself instead of depending entirely on DataFusion's parquet reader. We have multiple bugs caused by non-compliant writers (Spark) that write timestamp info using Int96. This can be resolved by having an 'adapter' to conform data to the correct type, and it can also help with 'big' datatypes that are used in the ecosystem. The 'adapter' will also align with schema evolution since not every parquet file will have the same physical schema.
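
A minimal sketch of such an adapter using arrow's cast kernel; the function name and error handling are illustrative:

```rust
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_cast::cast;
use arrow_schema::{ArrowError, SchemaRef};

/// Conform a batch read from a (possibly non-compliant) parquet file to the
/// logical table schema: cast Int96-derived timestamps to the expected
/// timestamp unit, narrow "big" integer/decimal types, and look columns up by
/// name so files with reordered schemas still line up. A fuller version would
/// also fill missing columns with nulls to cover schema evolution.
fn adapt_batch(batch: &RecordBatch, table_schema: &SchemaRef) -> Result<RecordBatch, ArrowError> {
    let columns = table_schema
        .fields()
        .iter()
        .map(|field| {
            let source = batch.column_by_name(field.name()).ok_or_else(|| {
                ArrowError::SchemaError(format!("missing column {}", field.name()))
            })?;
            cast(source.as_ref(), field.data_type())
        })
        .collect::<Result<Vec<_>, _>>()?;
    RecordBatch::try_new(Arc::clone(table_schema), columns)
}
```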


I'm aligned with the changes being made for LogSegment and Snapshot. Something that might be beneficial is to have some sort of local 'registry'/'cache' that, given a physical name / 'table handle' and version, returns a shared structure in memory to reduce our overall memory footprint and network calls. When building operators there are some cases where we clone Snapshot to satisfy the borrow checker.

Blajda (Collaborator, Author) commented Jan 1, 2024

To clarify about operations needing to clone Snapshot: operations are pinned to a particular version of the table, and this data is read in multiple places; however, when the operation ends we return a table with a new version. The snapshot of the new table is cloned from the previous one. Suppose each snapshot takes 10 units of memory; we may now consume 20 units.
It would be neat if we could build some sort of 'linked list' where snapshots can share data and each version would just maintain a logical view of which segments it needs.
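
A hedged sketch of that sharing idea; all names here are hypothetical:

```rust
use std::sync::Arc;

/// Hypothetical chunk of parsed log state covering a range of versions.
#[derive(Debug, Clone)]
pub struct LogChunk {
    pub versions: (i64, i64),
    // parsed actions for this range, e.g. kept as Arrow record batches
}

/// Snapshot that shares log chunks with other versions instead of owning a
/// full copy of the state.
#[derive(Debug, Clone)]
pub struct SharedSnapshot {
    pub version: i64,
    /// Cloning this only bumps reference counts; the underlying data is shared.
    pub chunks: Vec<Arc<LogChunk>>,
}

impl SharedSnapshot {
    /// A new version reuses the parent's chunks and appends only what changed,
    /// so two consecutive snapshots cost far less than two full copies.
    pub fn advance(&self, new_version: i64, new_chunk: LogChunk) -> SharedSnapshot {
        let mut chunks = self.chunks.clone(); // clones Arcs, not data
        chunks.push(Arc::new(new_chunk));
        SharedSnapshot {
            version: new_version,
            chunks,
        }
    }
}
```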

hntd187 (Collaborator) commented Jan 26, 2024

So, part of what I was suggesting in #2095 was refactoring the builders to pass back their actions instead of doing everything in their own internal way. I'm in agreement that moving to this method would help a lot for generalizing execution, but at a slightly higher level, how do we think this would alter the operation builders? Their public-facing API obviously would not change, but internally they would pass back actions and an execution plan? Or does the execution plan also take care of actions and commit log updates?

Blajda (Collaborator, Author) commented Jan 27, 2024

@hntd187 The execution plan would also take care of the actions and commit log updates at this point. In the above diagrams this would have been done in the Delta Write relation; however, we can consider splitting write and commit into two different relations if that makes things easier.

hntd187 (Collaborator) commented Jan 28, 2024

Well I suppose the logical plans don't need to care about the details of what has to happen with actions, but the physical ones will need to be able to pass that context along. I don't have a strong opinion about where the write and commit happen, just that you can compose the actions of various logical operators into a single planned write and commit. Hopefully that makes sense.
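
In that spirit, a hedged sketch of the composition step on the physical side; everything here except the `Action` enum is hypothetical, and the commit itself would still go through the existing transaction machinery:

```rust
use deltalake::kernel::Action;

/// Merge the Add actions produced by a planned write with actions passed
/// sideways by other operators (e.g. Remove actions from a delete), so a
/// single commit entry can be created for the whole composed plan.
fn assemble_commit_actions(
    write_actions: Vec<Action>,
    side_channels: Vec<Vec<Action>>,
) -> Vec<Action> {
    let mut actions = write_actions;
    for mut extra in side_channels {
        actions.append(&mut extra);
    }
    actions
}
```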

@rtyler rtyler self-assigned this Feb 26, 2024
Blajda pushed a commit that referenced this issue Mar 24, 2024
# Description
Some of my first workings on David's proposal in #2006. This is also meant to push #2048 and general CDF forward by making the logical operations of Delta tables more composable than they are today.

# Related Issue(s)
#2006 
#2048 

I think, and @Blajda correct me here, we can build upon this and eventually move towards a `DeltaPlanner`-esque enum for operations and their associated logical plan building.

# Still to do

- [ ] Implement different path for partition columns that don't require
scanning the file
- [ ] Plumbing into `DeltaScan` so delta scan can make use of this
logical node
- [ ] General polish and cleanup; there are lots of unnecessary fields and ways things are built that could be improved
- [ ] More tests; there is currently a large integration-style end-to-end test, but this can / should be broken down