diff --git a/consumer_group.go b/consumer_group.go index 68f463976..541cc7dfd 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -141,8 +141,8 @@ func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) { userData: config.Consumer.Group.Member.UserData, metricRegistry: newCleanupRegistry(config.MetricRegistry), } - if client.Config().Consumer.Group.InstanceId != "" && config.Version.IsAtLeast(V2_3_0_0) { - cg.groupInstanceId = &client.Config().Consumer.Group.InstanceId + if config.Consumer.Group.InstanceId != "" && config.Version.IsAtLeast(V2_3_0_0) { + cg.groupInstanceId = &config.Consumer.Group.InstanceId } return cg, nil } @@ -556,32 +556,43 @@ func (c *consumerGroup) leave() error { return err } - // KIP-345 if groupInstanceId is set, don not leave group when consumer closed. - // Since we do not discover ApiVersion for brokers, LeaveGroupRequest still use the old version request for now - if c.groupInstanceId == nil { - resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{ - GroupId: c.groupID, + // as per KIP-345 if groupInstanceId is set, i.e. static membership is in action, then do not leave group when consumer closed, just clear memberID + if c.groupInstanceId != nil { + c.memberID = "" + return nil + } + req := &LeaveGroupRequest{ + GroupId: c.groupID, + MemberId: c.memberID, + } + if c.config.Version.IsAtLeast(V0_11_0_0) { + req.Version = 1 + } + if c.config.Version.IsAtLeast(V2_0_0_0) { + req.Version = 2 + } + if c.config.Version.IsAtLeast(V2_4_0_0) { + req.Version = 3 + req.Members = append(req.Members, MemberIdentity{ MemberId: c.memberID, }) - if err != nil { - _ = coordinator.Close() - return err - } + } - // Unset memberID - c.memberID = "" + resp, err := coordinator.LeaveGroup(req) + if err != nil { + _ = coordinator.Close() + return err + } - // Check response - switch resp.Err { - case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError: - return nil - default: - return resp.Err - } - } else { - c.memberID = "" + // clear the memberID + c.memberID = "" + + switch resp.Err { + case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError: + return nil + default: + return resp.Err } - return nil } func (c *consumerGroup) handleError(err error, topic string, partition int32) {