-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Consumer: check offset before returning ConsumePartition. #418
Conversation
When calling ConsumePartition, always check whether the offset is within the offset range. This means we now have to do two OffsetRequests for every ConsumePartition call, even if the offset is provided. The good news is that the method will immediately return an error and never start a goroutine, instead of starting the goroutine and returning an error in the Errors() channel which you can easily ignore.
@Shopify/kafka I had to disable one consumer test due to concurrency issues on the mock broker. Ideas on how to overcome this are welcome. |
@@ -298,6 +350,8 @@ func TestConsumerInterleavedClose(t *testing.T) { | |||
} | |||
|
|||
func TestConsumerBounceWithReferenceOpen(t *testing.T) { | |||
t.Skip("This is not yet working due to concurrency on the mock broker") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You'll have to add four offset responses (newest/oldest for two partitions) but I didn't think this test was particularly fragile. It should be ok as long as you don't return the second offset pair until after the first ConsumePartition
call has returned.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem is that the two consumers are talking to the same broker. The fetch request/responses that happen right after the offset responses, but we have to ensure we handle the offset requests for both consumers before we start dealing with the fetch requests. AFAICS that's not possible right now.
I believe I fixed the test you had skipped. If travis agrees, this LGTM. |
There's probably some test cleanup we can do, but I'll leave that for a separate PR. |
Consumer: check offset before returning ConsumePartition.
When calling ConsumePartition, always check whether the offset is within the offset range. This means we now have to do two OffsetRequests for every ConsumePartition call, even if the offset is provided. The good news is that the method will immediately return an error and never start a goroutine, instead of starting the goroutine and returning an error in the Errors() channel which you can easily ignore.
This replaces #378. It now uses two OffsetRequests instead of changing OffsetRequests to allow obtaiing multiple offsets for a single partition.