Slowdown due to blocking track records in RecordProcessorImpl.processRecords #12

Open
trucnguyenlam opened this issue Jan 28, 2020 · 0 comments

trucnguyenlam commented Jan 28, 2020

I have observed a problem when using this library to process a Kinesis stream.
I currently have a stream with 20 shards that I want to read from TRIM_HORIZON, so I create 20 workers (same application name, different worker IDs) to process it. In theory, 20 shards should allow reads of up to 2 MB/s × 20 shards × 60 s = 2400 MB per minute. However, I only observe a maximum of ~1000 MB per minute. This seemed odd, so I ran an experiment: I changed the code to comment out the trackRecords call. With that change, the snapshot build reached the maximum read throughput (however, no checkpoints are written to the DynamoDB table, because nothing is tracked).
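The 2400 MB per minute ceiling is plain arithmetic on the 2 MB/s per-shard read limit. A small Scala sketch (the object and value names are illustrative, not from the library) computes the ceiling and how much of it the observed ~1000 MB per minute represents:

```scala
// Back-of-the-envelope read-throughput check for a Kinesis stream.
// Assumes the 2 MB/s per-shard read limit quoted in the issue.
object ThroughputCheck {
  val MbPerSecondPerShard = 2.0
  val Shards = 20
  val SecondsPerMinute = 60

  // Theoretical ceiling in MB per minute across all shards.
  val maxMbPerMinute: Double = MbPerSecondPerShard * Shards * SecondsPerMinute

  // Observed figure reported in the issue (~1000 MB per minute).
  val observedMbPerMinute: Double = 1000.0

  // Fraction of the theoretical ceiling actually achieved.
  val utilisation: Double = observedMbPerMinute / maxMbPerMinute

  def main(args: Array[String]): Unit = {
    println(f"ceiling: $maxMbPerMinute%.0f MB/min")
    println(f"observed: $observedMbPerMinute%.0f MB/min (${utilisation * 100}%.1f%% of ceiling)")
  }
}
```

By this estimate the workers reach only about 42% of the stream's theoretical read capacity.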

    abortStreamOnError("processRecords") {
      val records = transformRecords(processRecordsInput.records())

      // Experiment: tracking disabled, so nothing is ever checkpointed to DynamoDB
      //trackRecords(records)
      //checkpointIfNeeded(processRecordsInput.checkpointer())

      records.grouped(EnqueueBatchSize).foreach { r =>
        enqueueRecords(r)
        checkpointIfNeeded(processRecordsInput.checkpointer())
      }
    }
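The experiment trades correctness for speed: without tracking, the worker cannot tell which records downstream consumers have finished, so it has nothing safe to checkpoint. As a rough illustration of what tracking has to guarantee (this is not the library's trackRecords code; the class InFlightTracker and its methods are hypothetical, and sequence numbers are modelled as BigInt), a tracker can register records before they are enqueued and only let the checkpoint advance over a contiguous run of processed records:

```scala
import scala.collection.mutable

// Hypothetical sketch, NOT the library's trackRecords/checkpointIfNeeded code.
// Records are registered before being handed downstream; a checkpoint may only
// advance to the end of the longest prefix whose records are all processed.
final class InFlightTracker {
  // Insertion-ordered map: sequence number -> processed flag.
  // Kinesis sequence numbers are decimal strings; using BigInt is an assumption.
  private val pending = mutable.LinkedHashMap.empty[BigInt, Boolean]
  private var lastSafe: Option[BigInt] = None

  // Called before a record is enqueued; cheap and in-memory in this sketch.
  def track(seq: BigInt): Unit = synchronized {
    pending.put(seq, false)
  }

  // Called by the consumer once a record has been fully processed.
  def markProcessed(seq: BigInt): Unit = synchronized {
    if (pending.contains(seq)) pending.update(seq, true)
  }

  // Drops the processed prefix and returns the highest sequence number
  // that is safe to checkpoint so far (None until the first record is done).
  def safeCheckpoint(): Option[BigInt] = synchronized {
    while (pending.nonEmpty && pending.head._2) {
      val (seq, _) = pending.head
      lastSafe = Some(seq)
      pending.remove(seq)
    }
    lastSafe
  }
}

object TrackerDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new InFlightTracker
    Seq(BigInt(1), BigInt(2), BigInt(3)).foreach(tracker.track)
    tracker.markProcessed(BigInt(2))  // finished out of order; 1 is still pending
    println(tracker.safeCheckpoint()) // None
    tracker.markProcessed(BigInt(1))
    println(tracker.safeCheckpoint()) // Some(2)
  }
}
```

In this sketch track is a constant-time in-memory call, so registering a record never waits on downstream processing. Whether the slowdown reported here comes from tracking itself or from something trackRecords waits on cannot be determined from the snippet alone.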

Could you please let me know whether there is any way to increase the throughput without losing correctness?

Many thanks,
Truc
