Ensure ordering of offset commit requests #2941
Conversation
Add locking to prevent concurrent calls to commit from potentially being re-ordered between determining which offsets to include in the offset commit request, and sending the request to Kafka. Move finding the coordinator before creating the request to avoid holding the lock in the case the coordinator is unknown. This requires a change to the "new offset manager" test, which previously didn't expect the mock broker to receive a find coordinator request.

Fixes: IBM#2940

Co-authored-by: Michael Burgess <michburg@uk.ibm.com>
Signed-off-by: Adrian Preston <PRESTONA@uk.ibm.com>
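For illustration, a rough sketch of the shape of this fix; the names commitLock, flushToBroker, constructRequest and handleResponse are assumed here for the sketch and may not match the actual sarama code exactly:

```go
// Rough sketch only: commitLock, flushToBroker, constructRequest and
// handleResponse are illustrative names, not necessarily sarama's.
func (om *offsetManager) flushToBroker() error {
	// Resolve the coordinator before taking the lock, so an unknown
	// coordinator does not leave the lock held while it is looked up.
	broker, err := om.coordinator()
	if err != nil {
		return err
	}

	// Hold the lock from snapshotting the dirty offsets into the request
	// until the request has been sent, so two concurrent commits cannot
	// be re-ordered in between.
	om.commitLock.Lock()
	defer om.commitLock.Unlock()

	req := om.constructRequest()
	if req == nil {
		return nil
	}
	resp, err := broker.CommitOffset(req)
	if err != nil {
		return err
	}
	om.handleResponse(broker, req, resp)
	return nil
}
```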
Force-pushed from e675806 to ef1e100
Fix LGTM, though it would be helpful if we could possibly get a reproduction of the out-of-order commits into the FV suite in a follow-up PR?
Yep. Good point. The manual reproduction needed quite a few repetitions before we hit the window, but we can take a look and see what an FV suite test would look like, and how often it can be made to detect the condition.
This test simulates multiple concurrent commits for a single group running in a single process. The expected behavior is that commits are delivered in order to the mock broker, with no offset in the OffsetCommitRequest being for a lower value than previously committed for the partition.

Signed-off-by: Adrian Preston <PRESTONA@uk.ibm.com>
I've pushed an extra commit that adds a unit test for validating commit ordering. Compared to the FV suite, a unit test has the advantage that it can really hammer the offset commit code path from multiple goroutines. YMMV, but for me the test fails after about 10-20 offset commits.
offset_manager_test.go
Outdated
func() {
	mu.Lock()
	defer mu.Unlock()
	if outOfOrder == "" {
Prefer reordering this to an early return/guard condition:
if outOfOrder != "" {
	return
}
Superseded by switch to sync/atomic.Pointer
offset_manager_test.go
Outdated
outOfOrder =
	fmt.Sprintf("out of order commit to partition %d, current committed offset: %d, offset in request: %d",
		partition, last, offset.offset)
Consider using a sync/atomic.Value holding a pointer to a struct of the shape struct{ partition, last, offset IntType }, to avoid the inherent serialization caused by mutex locking.
Good idea. I ended up using an atomic.Pointer, as it avoids the need to make a type assertion when calling Load().
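For context, a minimal sketch of the difference being discussed (the outOfOrderReport type and its fields are hypothetical): atomic.Pointer[T].Load() returns a typed *T, whereas atomic.Value.Load() returns an untyped value that needs a type assertion.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Hypothetical record of the first out-of-order commit observed.
type outOfOrderReport struct {
	partition int32
	last      int64
	offset    int64
}

func main() {
	var firstError atomic.Pointer[outOfOrderReport]

	// In the mock broker handler: record only the first violation seen.
	firstError.CompareAndSwap(nil, &outOfOrderReport{partition: 3, last: 42, offset: 41})

	// In the test goroutine after wg.Wait(): Load() is already typed,
	// so no type assertion is needed (unlike atomic.Value).
	if report := firstError.Load(); report != nil {
		fmt.Printf("out of order commit to partition %d, current committed offset: %d, offset in request: %d\n",
			report.partition, report.last, report.offset)
	}
}
```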
offset_manager_test.go
Outdated
const numPartitions = 10
const commitsPerPartition = 1000

wg := &sync.WaitGroup{}
Prefer simply declaring a variable var wg sync.WaitGroup rather than assigning a zero value / pointer to zero value.
Fixed.
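For reference, the declaration style being suggested; both sync.WaitGroup and sync.Mutex are usable from their zero values, so no explicit initialization is required:

```go
// Preferred: the zero value is ready to use.
var wg sync.WaitGroup
var mu sync.Mutex

// Rather than:
//   wg := &sync.WaitGroup{}
//   mu := &sync.Mutex{}
```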
offset_manager_test.go
Outdated
defer safeClose(t, testClient)
om, err := NewOffsetManagerFromClient("group", testClient)
if err != nil {
	t.Error(err)
Can the test keep going after this? om is almost certainly a nil pointer, so the first use is going to hit a nil-pointer exception, right?
Well spotted. Substituted with t.Fatal(...).
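A sketch of the resulting hunk (abridged from the test shown above): t.Fatal marks the test as failed and stops the test function immediately, so the nil om is never dereferenced, whereas t.Error would let execution continue.

```go
om, err := NewOffsetManagerFromClient("group", testClient)
if err != nil {
	// t.Error(err) would keep running, and the next use of om would
	// dereference a nil pointer; t.Fatal fails and stops this test now.
	t.Fatal(err)
}
```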
offset_manager_test.go
Outdated
mu.Lock()
defer mu.Unlock()
This lock should be unnecessary. The wg.Wait() ensures all writes should be complete, and performs all the synchronization necessary.
Superseded by switch to sync/atomic.Pointer. The intent was to ensure that the error message assigned to outOfOrder (by the goroutine running the handler in the mock broker) was guaranteed to be observed by the goroutine running the test function.
Yeah, I figured it was added out of an abundance of caution. The information here (helpful even apart from this code review) is that the wg.Wait() itself is synchronizing: all calls to wg.Done() happen before the return of wg.Wait(), the same as if you had used a mutex.
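A self-contained illustration of that guarantee (names are illustrative, not from the PR): the write to results[i] before wg.Done() happens before wg.Wait() returns, so the read afterwards needs no extra locking.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const n = 8
	results := make([]int, n)

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = i * i // write made before Done()
		}(i)
	}

	// Every wg.Done() happens before wg.Wait() returns, so all writes to
	// results are visible here without any additional mutex.
	wg.Wait()
	fmt.Println(results)
}
```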
offset_manager_test.go
Outdated
// Test that the correct sequence of offset commit messages is sent to a broker when
// multiple goroutines for a group are committing offsets at the same time
func TestOffsetManagerCommitSequence(t *testing.T) {
	mu := &sync.Mutex{}
Same as below, just go ahead and declare the variable instead: var mu sync.Mutex
Superseded by switch to sync/atomic.Pointer - which I've declared as per your suggestion.
Specifically:
- Simplify variable declarations for zero-value values
- Use sync/atomic.Pointer instead of sync.Mutex
- t.Error -> t.Fatal to avoid continuing and de-referencing nil
- Remove unused named return values (copy and paste error)

Signed-off-by: Adrian Preston <PRESTONA@uk.ibm.com>
@puellanivis, thanks for taking the time to review. I hope I've addressed your concerns in 21e6e58. I've also cherry-picked this change onto #2947 to keep that consistent.
Looks acceptable.
Superseded by #2947
Add locking to prevent concurrent calls to commit from potentially being re-ordered between determining which offsets to include in the offset commit request, and sending the request to Kafka.
Note: we think this can be improved upon (by not adding more locks). A potential improvement would be to use the broker lock, and only hold this until the request is sent (avoiding holding a lock for the entire network round-trip time). However, it looks like #2409 has made sendAndReceive hold the broker lock until a response is received from the broker, so we need to understand that a little better first.
Fixes: #2940