
LockClientError #2379

Closed
ravid08 opened this issue Apr 3, 2024 · 7 comments
Labels: binding/python (Issues for the Python package) · bug (Something isn't working) · storage/aws (AWS S3 storage related)

Comments

ravid08 commented Apr 3, 2024

Delta-rs version: deltalake 0.16.0 (PyPI)

Binding: Python

Environment:

  • Cloud provider: AWS
  • OS: Linux
  • Other:

Bug

What happened:
I am using Lambda to write streaming data (from DynamoDB) to deltalake. Although I have disabled concurrent batches per shard, I do not have full control over concurrent Lambda invocations, because Lambda polls each shard separately and may trigger another invocation within the same batch window. What I observed is that when there are concurrent Lambda invocations in the same second, each using write_deltalake to write to the same table path in S3, I get LockClientError::VersionAlreadyExists(77): "Error occurred: Failed to commit transaction: Metadata changed since last commit", and that batch of data is not written to the table.
I was hoping the Lock Provider could acquire locks based on the full path, including partition columns, instead of locking at the table level, in which case this issue could be avoided.
Any suggestions on how I can resolve this issue? Thanks!
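
For context, a minimal sketch (not taken from the issue) of the kind of Lambda handler described above; the table URI, region, record handling, and the handler itself are illustrative assumptions.

```python
import pyarrow as pa
from deltalake import write_deltalake

TABLE_URI = "s3://my-bucket/my-delta-table"  # hypothetical table path

# Locking / log-store settings would also go here; see the maintainer's
# comment further down. The region value is a placeholder.
STORAGE_OPTIONS = {"AWS_REGION": "us-east-1"}


def handler(event, context):
    # Collect the NewImage payloads from the DynamoDB stream batch
    # (proper attribute-value deserialization is omitted for brevity).
    rows = [record["dynamodb"]["NewImage"] for record in event["Records"]]
    batch = pa.Table.from_pylist(rows)

    # Concurrent invocations that try to commit the same table version race
    # here; the loser fails with the LockClientError described above.
    write_deltalake(TABLE_URI, batch, mode="append", storage_options=STORAGE_OPTIONS)
```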

What you expected to happen:
The expectation is to be able to load the data successfully without failures.

How to reproduce it:

More details:

ravid08 added the bug (Something isn't working) label Apr 3, 2024

quartox commented May 6, 2024

I ran into the same issue and used a full try/except around the write. I think in your case that could make future writes pile up conflicts, since each retried batch will keep conflicting with the next one until there is a time window without any conflicting writer.

In my testing, having N concurrent writers increased the average write time by a factor of roughly N: a single writer takes about 1 second per write, while 10 writers each take about 10 seconds per write. Deltalake 0.17.2.
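
One way to apply that try/except idea is a retry with jittered backoff. This is only a sketch, under the assumption that your deltalake release exposes a commit-conflict exception (recent versions have deltalake.exceptions.CommitFailedError); adjust the except clause to whatever your version actually raises.

```python
import random
import time

from deltalake import write_deltalake

try:
    # Present in recent deltalake releases; fall back to a broad except
    # if your version does not expose this exception type.
    from deltalake.exceptions import CommitFailedError
except ImportError:
    CommitFailedError = Exception


def append_with_retry(table_uri, batch, storage_options, max_attempts=5):
    """Retry the append when another writer wins the commit race."""
    for attempt in range(1, max_attempts + 1):
        try:
            write_deltalake(
                table_uri,
                batch,
                mode="append",
                storage_options=storage_options,
            )
            return
        except CommitFailedError:
            if attempt == max_attempts:
                raise
            # Jittered backoff so concurrent Lambdas do not retry in lockstep.
            time.sleep(random.uniform(0.5, 2.0) * attempt)
```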

rtyler added the binding/python and storage/aws labels May 7, 2024

rtyler (Member) commented May 7, 2024

Concurrent writes had some issues, particularly in Lambda which the linked post discusses (I wrote it 😄 )

Basically "locking" before deltalake 0.17.x relied on a "full table lock" which caused the problems, especially in highly parallel environments like Lambda. Since then we have adopted the S3DynamoDbLogStore implementation which has a very different approach and does not perform a full table lock allowing for increased parallelism.

That said, ultimately the contention with Delta Lake on AWS S3 is around the table and its transaction log rather than partitions/etc. You may find it useful to serialize workloads via a queue upstream of the Lambda, or reduce the concurrency of the Lambda function to meet your needs.

I am going to close this because there's not much I believe we can do here.
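
For reference, opting into the DynamoDB-backed locking/log store from the Python binding is done through storage_options. A rough sketch follows; the option names (especially the DynamoDB table name key) have shifted across releases, so treat them as assumptions to verify against the docs for your deltalake version.

```python
import pyarrow as pa
from deltalake import write_deltalake

# Assumed option names for deltalake 0.17.x; verify against your release.
storage_options = {
    "AWS_S3_LOCKING_PROVIDER": "dynamodb",
    "DELTA_DYNAMO_TABLE_NAME": "delta_log",  # pre-created DynamoDB table
    "AWS_REGION": "us-east-1",               # placeholder region
}

batch = pa.Table.from_pylist([{"id": 1, "value": "example"}])
write_deltalake(
    "s3://my-bucket/my-delta-table",  # hypothetical table path
    batch,
    mode="append",
    storage_options=storage_options,
)
```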

rtyler closed this as completed May 7, 2024

ravid08 (Author) commented May 7, 2024

> I ran into the same issue and used a full try/except around the write. I think in your case that could make future writes pile up conflicts, since each retried batch will keep conflicting with the next one until there is a time window without any conflicting writer.
>
> In my testing, having N concurrent writers increased the average write time by a factor of roughly N: a single writer takes about 1 second per write, while 10 writers each take about 10 seconds per write. Deltalake 0.17.2.

Yes, even with a batch size that is well within payload size limits, the write duration increased consistently and within a few days reached the Lambda timeout. Increasing the timeout only delays the inevitable, so this solution is not viable unless I use something like Firehose to do serial processing.

ravid08 (Author) commented May 7, 2024

> Concurrent writes had some issues, particularly in Lambda which the linked post discusses (I wrote it 😄 )
>
> Basically "locking" before deltalake 0.17.x relied on a "full table lock" which caused the problems, especially in highly parallel environments like Lambda. Since then we have adopted the S3DynamoDbLogStore implementation which has a very different approach and does not perform a full table lock allowing for increased parallelism.
>
> That said, ultimately the contention with Delta Lake on AWS S3 is around the table and its transaction log rather than partitions/etc. You may find it useful to serialize workloads via a queue upstream of the Lambda, or reduce the concurrency of the Lambda function to meet your needs.
>
> I am going to close this because there's not much I believe we can do here.

Thank you for the detailed write-up! I guess this is not suitable for running in parallel using Lambda, so a FIFO queue upstream of Lambda or a Firehose downstream of Lambda will have to be implemented to enforce a single writer to Deltalake.

rtyler (Member) commented May 7, 2024

There is a lot of usage of delta-rs in Lambda (I'm speaking at Data and AI Summit on the subject this year).

There are a few functions which I rely on heavily in production in the oxbow repository. For high-throughput append use-cases, I have found that separating writing the data files (parquet) from creating the transactions can help.

For merge/update type scenarios, limiting the concurrency has been sufficient to provide decent throughput with concurrent Lambdas on 0.17.x and later.
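
A sketch of that split for append-only loads, under the assumption that an oxbow function is subscribed to the bucket's S3 notifications: the ingesting Lambda only writes a Parquet file under the table prefix, and oxbow appends it to the Delta transaction log. Bucket, prefix, and region are hypothetical.

```python
# The ingesting Lambda writes Parquet only; an oxbow function watching S3
# bucket notifications adds the new file to the Delta transaction log.
import uuid

import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow import fs


def write_data_file(rows: list[dict]) -> str:
    """Write one Parquet data file under the (hypothetical) table prefix."""
    s3 = fs.S3FileSystem(region="us-east-1")
    key = f"my-bucket/my-delta-table/part-{uuid.uuid4()}.parquet"
    pq.write_table(pa.Table.from_pylist(rows), key, filesystem=s3)
    return key
```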

ravid08 (Author) commented May 7, 2024

> There is a lot of usage of delta-rs in Lambda (I'm speaking at Data and AI Summit on the subject this year).
>
> There are a few functions which I rely on heavily in production in the oxbow repository. For high-throughput append use-cases, I have found that separating writing the data files (parquet) from creating the transactions can help.
>
> For merge/update type scenarios, limiting the concurrency has been sufficient to provide decent throughput with concurrent Lambdas on 0.17.x and later.

Great, I will look forward to both of your sessions when they become publicly available; now I regret not getting a pass to the Summit this year :) I will take a look at oxbow as well. For something like a DynamoDB stream receiving hundreds of items every second, with the requirement to do append-only writes to Deltalake in S3, I am still trying to determine the best solution for loading the data in near real-time.

rtyler (Member) commented May 7, 2024

Depending on what the cost profile looks like, take a look at using Kinesis Data Firehose, which can output parquet directly into S3. Then Oxbow can observe those files landing in the bucket and update the transaction log. Oxbow doesn't ever read data files, so it's very fast :)
