Skip to content

Commit

Permalink
pass through protocol name for sync group
Browse files Browse the repository at this point in the history
  • Loading branch information
rhansen2 committed Jul 14, 2022
1 parent 45cbf89 commit 52863f0
Showing 1 changed file with 13 additions and 11 deletions.
24 changes: 13 additions & 11 deletions consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,11 +812,12 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) {
var generationID int
var groupAssignments GroupMemberAssignments
var assignments map[string][]int
var protocolName string
var err error

// join group. this will join the group and prepare assignments if our
// consumer is elected leader. it may also change or assign the member ID.
memberID, generationID, groupAssignments, err = cg.joinGroup(memberID)
memberID, generationID, protocolName, groupAssignments, err = cg.joinGroup(memberID)
if err != nil {
cg.withErrorLogger(func(log Logger) {
log.Printf("Failed to join group %s: %v", cg.config.ID, err)
Expand All @@ -828,7 +829,7 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) {
})

// sync group
assignments, err = cg.syncGroup(memberID, generationID, groupAssignments)
assignments, err = cg.syncGroup(memberID, generationID, protocolName, groupAssignments)
if err != nil {
cg.withErrorLogger(func(log Logger) {
log.Printf("Failed to sync group %s: %v", cg.config.ID, err)
Expand Down Expand Up @@ -905,10 +906,10 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) {
// * InconsistentGroupProtocol:
// * InvalidSessionTimeout:
// * GroupAuthorizationFailed:
func (cg *ConsumerGroup) joinGroup(memberID string) (string, int, GroupMemberAssignments, error) {
func (cg *ConsumerGroup) joinGroup(memberID string) (string, int, string, GroupMemberAssignments, error) {
request, err := cg.makeJoinGroupRequest(memberID)
if err != nil {
return "", 0, nil, err
return "", 0, "", nil, err
}

response, err := cg.coord.joinGroup(request)
Expand All @@ -919,20 +920,19 @@ func (cg *ConsumerGroup) joinGroup(memberID string) (string, int, GroupMemberAss
memberID = response.MemberID
}
if err != nil {
return memberID, 0, nil, err
return memberID, 0, "", nil, err
}

generationID := response.GenerationID

cg.withLogger(func(l Logger) {
l.Printf("joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID)
})

var assignments GroupMemberAssignments
if iAmLeader := response.MemberID == response.LeaderID; iAmLeader {
v, err := cg.assignTopicPartitions(response)
if err != nil {
return memberID, 0, nil, err
return memberID, 0, "", nil, err
}
assignments = v

Expand All @@ -949,7 +949,7 @@ func (cg *ConsumerGroup) joinGroup(memberID string) (string, int, GroupMemberAss
l.Printf("joinGroup succeeded for response, %v. generationID=%v, memberID=%v", cg.config.ID, response.GenerationID, response.MemberID)
})

return memberID, generationID, assignments, nil
return memberID, generationID, response.ProtocolName, assignments, nil
}

// makeJoinGroupRequest handles the logic of constructing a joinGroup
Expand Down Expand Up @@ -1044,8 +1044,8 @@ func (cg *ConsumerGroup) assignTopicPartitions(group *JoinGroupResponse) (GroupM
// * IllegalGeneration:
// * RebalanceInProgress:
// * GroupAuthorizationFailed:
func (cg *ConsumerGroup) syncGroup(memberID string, generationID int, memberAssignments GroupMemberAssignments) (map[string][]int, error) {
request := cg.makeSyncGroupRequest(memberID, generationID, memberAssignments)
func (cg *ConsumerGroup) syncGroup(memberID string, generationID int, protocolName string, memberAssignments GroupMemberAssignments) (map[string][]int, error) {
request := cg.makeSyncGroupRequest(memberID, generationID, protocolName, memberAssignments)
response, err := cg.coord.syncGroup(request)
if err == nil && response.Error != nil {
err = response.Error
Expand All @@ -1066,11 +1066,13 @@ func (cg *ConsumerGroup) syncGroup(memberID string, generationID int, memberAssi
return response.Assignment.AssignedPartitions, nil
}

func (cg *ConsumerGroup) makeSyncGroupRequest(memberID string, generationID int, memberAssignments GroupMemberAssignments) *SyncGroupRequest {
func (cg *ConsumerGroup) makeSyncGroupRequest(memberID string, generationID int, protocolName string, memberAssignments GroupMemberAssignments) *SyncGroupRequest {
request := &SyncGroupRequest{
GroupID: cg.config.ID,
GenerationID: generationID,
MemberID: memberID,
ProtocolType: defaultProtocolType,
ProtocolName: protocolName,
}

if memberAssignments != nil {
Expand Down

0 comments on commit 52863f0

Please sign in to comment.