
Add support for delta atomic commit read #1079

Open · wants to merge 1 commit into master

Conversation

@hzzlzz commented Apr 19, 2022

Description

This PR adds support for atomic commit reads when streaming data from a Delta table.

PIP

Resolves #1026

How was this patch tested?

Unit tests

Does this PR introduce any user-facing changes?

Adds a new option when reading Delta as a streaming source:

```scala
val q = spark.readStream
  .format("delta")
  .option("readAtomicCommits", "true")
  .load(path)
```
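For illustration only, a hedged sketch of how the proposed option is meant to combine with an existing rate-limit option such as maxFilesPerTrigger (the exact interaction is discussed further down this thread):

```scala
// Illustrative sketch, not part of this PR's diff. With readAtomicCommits=true the
// intent is that a single Delta commit is never split across micro-batches, even when
// a rate limit such as maxFilesPerTrigger would otherwise cut the commit short.
val q = spark.readStream
  .format("delta")
  .option("maxFilesPerTrigger", "10")   // existing Delta source rate-limit option
  .option("readAtomicCommits", "true")  // option proposed in this PR
  .load(path)
```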

Co-authored-by: Liqiang Guo <guoliqiang2006@gmail.com>
Signed-off-by: Lizhezheng Zhang <hzzlzz@hotmail.com>
@hzzlzz (Author) commented Apr 19, 2022

I wonder if there is an API to manually create multi-action commits with custom data.
We found some samples in DeltaLogSuite, but they seem to be for creating logs only.
Any ideas on how this option should be implemented and tested would be appreciated!

@allisonport-db (Collaborator) commented:

Hi @hzzlzz thanks for making this PR and writing up the PIP. We'll review this as soon as possible.

> I wonder if there is an API to manually create multi-action commits with custom data.
> We found some samples in DeltaLogSuite, but they seem to be for creating logs only.
> Any ideas on how this option should be implemented and tested would be appreciated!

I think testing it the same way we test maxFilesPerTrigger and maxBytesPerTrigger should be good. Is there a reason you want to do something else?

@hzzlzz (Author) commented Apr 20, 2022

> Hi @hzzlzz thanks for making this PR and writing up the PIP. We'll review this as soon as possible.
>
> > I wonder if there is an API to manually create multi-action commits with custom data.
> > We found some samples in DeltaLogSuite, but they seem to be for creating logs only.
> > Any ideas on how this option should be implemented and tested would be appreciated!
>
> I think testing it the same way we test maxFilesPerTrigger and maxBytesPerTrigger should be good. Is there a reason you want to do something else?

That sounds good. Please go ahead and review. The reason for asking is that I need multi-action commits to test this option, and right now I'm relying on repartition to create them. In my offline tests, though, repartition is not guaranteed to produce the exact number of files (for instance, 2) when the data volume is small, so I write 50 items at a time in the unit test.
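For context, a minimal sketch of the kind of test setup described here; the helper name and row count are illustrative, not the PR's actual test code:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: append one commit that should contain multiple AddFile actions
// by repartitioning before the write. With very small data, repartition(2) is not
// guaranteed to produce exactly 2 files, hence writing 50 rows per commit.
def writeMultiFileCommit(spark: SparkSession, tablePath: String): Unit = {
  import spark.implicits._
  (0 until 50).toDF("value")
    .repartition(2)
    .write
    .format("delta")
    .mode("append")
    .save(tablePath)
}
```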

@zsxwing (Member) commented May 4, 2022

@hzzlzz If I understand correctly, the behavior of readAtomicCommits would be: if maxFilesPerTrigger or maxBytesPerTrigger causes us to stop in the middle of a commit log file, readAtomicCommits will force us to continue reading the log file until we reach the end. Right?

@hzzlzz (Author) commented May 5, 2022

> @hzzlzz If I understand correctly, the behavior of readAtomicCommits would be: if maxFilesPerTrigger or maxBytesPerTrigger causes us to stop in the middle of a commit log file, readAtomicCommits will force us to continue reading the log file until we reach the end. Right?

Exactly! And in the case of isStartingVersion=true, it will read the whole snapshot in one micro-batch.
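To make that concrete, a hedged Scala sketch of the admission rule being described; the types and helper names are hypothetical and not taken from this PR's implementation:

```scala
// Hypothetical sketch of the rate-limit interaction described above. IndexedFile is a
// placeholder type, and `files` is assumed to be ordered by commit version.
case class IndexedFile(version: Long, path: String, sizeInBytes: Long)

def filesForNextBatch(
    files: Seq[IndexedFile],
    maxFilesPerTrigger: Int,
    readAtomicCommits: Boolean): Seq[IndexedFile] = {
  val limited = files.take(maxFilesPerTrigger)
  if (!readAtomicCommits || limited.isEmpty || limited.size == files.size) {
    limited
  } else {
    // Extend the batch to the end of the last admitted commit so that a single
    // commit (version) is never split across micro-batches.
    val lastAdmittedVersion = limited.last.version
    files.takeWhile(_.version <= lastAdmittedVersion)
  }
}
```

Under the same proposal, when isStartingVersion=true the initial snapshot would bypass the limit and be admitted as a single micro-batch.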

@zsxwing (Member) commented May 9, 2022

> And in the case of isStartingVersion=true, it will read the whole snapshot in one micro-batch.

I'm a bit concerned about this. Loading the entire snapshot will likely cause stability issues. You mentioned that you built a service for Delta Lake: #1026 (comment). How do you plan to handle the case where a big snapshot takes down your service?

@guoliqiang commented May 10, 2022

> > And in the case of isStartingVersion=true, it will read the whole snapshot in one micro-batch.
>
> I'm a bit concerned about this. Loading the entire snapshot will likely cause stability issues. You mentioned that you built a service for Delta Lake: #1026 (comment). How do you plan to handle the case where a big snapshot takes down your service?

This is a totally reasonable concern.
Because the snapshot has lost the previous transaction info, we settled on two possible solutions in our internal discussion:

  1. Do NOT support this new option (e.g. throw an exception) if the application uses isStartingVersion=true.
  2. Read the whole snapshot in one micro-batch (as @hzzlzz mentioned) when the application uses isStartingVersion=true, and explicitly call out this behavior in the documentation so the application can make an informed decision.

We adopted No. 2 in our system because our tables are small/medium, so reading the whole snapshot is OK, and this is also our cold-start use case.
Not sure which one Delta Lake prefers (or is there another idea)?
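For illustration, a hedged sketch of what the two alternatives above could look like where the source resolves its options; the method name and error message are hypothetical, not code from this PR:

```scala
// Hypothetical sketch of the two alternatives discussed above; not the PR's actual code.
def checkAtomicCommitsWithSnapshot(readAtomicCommits: Boolean, isStartingVersion: Boolean): Unit = {
  if (readAtomicCommits && isStartingVersion) {
    // Alternative 1: refuse the combination outright.
    throw new UnsupportedOperationException(
      "readAtomicCommits is not supported when the stream starts from a full table snapshot")
    // Alternative 2 (the behavior adopted above): instead of throwing, ignore
    // maxFilesPerTrigger / maxBytesPerTrigger for the first micro-batch so the whole
    // snapshot is admitted at once, and call that behavior out in the documentation.
  }
}
```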

zsxwing self-assigned this May 10, 2022
@zsxwing (Member) commented May 13, 2022

I feel neither is ideal. We don't want to build a feature that lets users easily shoot themselves in the foot. On second thought, would your problem be solved by using Merge Into? We have a few examples in our docs that show how to use Merge Into to track updates and make idempotent changes to a Delta table: https://docs.delta.io/latest/delta-update.html#merge-examples
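For readers following the suggestion, a minimal Merge Into sketch in the spirit of the linked examples; the table paths, key column, and updates DataFrame are placeholders:

```scala
import io.delta.tables.DeltaTable

// Placeholder source of changes; in practice this could be any DataFrame of updates.
val updatesDf = spark.read.format("delta").load("/tmp/delta/updates")

// Idempotently upsert the changes into the target table, keyed on `key`.
DeltaTable.forPath(spark, "/tmp/delta/target").as("t")
  .merge(updatesDf.as("u"), "t.key = u.key")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()
```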

@guoliqiang commented May 16, 2022

Thanks @zsxwing
Although our state store/sink is not a Delta table for various reasons (e.g. perf), and, as we mentioned on #1026, adding additional information (e.g. a timestamp for each key) and not performing hard-delete operations could also partly resolve our problem, we thought changing the streaming reader behavior would be the cheapest and most generic solution.

Thinking further, I think only a Delta table stream source could provide such an ability; Kafka/EventHub has no straightforward way to do it.

@zsxwing (Member) commented May 16, 2022

> Thinking further, I think only a Delta table stream source could provide such an ability; Kafka/EventHub has no straightforward way to do it.

Yeah. If you would like to provide a general solution for data sources other than Delta, changing just Delta would not be sufficient; it looks like you would have to add something to the data to achieve this. Do you still want to add this to Delta Lake? Otherwise, I'm inclined to close this one, as I prefer not to add an unscalable solution if possible.

@hzzlzz (Author) commented May 16, 2022

Thanks @zsxwing
To clarify, with this change we don't have to add anything to the data; a Delta-side change alone is sufficient. In our use case there is within-transaction data disordering caused by multiple worker nodes generating multiple Parquet files in parallel. If the entire transaction is consumed in one micro-batch, there is no need to perform dedup/reordering across micro-batches, which would otherwise require a state store or state sink.
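To illustrate that point, a hedged sketch of per-batch reordering/dedup that becomes possible once each transaction arrives whole; the key and seqNo columns and the downstream sink are made up:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val query = spark.readStream
  .format("delta")
  .option("readAtomicCommits", "true")  // option proposed in this PR
  .load(path)
  .writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // Because the whole transaction lands in this one micro-batch, keeping only the
    // latest row per key needs no cross-batch streaming state.
    val latestPerKey = batch
      .withColumn("rn", row_number().over(
        Window.partitionBy(col("key")).orderBy(col("seqNo").desc)))
      .filter(col("rn") === 1)
      .drop("rn")
    // write latestPerKey to the downstream (non-Delta) state store/sink here
    ()
  }
  .start()
```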

@guoliqiang commented:

> Yeah. If you would like to provide a general solution for data sources other than Delta, changing just Delta would not be sufficient; it looks like you would have to add something to the data to achieve this. Do you still want to add this to Delta Lake? Otherwise, I'm inclined to close this one, as I prefer not to add an unscalable solution if possible.

Thanks @zsxwing
When I say "only a Delta table source could provide such an ability", I mean that this option would be a unique feature that attracts customers to Delta Lake :) In fact, in our internal system we are moving away from Kafka/EventHub to Delta Lake because of its ACID guarantees and unified batch and streaming reads.
Because our only source is a Delta table, we will not need to change the data once this option is available.
This is our thought. :)

allisonport-db removed their request for review May 18, 2022 02:47
allisonport-db removed their assignment May 18, 2022
tdas requested a review from scottsand-db May 19, 2022 19:16
@scottsand-db (Collaborator) commented:

> I feel neither is ideal. We don't want to build a feature that lets users easily shoot themselves in the foot. On second thought, would your problem be solved by using Merge Into? We have a few examples in our docs that show how to use Merge Into to track updates and make idempotent changes to a Delta table: https://docs.delta.io/latest/delta-update.html#merge-examples

@zsxwing, what's the next step for deciding between these two options?

@zsxwing (Member) commented May 20, 2022

@scottsand-db since you are working on CDF, which actually needs to read a commit entirely, could you think about how to unify CDF and the normal streaming Delta source?

@scottsand-db (Collaborator) commented:

Hi @hzzlzz - as @zsxwing mentioned, I am working on CDF (Change Data Feed) #1105. The next PR I'm working on is CDF + Streaming.

One thing that CDF generates for UPDATE operations is pre-image and post-image row-level changes. For CDC reads of AddCDCFiles, we want to admit the entire commit or nothing at all (to avoid having the pre-image and post-image row-level changes split across micro-batches). (For AddFile or RemoveFile actions, we do not have this requirement.)

So, I think it makes sense to wait for this CDF PR to be merged first (it's a WIP) and then you can add your read-atomic-commits to the latter part of CDF (for Add and Remove files).

Does this sound good?
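For background, a hedged sketch of what a CDF read might look like once #1105 lands; the option and column names follow the CDF design referenced above and may differ in the final API:

```scala
import org.apache.spark.sql.functions.col

// Assumed CDF option/column names; treat everything here as illustrative.
val changes = spark.readStream
  .format("delta")
  .option("readChangeFeed", "true")
  .load(path)

// For an UPDATE, CDF emits paired rows tagged update_preimage / update_postimage.
// Needing both rows in the same micro-batch is why CDC reads admit a commit atomically,
// which is where a read-atomic-commits option for AddFile/RemoveFile could later plug in.
val updateChanges = changes.filter(col("_change_type").startsWith("update_"))
```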

@scottsand-db (Collaborator) commented:

Update: that CDF PR is here: #1154

Successfully merging this pull request may close these issues:
- support streaming read all data or none for a transaction (#1026)