Issue 823 > Use max_wait_time while sleeping in fetch_batches #825
Conversation
@ElenaHenderson let us know when this is ready for testing and review.
Code looks good to me. @paneq can you verify that this actually resolves the issue?
@dasch some comments from me:

a) It resolves the latency issue.

b) I had to explicitly add

c) I am worried that we are not adding any test here to cover this situation. Does this gem have any end-to-end tests that could be used to measure latency, so that a similar problem is not introduced in the future?

d) Honestly, I still fail to understand how this gem (or Kafka) works internally. In particular, I don't understand why there are so many fetch requests in between the fetch requests that actually receive data from the last 0.1s (the configured `max_wait_time`).

Code:

```ruby
# producer
loop do
  kafka.deliver_message({ time: Time.now.to_f }.to_json, topic: "greetings", key: "Hello")
  puts "delivered"
  sleep(0.005)
end
```

```ruby
# consumer
consumer.each_message(min_bytes: 1, max_wait_time: 0.1) do |message|
  now = Time.now.to_f
  sent = JSON.parse(message.value)['time']
  diff = now - sent
  puts("diff: #{diff.round(2)}")
end
```

Here is my output:
If I change my producer to produce less often:

```ruby
loop do
  kafka.deliver_message({ time: Time.now.to_f }.to_json, topic: "greetings", key: "Hello")
  puts "delivered"
  sleep(0.2)
end
```

and keep the consumer the same:

```ruby
consumer.each_message(min_bytes: 1, max_wait_time: 0.1) do |message|
  now = Time.now.to_f
  sent = JSON.parse(message.value)['time']
  diff = now - sent
  puts("diff: #{diff.round(2)}")
end
```

then there are fewer of those fetch requests between actual processing.
Do I understand correctly that there is one thread doing data fetching and a different thread doing data processing? And that the processing thread sleeps when there is nothing to process, and while it sleeps we might get data from multiple fetch requests (because of `min_bytes: 1`)?
Awesome!
Does that only happen on this branch or also on master?
There are no tests for latency. There are end-to-end tests, but it can be difficult to make them reliable because Kafka can be a bit unpredictable.
That's exactly right. There's a buffer between the processor and the fetcher that gets pushed to even when the processor is busy – this greatly improves throughput. The sleep was done in order to "back off" when there's nothing to process, to avoid pegging the CPU. The sleep duration was simply too high; making it match `max_wait_time` fixes that.
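To make that architecture concrete, here is a minimal sketch of the fetcher/processor split described above — the helper methods, queue, and timings are illustrative assumptions, not the gem's actual internals:

```ruby
# Minimal sketch of the fetcher/processor split (assumptions, not the
# gem's real code): one thread fetches into a shared buffer, another
# drains it and backs off when it is empty.

# Hypothetical stand-ins for the real fetch/process logic.
def fetch_from_kafka(max_wait_time:)
  sleep(max_wait_time)            # pretend we waited on the broker
  rand < 0.5 ? [] : ["a message"] # sometimes there is nothing new
end

def process(batch)
  puts "processing #{batch.inspect}"
end

queue = Queue.new # the buffer between fetcher and processor
max_wait_time = 0.1

# Fetcher: keeps issuing fetch requests and pushes results into the
# buffer even while the processor is busy, which improves throughput.
Thread.new do
  loop do
    batch = fetch_from_kafka(max_wait_time: max_wait_time)
    queue << batch unless batch.empty?
  end
end

# Processor: sleeps when the buffer is empty to avoid pegging the CPU.
# Before this PR that back-off was a fixed 2 seconds; the fix makes it
# match max_wait_time instead.
Thread.new do
  loop do
    queue.empty? ? sleep(max_wait_time) : process(queue.pop)
  end
end

sleep(2) # let the demo run briefly
```

While the processor sleeps, several fetch responses can accumulate in the buffer, which would explain seeing multiple fetch requests between processing runs.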
@paneq Thank you for testing this change. I am ready to merge whenever you give me a green flag and you are satisfied with everything.
Only on this branch.
Let's try not to add them in this fix, but would you be OK with having them in general if someone could make them work reliably?
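For what it's worth, such an end-to-end latency test might look roughly like this — the topic, bounds, and setup (`kafka`, `consumer`) are assumptions, and keeping it reliable against Kafka's timing jitter is the hard part:

```ruby
require "json"

# Rough sketch of a latency assertion (RSpec-style, numbers illustrative):
# publish timestamped messages, then check the consumer sees them quickly.
it "delivers messages with low end-to-end latency" do
  producer = Thread.new do
    10.times do
      kafka.deliver_message({ time: Time.now.to_f }.to_json, topic: "greetings")
      sleep(0.05)
    end
  end

  latencies = []
  consumer.each_message(min_bytes: 1, max_wait_time: 0.1) do |message|
    latencies << Time.now.to_f - JSON.parse(message.value)["time"]
    consumer.stop if latencies.size >= 10 # each_message returns after stop
  end

  producer.join
  expect(latencies.max).to be < 0.5 # generous bound to limit flakiness
end
```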
Got it. In that case, I think the sleep should be 1/Nth of `max_wait_time`.
Do you have a small reproducible example?
I would love that!
What specific sleep duration would you want?
@dasch I finally had a moment to calculate it with a very sophisticated algorithm ;) On my MacBook Pro (which I know is not representative):

So I believe the sleep should be way lower than `max_wait_time`.
@dasch I know this PR is merged, but what do you think about my previous comment?
I think 0.005 is way too low. We have dozens of these processes running, also on our laptops as part of our dev environment. I would be happy with a PR that adds an optional configuration key for this, but using `max_wait_time` as the default.
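Such a configuration key might look something like this — the `idle_sleep` name and plumbing are invented for illustration, not the gem's API:

```ruby
# Hypothetical sketch: an opt-in knob for the idle back-off that
# defaults to max_wait_time, so current behaviour is preserved.
def fetch_batches(max_wait_time:, idle_sleep: nil, &fetch)
  idle_sleep ||= max_wait_time        # default: match max_wait_time
  batches = fetch.call
  sleep(idle_sleep) if batches.empty? # back off only when idle
  batches
end

# A latency-sensitive caller could then opt in to a shorter back-off:
fetch_batches(max_wait_time: 0.1, idle_sleep: 0.02) { [] }
```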
This is an attempt to fix #823, which causes the consumer to sleep for 2 seconds, resulting in an average 1.5 second pause between when a message is published and when a consumer starts processing it.
Note that my fix is based on research done by https://github.com/paneq in the above issue.
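For context, the shape of the change is roughly the following (a sketch, not the actual diff):

```ruby
# Before: a hard-coded back-off in the fetch loop dominated latency.
sleep(2) if batches.empty?

# After: back off only as long as a fetch would wait anyway.
sleep(max_wait_time) if batches.empty?
```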