From 52863f023e473fc7f6eaa6e86632ace459ef9eda Mon Sep 17 00:00:00 2001 From: rhansen2 Date: Wed, 13 Jul 2022 21:07:58 -0700 Subject: [PATCH] pass through protocol name for sync group --- consumergroup.go | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/consumergroup.go b/consumergroup.go index 206f921ff..9ab2f16e1 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -812,11 +812,12 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) { var generationID int var groupAssignments GroupMemberAssignments var assignments map[string][]int + var protocolName string var err error // join group. this will join the group and prepare assignments if our // consumer is elected leader. it may also change or assign the member ID. - memberID, generationID, groupAssignments, err = cg.joinGroup(memberID) + memberID, generationID, protocolName, groupAssignments, err = cg.joinGroup(memberID) if err != nil { cg.withErrorLogger(func(log Logger) { log.Printf("Failed to join group %s: %v", cg.config.ID, err) @@ -828,7 +829,7 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) { }) // sync group - assignments, err = cg.syncGroup(memberID, generationID, groupAssignments) + assignments, err = cg.syncGroup(memberID, generationID, protocolName, groupAssignments) if err != nil { cg.withErrorLogger(func(log Logger) { log.Printf("Failed to sync group %s: %v", cg.config.ID, err) @@ -905,10 +906,10 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) { // * InconsistentGroupProtocol: // * InvalidSessionTimeout: // * GroupAuthorizationFailed: -func (cg *ConsumerGroup) joinGroup(memberID string) (string, int, GroupMemberAssignments, error) { +func (cg *ConsumerGroup) joinGroup(memberID string) (string, int, string, GroupMemberAssignments, error) { request, err := cg.makeJoinGroupRequest(memberID) if err != nil { - return "", 0, nil, err + return "", 0, "", nil, err } response, err := cg.coord.joinGroup(request) @@ -919,7 +920,7 @@ func (cg *ConsumerGroup) joinGroup(memberID string) (string, int, GroupMemberAss memberID = response.MemberID } if err != nil { - return memberID, 0, nil, err + return memberID, 0, "", nil, err } generationID := response.GenerationID @@ -927,12 +928,11 @@ func (cg *ConsumerGroup) joinGroup(memberID string) (string, int, GroupMemberAss cg.withLogger(func(l Logger) { l.Printf("joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID) }) - var assignments GroupMemberAssignments if iAmLeader := response.MemberID == response.LeaderID; iAmLeader { v, err := cg.assignTopicPartitions(response) if err != nil { - return memberID, 0, nil, err + return memberID, 0, "", nil, err } assignments = v @@ -949,7 +949,7 @@ func (cg *ConsumerGroup) joinGroup(memberID string) (string, int, GroupMemberAss l.Printf("joinGroup succeeded for response, %v. generationID=%v, memberID=%v", cg.config.ID, response.GenerationID, response.MemberID) }) - return memberID, generationID, assignments, nil + return memberID, generationID, response.ProtocolName, assignments, nil } // makeJoinGroupRequest handles the logic of constructing a joinGroup @@ -1044,8 +1044,8 @@ func (cg *ConsumerGroup) assignTopicPartitions(group *JoinGroupResponse) (GroupM // * IllegalGeneration: // * RebalanceInProgress: // * GroupAuthorizationFailed: -func (cg *ConsumerGroup) syncGroup(memberID string, generationID int, memberAssignments GroupMemberAssignments) (map[string][]int, error) { - request := cg.makeSyncGroupRequest(memberID, generationID, memberAssignments) +func (cg *ConsumerGroup) syncGroup(memberID string, generationID int, protocolName string, memberAssignments GroupMemberAssignments) (map[string][]int, error) { + request := cg.makeSyncGroupRequest(memberID, generationID, protocolName, memberAssignments) response, err := cg.coord.syncGroup(request) if err == nil && response.Error != nil { err = response.Error @@ -1066,11 +1066,13 @@ func (cg *ConsumerGroup) syncGroup(memberID string, generationID int, memberAssi return response.Assignment.AssignedPartitions, nil } -func (cg *ConsumerGroup) makeSyncGroupRequest(memberID string, generationID int, memberAssignments GroupMemberAssignments) *SyncGroupRequest { +func (cg *ConsumerGroup) makeSyncGroupRequest(memberID string, generationID int, protocolName string, memberAssignments GroupMemberAssignments) *SyncGroupRequest { request := &SyncGroupRequest{ GroupID: cg.config.ID, GenerationID: generationID, MemberID: memberID, + ProtocolType: defaultProtocolType, + ProtocolName: protocolName, } if memberAssignments != nil {