Skip to content

Commit

Permalink
fix: restore (*OffsetCommitRequest) AddBlock func
Browse files Browse the repository at this point in the history
In v1.34.0 a breaking API change to (*OffsetCommitRequest) AddBlock was
inadvertently introduced by 59a3d39
We weren't aware that anyone was driving the offset commit protocol
directly via the broker.go call rather than via offset_manager.go (or
via a consumer client)

For now we restore the old AddBlock signature and move the new one to
AddBlockWithLeaderEpoch. This will unfortunately impact anyone who had
updated their own code to call the new func signature, but it's probably
more important that we restore the longer term backwards compatibility
now until we move to a v2 release.

Fixes #2358
  • Loading branch information
dnwe committed Jul 17, 2023
1 parent c2cab9d commit 1d949d6
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 5 deletions.
6 changes: 5 additions & 1 deletion offset_commit_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,11 @@ func (r *OffsetCommitRequest) requiredVersion() KafkaVersion {
}
}

func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, leaderEpoch int32, timestamp int64, metadata string) {
func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string) {
r.AddBlockWithLeaderEpoch(topic, partitionID, offset, 0, timestamp, metadata)
}

func (r *OffsetCommitRequest) AddBlockWithLeaderEpoch(topic string, partitionID int32, offset int64, leaderEpoch int32, timestamp int64, metadata string) {
if r.blocks == nil {
r.blocks = make(map[string]map[int32]*offsetCommitRequestBlock)
}
Expand Down
6 changes: 3 additions & 3 deletions offset_commit_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestOffsetCommitRequestV0(t *testing.T) {
request.ConsumerGroup = "foobar"
testRequest(t, "no blocks v0", request, offsetCommitRequestNoBlocksV0)

request.AddBlock("topic", 0x5221, 0xDEADBEEF, 0, 0, "metadata")
request.AddBlock("topic", 0x5221, 0xDEADBEEF, 0, "metadata")
testRequest(t, "one block v0", request, offsetCommitRequestOneBlockV0)
}

Expand All @@ -82,7 +82,7 @@ func TestOffsetCommitRequestV1(t *testing.T) {
request.Version = 1
testRequest(t, "no blocks v1", request, offsetCommitRequestNoBlocksV1)

request.AddBlock("topic", 0x5221, 0xDEADBEEF, 0, ReceiveTime, "metadata")
request.AddBlock("topic", 0x5221, 0xDEADBEEF, ReceiveTime, "metadata")
testRequest(t, "one block v1", request, offsetCommitRequestOneBlockV1)
}

Expand All @@ -96,7 +96,7 @@ func TestOffsetCommitRequestV2ToV4(t *testing.T) {
request.Version = int16(version)
testRequest(t, fmt.Sprintf("no blocks v%d", version), request, offsetCommitRequestNoBlocksV2)

request.AddBlock("topic", 0x5221, 0xDEADBEEF, 0, 0, "metadata")
request.AddBlock("topic", 0x5221, 0xDEADBEEF, 0, "metadata")
testRequest(t, fmt.Sprintf("one block v%d", version), request, offsetCommitRequestOneBlockV2)
}
}
Expand Down
2 changes: 1 addition & 1 deletion offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func (om *offsetManager) constructRequest() *OffsetCommitRequest {
for _, pom := range topicManagers {
pom.lock.Lock()
if pom.dirty {
r.AddBlock(pom.topic, pom.partition, pom.offset, pom.leaderEpoch, perPartitionTimestamp, pom.metadata)
r.AddBlockWithLeaderEpoch(pom.topic, pom.partition, pom.offset, pom.leaderEpoch, perPartitionTimestamp, pom.metadata)
}
pom.lock.Unlock()
}
Expand Down

0 comments on commit 1d949d6

Please sign in to comment.