From 061575139e47e7dac69aaf7d78a3a3ee6f8957b0 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Thu, 3 Aug 2023 14:51:23 +0100 Subject: [PATCH] fix(proto): correct JoinGroup usage for wider version range Signed-off-by: Dominic Evans --- consumer_group.go | 19 ++++++++++++++++++- join_group_request.go | 12 +++++++++--- join_group_response.go | 10 ++++++++-- 3 files changed, 35 insertions(+), 6 deletions(-) diff --git a/consumer_group.go b/consumer_group.go index 541cc7dfd..1b08f6025 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -433,7 +433,24 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) ( req.Version = 1 req.RebalanceTimeout = int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond) } - if c.groupInstanceId != nil { + if c.config.Version.IsAtLeast(V0_11_0_0) { + req.Version = 2 + } + if c.config.Version.IsAtLeast(V0_11_0_0) { + req.Version = 2 + } + if c.config.Version.IsAtLeast(V2_0_0_0) { + req.Version = 3 + } + // XXX: protocol states "Starting from version 4, the client needs to issue a + // second request to join group", so not enabling this until we can + // investigate + /* + if c.config.Version.IsAtLeast(V2_2_0_0) { + req.Version = 4 + } + */ + if c.config.Version.IsAtLeast(V2_3_0_0) { req.Version = 5 req.GroupInstanceId = c.groupInstanceId } diff --git a/join_group_request.go b/join_group_request.go index de5a2df77..b574ce475 100644 --- a/join_group_request.go +++ b/join_group_request.go @@ -156,14 +156,20 @@ func (r *JoinGroupRequest) isValidVersion() bool { func (r *JoinGroupRequest) requiredVersion() KafkaVersion { switch r.Version { - case 4, 5: + case 5: return V2_3_0_0 - case 2, 3: + case 4: + return V2_2_0_0 + case 3: + return V2_0_0_0 + case 2: return V0_11_0_0 case 1: return V0_10_1_0 + case 0: + return V0_10_0_0 default: - return V0_9_0_0 + return V2_3_0_0 } } diff --git a/join_group_response.go b/join_group_response.go index 0409f88eb..76645242e 100644 --- a/join_group_response.go +++ b/join_group_response.go @@ -153,14 +153,20 @@ func (r *JoinGroupResponse) isValidVersion() bool { func (r *JoinGroupResponse) requiredVersion() KafkaVersion { switch r.Version { - case 3, 4, 5: + case 5: return V2_3_0_0 + case 4: + return V2_2_0_0 + case 3: + return V2_0_0_0 case 2: return V0_11_0_0 case 1: return V0_10_1_0 + case 0: + return V0_10_0_0 default: - return V0_9_0_0 + return V2_3_0_0 } }