From c7f15127548b60b91777bdc7de2e61a13138e723 Mon Sep 17 00:00:00 2001 From: unknown Date: Mon, 16 May 2022 23:08:51 +0800 Subject: [PATCH] kip-394: support static membership --- admin.go | 31 ++- config.go | 31 +++ config_test.go | 44 ++++ consumer_group.go | 102 +++++--- describe_groups_request.go | 29 ++- describe_groups_request_test.go | 47 +++- describe_groups_response.go | 48 +++- describe_groups_response_test.go | 196 ++++++++++++++- functional_consumer_group_test.go | 20 +- functional_consumer_staticmembership_test.go | 237 +++++++++++++++++++ heartbeat_request.go | 26 +- heartbeat_request_test.go | 86 ++++++- heartbeat_response.go | 20 +- heartbeat_response_test.go | 64 ++++- join_group_request.go | 16 +- join_group_request_test.go | 60 ++++- join_group_response.go | 40 +++- join_group_response_test.go | 58 ++++- leave_group_request.go | 58 ++++- leave_group_request_test.go | 66 +++++- leave_group_response.go | 60 ++++- leave_group_response_test.go | 88 +++++-- mockresponses.go | 6 +- offset_commit_request.go | 53 ++++- offset_commit_request_test.go | 114 ++++++++- offset_commit_response.go | 2 + offset_commit_response_test.go | 77 +++++- offset_manager.go | 87 ++++--- sync_group_request.go | 21 +- sync_group_request_test.go | 51 +++- sync_group_response.go | 17 +- sync_group_response_test.go | 72 ++++-- 32 files changed, 1727 insertions(+), 200 deletions(-) create mode 100644 functional_consumer_staticmembership_test.go diff --git a/admin.go b/admin.go index 7683b44fb2..83279c42b9 100644 --- a/admin.go +++ b/admin.go @@ -141,6 +141,11 @@ type ClusterAdmin interface { // locally cached value if it's available. Controller() (*Broker, error) + // Remove members from the consumer group by given member identities. + // This operation is supported by brokers with version 2.3 or higher + // This is for static membership feature. KIP-345 + RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error) + // Close shuts down the admin and closes underlying client. Close() error } @@ -900,9 +905,13 @@ func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*Group } for broker, brokerGroups := range groupsPerBroker { - response, err := broker.DescribeGroups(&DescribeGroupsRequest{ + describeReq := &DescribeGroupsRequest{ Groups: brokerGroups, - }) + } + if ca.conf.Version.IsAtLeast(V2_3_0_0) { + describeReq.Version = 4 + } + response, err := broker.DescribeGroups(describeReq) if err != nil { return nil, err } @@ -1196,3 +1205,21 @@ func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op Clie return nil } + +func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error) { + controller, err := ca.client.Coordinator(groupId) + if err != nil { + return nil, err + } + request := &LeaveGroupRequest{ + Version: 3, + GroupId: groupId, + } + for _, instanceId := range groupInstanceIds { + groupInstanceId := instanceId + request.Members = append(request.Members, MemberIdentity{ + GroupInstanceId: &groupInstanceId, + }) + } + return controller.LeaveGroup(request) +} diff --git a/config.go b/config.go index 4cb99232ae..a404fb8802 100644 --- a/config.go +++ b/config.go @@ -296,6 +296,8 @@ type Config struct { // coordinator for the group. UserData []byte } + // support KIP-345 + InstanceId string } Retry struct { @@ -509,6 +511,7 @@ func NewConfig() *Config { // Validate checks a Config instance. It will return a // ConfigurationError if the specified values don't make sense. +//nolint:gocyclo // This function's cyclomatic complexity has go beyond 100 func (c *Config) Validate() error { // some configuration values should be warned on but not fail completely, do those first if !c.Net.TLS.Enable && c.Net.TLS.Config != nil { @@ -754,6 +757,14 @@ func (c *Config) Validate() error { case c.Consumer.Group.Rebalance.Retry.Backoff < 0: return ConfigurationError("Consumer.Group.Rebalance.Retry.Backoff must be >= 0") } + if c.Consumer.Group.InstanceId != "" { + if !c.Version.IsAtLeast(V2_3_0_0) { + return ConfigurationError("Consumer.Group.InstanceId need Version >= 2.3") + } + if err := validateGroupInstanceId(c.Consumer.Group.InstanceId); err != nil { + return err + } + } // validate misc shared values switch { @@ -778,3 +789,23 @@ func (c *Config) getDialer() proxy.Dialer { } } } + +const MAX_GROUP_INSTANCE_ID_LENGTH = 249 + +var GROUP_INSTANCE_ID_REGEXP = regexp.MustCompile(`^[0-9a-zA-Z\._\-]+$`) + +func validateGroupInstanceId(id string) error { + if id == "" { + return ConfigurationError("Group instance id must be non-empty string") + } + if id == "." || id == ".." { + return ConfigurationError(`Group instance id cannot be "." or ".."`) + } + if len(id) > MAX_GROUP_INSTANCE_ID_LENGTH { + return ConfigurationError(fmt.Sprintf(`Group instance id cannot be longer than %v, characters: %s`, MAX_GROUP_INSTANCE_ID_LENGTH, id)) + } + if !GROUP_INSTANCE_ID_REGEXP.MatchString(id) { + return ConfigurationError(fmt.Sprintf(`Group instance id %s is illegal, it contains a character other than, '.', '_' and '-'`, id)) + } + return nil +} diff --git a/config_test.go b/config_test.go index 5877520757..35d861de67 100644 --- a/config_test.go +++ b/config_test.go @@ -3,6 +3,7 @@ package sarama import ( "errors" "os" + "strings" "testing" "github.com/rcrowley/go-metrics" @@ -501,6 +502,49 @@ func TestZstdConfigValidation(t *testing.T) { } } +func TestValidGroupInstanceId(t *testing.T) { + tests := []struct { + grouptInstanceId string + shouldHaveErr bool + }{ + {"groupInstanceId1", false}, + {"", true}, + {".", true}, + {"..", true}, + {strings.Repeat("a", 250), true}, + {"group_InstanceId.1", false}, + {"group-InstanceId1", false}, + {"group#InstanceId1", true}, + } + for _, testcase := range tests { + err := validateGroupInstanceId(testcase.grouptInstanceId) + if !testcase.shouldHaveErr { + if err != nil { + t.Errorf("Expected validGroupInstanceId %s to pass, got error %v", testcase.grouptInstanceId, err) + } + } else { + if err == nil { + t.Errorf("Expected validGroupInstanceId %s to be error, got nil", testcase.grouptInstanceId) + } + if _, ok := err.(ConfigurationError); !ok { + t.Errorf("Excepted err to be ConfigurationError, got %v", err) + } + } + } +} + +func TestGroupInstanceIdAndVersionValidation(t *testing.T) { + config := NewTestConfig() + config.Consumer.Group.InstanceId = "groupInstanceId1" + if err := config.Validate(); string(err.(ConfigurationError)) != "Consumer.Group.InstanceId need Version >= 2.3" { + t.Error("Expected invalid zstd/kafka version error, got ", err) + } + config.Version = V2_3_0_0 + if err := config.Validate(); err != nil { + t.Error("Expected zstd to work, got ", err) + } +} + // This example shows how to integrate with an existing registry as well as publishing metrics // on the standard output func ExampleConfig_metrics() { diff --git a/consumer_group.go b/consumer_group.go index 845fb857ae..d50b093bc8 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -79,11 +79,12 @@ type ConsumerGroup interface { type consumerGroup struct { client Client - config *Config - consumer Consumer - groupID string - memberID string - errors chan error + config *Config + consumer Consumer + groupID string + groupInstanceId *string + memberID string + errors chan error lock sync.Mutex closed chan none @@ -127,7 +128,7 @@ func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) { return nil, err } - return &consumerGroup{ + cg := &consumerGroup{ client: client, consumer: consumer, config: config, @@ -135,7 +136,11 @@ func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) { errors: make(chan error, config.ChannelBufferSize), closed: make(chan none), userData: config.Consumer.Group.Member.UserData, - }, nil + } + if client.Config().Consumer.Group.InstanceId != "" && config.Version.IsAtLeast(V2_3_0_0) { + cg.groupInstanceId = &client.Config().Consumer.Group.InstanceId + } + return cg, nil } // Errors implements ConsumerGroup. @@ -300,6 +305,19 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler return nil, join.Err } return c.retryNewSession(ctx, topics, handler, retries, true) + case ErrMemberIdRequired: + // from JoinGroupRequest v4, if client start with empty member id, + // it need to get member id from response and send another join request to join group + if retries <= 0 { + return nil, join.Err + } + c.memberID = join.MemberId + return c.retryNewSession(ctx, topics, handler, retries, false) + case ErrFencedInstancedId: + if c.groupInstanceId != nil { + Logger.Printf("JoinGroup failed: group instance id %s has been fenced\n", *c.groupInstanceId) + } + return nil, join.Err default: return nil, join.Err } @@ -348,6 +366,11 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler return nil, groupRequest.Err } return c.retryNewSession(ctx, topics, handler, retries, true) + case ErrFencedInstancedId: + if c.groupInstanceId != nil { + Logger.Printf("JoinGroup failed: group instance id %s has been fenced\n", *c.groupInstanceId) + } + return nil, groupRequest.Err default: return nil, groupRequest.Err } @@ -389,6 +412,10 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) ( req.Version = 1 req.RebalanceTimeout = int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond) } + if c.groupInstanceId != nil { + req.Version = 5 + req.GroupInstanceId = c.groupInstanceId + } meta := &ConsumerGroupMemberMetadata{ Topics: topics, @@ -409,6 +436,10 @@ func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrate GenerationId: generationID, } strategy := c.config.Consumer.Group.Rebalance.Strategy + if c.groupInstanceId != nil { + req.Version = 3 + req.GroupInstanceId = c.groupInstanceId + } for memberID, topics := range plan { assignment := &ConsumerGroupMemberAssignment{Topics: topics} userDataBytes, err := strategy.AssignmentData(memberID, topics, generationID) @@ -429,6 +460,10 @@ func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, g MemberId: memberID, GenerationId: generationID, } + if c.groupInstanceId != nil { + req.Version = 3 + req.GroupInstanceId = c.groupInstanceId + } return coordinator.Heartbeat(req) } @@ -466,25 +501,32 @@ func (c *consumerGroup) leave() error { return err } - resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{ - GroupId: c.groupID, - MemberId: c.memberID, - }) - if err != nil { - _ = coordinator.Close() - return err - } + // KIP-345 if groupInstanceId is set, don not leave group when consumer closed. + // Since we do not discover ApiVersion for brokers, LeaveGroupRequest still use the old version request for now + if c.groupInstanceId == nil { + resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{ + GroupId: c.groupID, + MemberId: c.memberID, + }) + if err != nil { + _ = coordinator.Close() + return err + } - // Unset memberID - c.memberID = "" + // Unset memberID + c.memberID = "" - // Check response - switch resp.Err { - case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError: - return nil - default: - return resp.Err + // Check response + switch resp.Err { + case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError: + return nil + default: + return resp.Err + } + } else { + c.memberID = "" } + return nil } func (c *consumerGroup) handleError(err error, topic string, partition int32) { @@ -628,15 +670,15 @@ type consumerGroupSession struct { } func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) { + // init context + ctx, cancel := context.WithCancel(ctx) + // init offset manager - offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client) + offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client, cancel) if err != nil { return nil, err } - // init context - ctx, cancel := context.WithCancel(ctx) - // init session sess := &consumerGroupSession{ parent: parent, @@ -862,6 +904,12 @@ func (s *consumerGroupSession) heartbeatLoop() { s.cancel() case ErrUnknownMemberId, ErrIllegalGeneration: return + case ErrFencedInstancedId: + if s.parent.groupInstanceId != nil { + Logger.Printf("JoinGroup failed: group instance id %s has been fenced\n", *s.parent.groupInstanceId) + } + s.parent.handleError(resp.Err, "", -1) + return default: s.parent.handleError(resp.Err, "", -1) return diff --git a/describe_groups_request.go b/describe_groups_request.go index f8962da58f..f81f69ac4b 100644 --- a/describe_groups_request.go +++ b/describe_groups_request.go @@ -1,16 +1,33 @@ package sarama type DescribeGroupsRequest struct { - Groups []string + Version int16 + Groups []string + IncludeAuthorizedOperations bool } func (r *DescribeGroupsRequest) encode(pe packetEncoder) error { - return pe.putStringArray(r.Groups) + if err := pe.putStringArray(r.Groups); err != nil { + return err + } + if r.Version >= 3 { + pe.putBool(r.IncludeAuthorizedOperations) + } + return nil } func (r *DescribeGroupsRequest) decode(pd packetDecoder, version int16) (err error) { + r.Version = version r.Groups, err = pd.getStringArray() - return + if err != nil { + return err + } + if r.Version >= 3 { + if r.IncludeAuthorizedOperations, err = pd.getBool(); err != nil { + return err + } + } + return nil } func (r *DescribeGroupsRequest) key() int16 { @@ -18,7 +35,7 @@ func (r *DescribeGroupsRequest) key() int16 { } func (r *DescribeGroupsRequest) version() int16 { - return 0 + return r.Version } func (r *DescribeGroupsRequest) headerVersion() int16 { @@ -26,6 +43,10 @@ func (r *DescribeGroupsRequest) headerVersion() int16 { } func (r *DescribeGroupsRequest) requiredVersion() KafkaVersion { + switch r.Version { + case 1, 2, 3, 4: + return V2_3_0_0 + } return V0_9_0_0 } diff --git a/describe_groups_request_test.go b/describe_groups_request_test.go index 7d45f3fee4..a2d87b4b62 100644 --- a/describe_groups_request_test.go +++ b/describe_groups_request_test.go @@ -5,19 +5,19 @@ import "testing" var ( emptyDescribeGroupsRequest = []byte{0, 0, 0, 0} - singleDescribeGroupsRequest = []byte{ + singleDescribeGroupsRequestV0 = []byte{ 0, 0, 0, 1, // 1 group 0, 3, 'f', 'o', 'o', // group name: foo } - doubleDescribeGroupsRequest = []byte{ + doubleDescribeGroupsRequestV0 = []byte{ 0, 0, 0, 2, // 2 groups 0, 3, 'f', 'o', 'o', // group name: foo 0, 3, 'b', 'a', 'r', // group name: foo } ) -func TestDescribeGroupsRequest(t *testing.T) { +func TestDescribeGroupsRequestV0(t *testing.T) { var request *DescribeGroupsRequest request = new(DescribeGroupsRequest) @@ -25,10 +25,47 @@ func TestDescribeGroupsRequest(t *testing.T) { request = new(DescribeGroupsRequest) request.AddGroup("foo") - testRequest(t, "one group", request, singleDescribeGroupsRequest) + testRequest(t, "one group", request, singleDescribeGroupsRequestV0) request = new(DescribeGroupsRequest) request.AddGroup("foo") request.AddGroup("bar") - testRequest(t, "two groups", request, doubleDescribeGroupsRequest) + testRequest(t, "two groups", request, doubleDescribeGroupsRequestV0) +} + +var ( + emptyDescribeGroupsRequestV3 = []byte{0, 0, 0, 0, 0} + + singleDescribeGroupsRequestV3 = []byte{ + 0, 0, 0, 1, // 1 group + 0, 3, 'f', 'o', 'o', // group name: foo + 0, + } + + doubleDescribeGroupsRequestV3 = []byte{ + 0, 0, 0, 2, // 2 groups + 0, 3, 'f', 'o', 'o', // group name: foo + 0, 3, 'b', 'a', 'r', // group name: foo + 1, + } +) + +func TestDescribeGroupsRequestV3(t *testing.T) { + var request *DescribeGroupsRequest + + request = new(DescribeGroupsRequest) + request.Version = 3 + testRequest(t, "no groups", request, emptyDescribeGroupsRequestV3) + + request = new(DescribeGroupsRequest) + request.Version = 3 + request.AddGroup("foo") + testRequest(t, "one group", request, singleDescribeGroupsRequestV3) + + request = new(DescribeGroupsRequest) + request.Version = 3 + request.AddGroup("foo") + request.AddGroup("bar") + request.IncludeAuthorizedOperations = true + testRequest(t, "two groups", request, doubleDescribeGroupsRequestV3) } diff --git a/describe_groups_response.go b/describe_groups_response.go index 1027b33d02..21c9cbc469 100644 --- a/describe_groups_response.go +++ b/describe_groups_response.go @@ -1,24 +1,40 @@ package sarama type DescribeGroupsResponse struct { - Groups []*GroupDescription + Version int16 + ThrottleTimeMs int32 + Groups []*GroupDescription + AuthorizedOperations int32 } func (r *DescribeGroupsResponse) encode(pe packetEncoder) error { + if r.Version >= 1 { + pe.putInt32(r.ThrottleTimeMs) + } if err := pe.putArrayLength(len(r.Groups)); err != nil { return err } for _, groupDescription := range r.Groups { + groupDescription.Version = r.Version if err := groupDescription.encode(pe); err != nil { return err } } + if r.Version >= 3 { + pe.putInt32(r.AuthorizedOperations) + } return nil } func (r *DescribeGroupsResponse) decode(pd packetDecoder, version int16) (err error) { + r.Version = version + if r.Version >= 1 { + if r.ThrottleTimeMs, err = pd.getInt32(); err != nil { + return err + } + } n, err := pd.getArrayLength() if err != nil { return err @@ -27,10 +43,16 @@ func (r *DescribeGroupsResponse) decode(pd packetDecoder, version int16) (err er r.Groups = make([]*GroupDescription, n) for i := 0; i < n; i++ { r.Groups[i] = new(GroupDescription) + r.Groups[i].Version = r.Version if err := r.Groups[i].decode(pd); err != nil { return err } } + if r.Version >= 3 { + if r.AuthorizedOperations, err = pd.getInt32(); err != nil { + return err + } + } return nil } @@ -40,7 +62,7 @@ func (r *DescribeGroupsResponse) key() int16 { } func (r *DescribeGroupsResponse) version() int16 { - return 0 + return r.Version } func (r *DescribeGroupsResponse) headerVersion() int16 { @@ -48,10 +70,16 @@ func (r *DescribeGroupsResponse) headerVersion() int16 { } func (r *DescribeGroupsResponse) requiredVersion() KafkaVersion { + switch r.Version { + case 1, 2, 3, 4: + return V2_3_0_0 + } return V0_9_0_0 } type GroupDescription struct { + Version int16 + Err KError GroupId string State string @@ -84,6 +112,8 @@ func (gd *GroupDescription) encode(pe packetEncoder) error { if err := pe.putString(memberId); err != nil { return err } + // encode with version + groupMemberDescription.Version = gd.Version if err := groupMemberDescription.encode(pe); err != nil { return err } @@ -129,6 +159,7 @@ func (gd *GroupDescription) decode(pd packetDecoder) (err error) { } gd.Members[memberId] = new(GroupMemberDescription) + gd.Members[memberId].Version = gd.Version if err := gd.Members[memberId].decode(pd); err != nil { return err } @@ -138,6 +169,9 @@ func (gd *GroupDescription) decode(pd packetDecoder) (err error) { } type GroupMemberDescription struct { + Version int16 + + GroupInstanceId *string ClientId string ClientHost string MemberMetadata []byte @@ -145,6 +179,11 @@ type GroupMemberDescription struct { } func (gmd *GroupMemberDescription) encode(pe packetEncoder) error { + if gmd.Version >= 4 { + if err := pe.putNullableString(gmd.GroupInstanceId); err != nil { + return err + } + } if err := pe.putString(gmd.ClientId); err != nil { return err } @@ -162,6 +201,11 @@ func (gmd *GroupMemberDescription) encode(pe packetEncoder) error { } func (gmd *GroupMemberDescription) decode(pd packetDecoder) (err error) { + if gmd.Version >= 4 { + if gmd.GroupInstanceId, err = pd.getNullableString(); err != nil { + return + } + } if gmd.ClientId, err = pd.getString(); err != nil { return } diff --git a/describe_groups_response_test.go b/describe_groups_response_test.go index 22f56836be..75e903600d 100644 --- a/describe_groups_response_test.go +++ b/describe_groups_response_test.go @@ -7,11 +7,11 @@ import ( ) var ( - describeGroupsResponseEmpty = []byte{ + describeGroupsResponseEmptyV0 = []byte{ 0, 0, 0, 0, // no groups } - describeGroupsResponsePopulated = []byte{ + describeGroupsResponsePopulatedV0 = []byte{ 0, 0, 0, 2, // 2 groups 0, 0, // no error @@ -35,17 +35,17 @@ var ( } ) -func TestDescribeGroupsResponse(t *testing.T) { +func TestDescribeGroupsResponseV0(t *testing.T) { var response *DescribeGroupsResponse response = new(DescribeGroupsResponse) - testVersionDecodable(t, "empty", response, describeGroupsResponseEmpty, 0) + testVersionDecodable(t, "empty", response, describeGroupsResponseEmptyV0, 0) if len(response.Groups) != 0 { t.Error("Expected no groups") } response = new(DescribeGroupsResponse) - testVersionDecodable(t, "populated", response, describeGroupsResponsePopulated, 0) + testVersionDecodable(t, "populated", response, describeGroupsResponsePopulatedV0, 0) if len(response.Groups) != 2 { t.Error("Expected two groups") } @@ -90,3 +90,189 @@ func TestDescribeGroupsResponse(t *testing.T) { t.Error("Unxpected groups[1].Members, found", group0.Members) } } + +var ( + describeGroupsResponseEmptyV3 = []byte{ + 0, 0, 0, 0, // throttle time 0 + 0, 0, 0, 0, // no groups + 0, 0, 0, 0, // authorizedOperations 0 + } + + describeGroupsResponsePopulatedV3 = []byte{ + 0, 0, 0, 0, // throttle time 0 + 0, 0, 0, 2, // 2 groups + + 0, 0, // no error + 0, 3, 'f', 'o', 'o', // Group ID + 0, 3, 'b', 'a', 'r', // State + 0, 8, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', // ConsumerProtocol type + 0, 3, 'b', 'a', 'z', // Protocol name + 0, 0, 0, 1, // 1 member + 0, 2, 'i', 'd', // Member ID + 0, 6, 's', 'a', 'r', 'a', 'm', 'a', // Client ID + 0, 9, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // Client Host + 0, 0, 0, 3, 0x01, 0x02, 0x03, // MemberMetadata + 0, 0, 0, 3, 0x04, 0x05, 0x06, // MemberAssignment + + 0, 30, // ErrGroupAuthorizationFailed + 0, 0, + 0, 0, + 0, 0, + 0, 0, + 0, 0, 0, 0, + + 0, 0, 0, 0, // authorizedOperations 0 + } + + describeGroupsResponseEmptyV4 = []byte{ + 0, 0, 0, 0, // throttle time 0 + 0, 0, 0, 0, // no groups + 0, 0, 0, 0, // authorizedOperations 0 + } + + describeGroupsResponsePopulatedV4 = []byte{ + 0, 0, 0, 0, // throttle time 0 + 0, 0, 0, 2, // 2 groups + + 0, 0, // no error + 0, 3, 'f', 'o', 'o', // Group ID + 0, 3, 'b', 'a', 'r', // State + 0, 8, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', // ConsumerProtocol type + 0, 3, 'b', 'a', 'z', // Protocol name + 0, 0, 0, 1, // 1 member + 0, 2, 'i', 'd', // Member ID + 0, 3, 'g', 'i', 'd', // Group Instance ID + 0, 6, 's', 'a', 'r', 'a', 'm', 'a', // Client ID + 0, 9, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // Client Host + 0, 0, 0, 3, 0x01, 0x02, 0x03, // MemberMetadata + 0, 0, 0, 3, 0x04, 0x05, 0x06, // MemberAssignment + + 0, 30, // ErrGroupAuthorizationFailed + 0, 0, + 0, 0, + 0, 0, + 0, 0, + 0, 0, 0, 0, + + 0, 0, 0, 0, // authorizedOperations 0 + } +) + +func TestDescribeGroupsResponseV1plus(t *testing.T) { + groupInstanceId := "gid" + tests := []struct { + CaseName string + Version int16 + MessageBytes []byte + Message *DescribeGroupsResponse + }{ + { + "empty", + 3, + describeGroupsResponseEmptyV3, + &DescribeGroupsResponse{ + Version: 3, + ThrottleTimeMs: int32(0), + Groups: []*GroupDescription{}, + AuthorizedOperations: int32(0), + }, + }, + { + "populated", + 3, + describeGroupsResponsePopulatedV3, + &DescribeGroupsResponse{ + Version: 3, + ThrottleTimeMs: int32(0), + Groups: []*GroupDescription{ + { + Version: 3, + Err: KError(0), + GroupId: "foo", + State: "bar", + ProtocolType: "consumer", + Protocol: "baz", + Members: map[string]*GroupMemberDescription{ + "id": { + Version: 3, + ClientId: "sarama", + ClientHost: "localhost", + MemberMetadata: []byte{1, 2, 3}, + MemberAssignment: []byte{4, 5, 6}, + }, + }, + }, + { + Version: 3, + Err: KError(30), + GroupId: "", + State: "", + ProtocolType: "", + Protocol: "", + Members: nil, + }, + }, + AuthorizedOperations: int32(0), + }, + }, + { + "empty", + 4, + describeGroupsResponseEmptyV4, + &DescribeGroupsResponse{ + Version: 4, + ThrottleTimeMs: int32(0), + Groups: []*GroupDescription{}, + AuthorizedOperations: int32(0), + }, + }, + { + "populated", + 4, + describeGroupsResponsePopulatedV4, + &DescribeGroupsResponse{ + Version: 4, + ThrottleTimeMs: int32(0), + Groups: []*GroupDescription{ + { + Version: 4, + Err: KError(0), + GroupId: "foo", + State: "bar", + ProtocolType: "consumer", + Protocol: "baz", + Members: map[string]*GroupMemberDescription{ + "id": { + Version: 4, + GroupInstanceId: &groupInstanceId, + ClientId: "sarama", + ClientHost: "localhost", + MemberMetadata: []byte{1, 2, 3}, + MemberAssignment: []byte{4, 5, 6}, + }, + }, + }, + { + Version: 4, + Err: KError(30), + GroupId: "", + State: "", + ProtocolType: "", + Protocol: "", + Members: nil, + }, + }, + AuthorizedOperations: int32(0), + }, + }, + } + + for _, c := range tests { + response := new(DescribeGroupsResponse) + testVersionDecodable(t, c.CaseName, response, c.MessageBytes, c.Version) + if !reflect.DeepEqual(c.Message, response) { + t.Errorf("case %s decode failed, expected:%+v got %+v", c.CaseName, c.Message, response) + } + testEncodable(t, c.CaseName, c.Message, c.MessageBytes) + } +} diff --git a/functional_consumer_group_test.go b/functional_consumer_group_test.go index e6ac4dc4c1..59f31939c1 100644 --- a/functional_consumer_group_test.go +++ b/functional_consumer_group_test.go @@ -339,14 +339,15 @@ func (s *testFuncConsumerGroupSink) Close() map[string][]string { type testFuncConsumerGroupMember struct { ConsumerGroup - clientID string - claims map[string]int - state int32 - handlers int32 - errs []error - maxMessages int32 - isCapped bool - sink *testFuncConsumerGroupSink + clientID string + claims map[string]int + generationId int32 + state int32 + handlers int32 + errs []error + maxMessages int32 + isCapped bool + sink *testFuncConsumerGroupSink t *testing.T mu sync.RWMutex @@ -458,6 +459,9 @@ func (m *testFuncConsumerGroupMember) Setup(s ConsumerGroupSession) error { m.claims = claims m.mu.Unlock() + // store generationID + atomic.StoreInt32(&m.generationId, s.GenerationID()) + // enter post-setup state atomic.StoreInt32(&m.state, 2) return nil diff --git a/functional_consumer_staticmembership_test.go b/functional_consumer_staticmembership_test.go new file mode 100644 index 0000000000..7981cea810 --- /dev/null +++ b/functional_consumer_staticmembership_test.go @@ -0,0 +1,237 @@ +//+build functional + +package sarama + +import ( + "encoding/json" + "math" + "reflect" + "sync/atomic" + "testing" +) + +func TestFuncConsumerGroupStaticMembership_Basic(t *testing.T) { + checkKafkaVersion(t, "2.3.0") + setupFunctionalTest(t) + defer teardownFunctionalTest(t) + groupID := testFuncConsumerGroupID(t) + + t.Helper() + + config1 := NewTestConfig() + config1.ClientID = "M1" + config1.Version = V2_3_0_0 + config1.Consumer.Offsets.Initial = OffsetNewest + config1.Consumer.Group.InstanceId = "Instance1" + m1 := runTestFuncConsumerGroupMemberWithConfig(t, groupID, 100, config1, nil, "test.4") + defer m1.Close() + + config2 := NewTestConfig() + config2.ClientID = "M2" + config2.Version = V2_3_0_0 + config2.Consumer.Offsets.Initial = OffsetNewest + config2.Consumer.Group.InstanceId = "Instance2" + m2 := runTestFuncConsumerGroupMemberWithConfig(t, groupID, 100, config2, nil, "test.4") + defer m2.Close() + + m1.WaitForState(2) + m2.WaitForState(2) + + err := testFuncConsumerGroupProduceMessage("test.4", 1000) + if err != nil { + t.Fatal(err) + } + + admin, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config1) + if err != nil { + t.Fatal(err) + } + res, err := admin.DescribeConsumerGroups([]string{groupID}) + if err != nil { + t.Fatal(err) + } + if len(res) != 1 { + t.Errorf("group description should be only 1, got %v\n", len(res)) + } + if len(res[0].Members) != 2 { + t.Errorf("should have 2 members in group , got %v\n", len(res[0].Members)) + } + + m1.WaitForState(4) + m2.WaitForState(4) + + m1.AssertCleanShutdown() + m2.AssertCleanShutdown() +} + +func TestFuncConsumerGroupStaticMembership_RejoinAndLeave(t *testing.T) { + checkKafkaVersion(t, "2.3.0") + setupFunctionalTest(t) + defer teardownFunctionalTest(t) + groupID := testFuncConsumerGroupID(t) + + t.Helper() + + config1 := NewTestConfig() + config1.ClientID = "M1" + config1.Version = V2_3_0_0 + config1.Consumer.Offsets.Initial = OffsetNewest + config1.Consumer.Group.InstanceId = "Instance1" + m1 := runTestFuncConsumerGroupMemberWithConfig(t, groupID, math.MaxInt32, config1, nil, "test.4") + defer m1.Close() + + config2 := NewTestConfig() + config2.ClientID = "M2" + config2.Version = V2_3_0_0 + config2.Consumer.Offsets.Initial = OffsetNewest + config2.Consumer.Group.InstanceId = "Instance2" + m2 := runTestFuncConsumerGroupMemberWithConfig(t, groupID, math.MaxInt32, config2, nil, "test.4") + defer m2.Close() + + m1.WaitForState(2) + m2.WaitForState(2) + + admin, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config1) + if err != nil { + t.Fatal(err) + } + res1, err := admin.DescribeConsumerGroups([]string{groupID}) + if err != nil { + t.Fatal(err) + } + if len(res1) != 1 { + t.Errorf("group description should be only 1, got %v\n", len(res1)) + } + if len(res1[0].Members) != 2 { + t.Errorf("should have 2 members in group , got %v\n", len(res1[0].Members)) + } + + generationId1 := m1.generationId + + // shut down m2, membership should not change (we didn't leave group when close) + m2.AssertCleanShutdown() + + res2, err := admin.DescribeConsumerGroups([]string{groupID}) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(res1, res2) { + res1Bytes, _ := json.Marshal(res1) + res2Bytes, _ := json.Marshal(res2) + t.Errorf("group description be the same before %s, after %s", res1Bytes, res2Bytes) + } + + generationId2 := atomic.LoadInt32(&m1.generationId) + if generationId2 != generationId1 { + t.Errorf("m1 generation should not increase expect %v, actual %v", generationId1, generationId2) + } + + // m2 rejoin, should generate a new memberId, no re-balance happens + m2 = runTestFuncConsumerGroupMemberWithConfig(t, groupID, math.MaxInt32, config2, nil, "test.4") + m2.WaitForState(2) + m1.WaitForState(2) + + res3, err := admin.DescribeConsumerGroups([]string{groupID}) + if err != nil { + t.Fatal(err) + } + if err != nil { + t.Fatal(err) + } + if len(res3) != 1 { + t.Errorf("group description should be only 1, got %v\n", len(res3)) + } + if len(res3[0].Members) != 2 { + t.Errorf("should have 2 members in group , got %v\n", len(res3[0].Members)) + } + + generationId3 := atomic.LoadInt32(&m1.generationId) + if generationId3 != generationId1 { + t.Errorf("m1 generation should not increase expect %v, actual %v", generationId1, generationId2) + } + + m2.AssertCleanShutdown() + removeResp, err := admin.RemoveMemberFromConsumerGroup(groupID, []string{config2.Consumer.Group.InstanceId}) + if err != nil { + t.Fatal(err) + } + if removeResp.Err != ErrNoError { + t.Errorf("remove %s from consumer group failed %v", config2.Consumer.Group.InstanceId, removeResp.Err) + } + m1.WaitForHandlers(4) + + generationId4 := atomic.LoadInt32(&m1.generationId) + if generationId4 == generationId1 { + t.Errorf("m1 generation should increase expect %v, actual %v", generationId1, generationId2) + } +} + +func TestFuncConsumerGroupStaticMembership_Fenced(t *testing.T) { + checkKafkaVersion(t, "2.3.0") + setupFunctionalTest(t) + defer teardownFunctionalTest(t) + groupID := testFuncConsumerGroupID(t) + + t.Helper() + + config1 := NewTestConfig() + config1.ClientID = "M1" + config1.Version = V2_3_0_0 + config1.Consumer.Offsets.Initial = OffsetNewest + config1.Consumer.Group.InstanceId = "Instance1" + m1 := runTestFuncConsumerGroupMemberWithConfig(t, groupID, math.MaxInt32, config1, nil, "test.4") + defer m1.Close() + + config2 := NewTestConfig() + config2.ClientID = "M2" + config2.Version = V2_3_0_0 + config2.Consumer.Offsets.Initial = OffsetNewest + config2.Consumer.Group.InstanceId = "Instance2" + m2 := runTestFuncConsumerGroupMemberWithConfig(t, groupID, math.MaxInt32, config2, nil, "test.4") + defer m2.Close() + + m1.WaitForState(2) + m2.WaitForState(2) + + config3 := NewTestConfig() + config3.ClientID = "M3" + config3.Version = V2_3_0_0 + config3.Consumer.Offsets.Initial = OffsetNewest + config3.Consumer.Group.InstanceId = "Instance2" // same instance id as config2 + + m3 := runTestFuncConsumerGroupMemberWithConfig(t, groupID, math.MaxInt32, config3, nil, "test.4") + defer m3.Close() + + m3.WaitForState(2) + + m2.WaitForState(4) + if len(m2.errs) < 1 { + t.Errorf("expect m2 to be fenced by group instanced id, but got no err") + } + if m2.errs[0] != ErrFencedInstancedId { + t.Errorf("expect m2 to be fenced by group instanced id, but got wrong err %v", m2.errs[0]) + } + + m1.AssertCleanShutdown() + m3.AssertCleanShutdown() +} + +// -------------------------------------------------------------------- + +func testFuncConsumerGroupProduceMessage(topic string, count int) error { + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, NewTestConfig()) + if err != nil { + return err + } + defer func() { _ = client.Close() }() + + producer, err := NewAsyncProducerFromClient(client) + if err != nil { + return err + } + for i := 0; i < count; i++ { + producer.Input() <- &ProducerMessage{Topic: topic, Value: ByteEncoder([]byte("testdata"))} + } + return producer.Close() +} diff --git a/heartbeat_request.go b/heartbeat_request.go index e9d9af1911..511910e712 100644 --- a/heartbeat_request.go +++ b/heartbeat_request.go @@ -1,9 +1,11 @@ package sarama type HeartbeatRequest struct { - GroupId string - GenerationId int32 - MemberId string + Version int16 + GroupId string + GenerationId int32 + MemberId string + GroupInstanceId *string } func (r *HeartbeatRequest) encode(pe packetEncoder) error { @@ -17,10 +19,17 @@ func (r *HeartbeatRequest) encode(pe packetEncoder) error { return err } + if r.Version >= 3 { + if err := pe.putNullableString(r.GroupInstanceId); err != nil { + return err + } + } + return nil } func (r *HeartbeatRequest) decode(pd packetDecoder, version int16) (err error) { + r.Version = version if r.GroupId, err = pd.getString(); err != nil { return } @@ -30,6 +39,11 @@ func (r *HeartbeatRequest) decode(pd packetDecoder, version int16) (err error) { if r.MemberId, err = pd.getString(); err != nil { return } + if r.Version >= 3 { + if r.GroupInstanceId, err = pd.getNullableString(); err != nil { + return + } + } return nil } @@ -39,7 +53,7 @@ func (r *HeartbeatRequest) key() int16 { } func (r *HeartbeatRequest) version() int16 { - return 0 + return r.Version } func (r *HeartbeatRequest) headerVersion() int16 { @@ -47,5 +61,9 @@ func (r *HeartbeatRequest) headerVersion() int16 { } func (r *HeartbeatRequest) requiredVersion() KafkaVersion { + switch { + case r.Version >= 3: + return V2_3_0_0 + } return V0_9_0_0 } diff --git a/heartbeat_request_test.go b/heartbeat_request_test.go index 2653f82c70..a8e39c7939 100644 --- a/heartbeat_request_test.go +++ b/heartbeat_request_test.go @@ -1,17 +1,81 @@ package sarama -import "testing" +import ( + "reflect" + "testing" +) -var basicHeartbeatRequest = []byte{ - 0, 3, 'f', 'o', 'o', // Group ID - 0x00, 0x01, 0x02, 0x03, // Generatiuon ID - 0, 3, 'b', 'a', 'z', // Member ID -} +var ( + basicHeartbeatRequestV0 = []byte{ + 0, 3, 'f', 'o', 'o', // Group ID + 0x00, 0x01, 0x02, 0x03, // Generatiuon ID + 0, 3, 'b', 'a', 'z', // Member ID + } + + basicHeartbeatRequestV3_GID = []byte{ + 0, 3, 'f', 'o', 'o', // Group ID + 0x00, 0x01, 0x02, 0x03, // Generatiuon ID + 0, 3, 'b', 'a', 'z', // Member ID + 0, 3, 'g', 'i', 'd', // Group Instance ID + } + basicHeartbeatRequestV3_NOGID = []byte{ + 0, 3, 'f', 'o', 'o', // Group ID + 0x00, 0x01, 0x02, 0x03, // Generatiuon ID + 0, 3, 'b', 'a', 'z', // Member ID + 255, 255, // Group Instance ID + } +) func TestHeartbeatRequest(t *testing.T) { - request := new(HeartbeatRequest) - request.GroupId = "foo" - request.GenerationId = 66051 - request.MemberId = "baz" - testRequest(t, "basic", request, basicHeartbeatRequest) + groupInstanceId := "gid" + tests := []struct { + CaseName string + Version int16 + MessageBytes []byte + Message *HeartbeatRequest + }{ + { + "v0_basic", + 0, + basicHeartbeatRequestV0, + &HeartbeatRequest{ + Version: 0, + GroupId: "foo", + GenerationId: 0x00010203, + MemberId: "baz", + }, + }, + { + "v3_basic", + 3, + basicHeartbeatRequestV3_GID, + &HeartbeatRequest{ + Version: 3, + GroupId: "foo", + GenerationId: 0x00010203, + MemberId: "baz", + GroupInstanceId: &groupInstanceId, + }, + }, + { + "v3_basic", + 3, + basicHeartbeatRequestV3_NOGID, + &HeartbeatRequest{ + Version: 3, + GroupId: "foo", + GenerationId: 0x00010203, + MemberId: "baz", + GroupInstanceId: nil, + }, + }, + } + for _, c := range tests { + testEncodable(t, c.CaseName, c.Message, c.MessageBytes) + request := new(HeartbeatRequest) + testVersionDecodable(t, c.CaseName, request, c.MessageBytes, c.Version) + if !reflect.DeepEqual(c.Message, request) { + t.Errorf("case %s decode failed, expected:%+v got %+v", c.CaseName, c.Message, request) + } + } } diff --git a/heartbeat_response.go b/heartbeat_response.go index 577ab72e57..95ef97f47a 100644 --- a/heartbeat_response.go +++ b/heartbeat_response.go @@ -1,15 +1,27 @@ package sarama type HeartbeatResponse struct { - Err KError + Version int16 + ThrottleTime int32 + Err KError } func (r *HeartbeatResponse) encode(pe packetEncoder) error { + if r.Version >= 1 { + pe.putInt32(r.ThrottleTime) + } pe.putInt16(int16(r.Err)) return nil } func (r *HeartbeatResponse) decode(pd packetDecoder, version int16) error { + var err error + r.Version = version + if r.Version >= 1 { + if r.ThrottleTime, err = pd.getInt32(); err != nil { + return err + } + } kerr, err := pd.getInt16() if err != nil { return err @@ -24,7 +36,7 @@ func (r *HeartbeatResponse) key() int16 { } func (r *HeartbeatResponse) version() int16 { - return 0 + return r.Version } func (r *HeartbeatResponse) headerVersion() int16 { @@ -32,5 +44,9 @@ func (r *HeartbeatResponse) headerVersion() int16 { } func (r *HeartbeatResponse) requiredVersion() KafkaVersion { + switch r.Version { + case 1, 2, 3: + return V2_3_0_0 + } return V0_9_0_0 } diff --git a/heartbeat_response_test.go b/heartbeat_response_test.go index c7ad5933a5..b9c5d56f1e 100644 --- a/heartbeat_response_test.go +++ b/heartbeat_response_test.go @@ -1,18 +1,66 @@ package sarama import ( - "errors" + "reflect" "testing" ) -var heartbeatResponseNoError = []byte{ - 0x00, 0x00, -} +var ( + heartbeatResponseNoError_V0 = []byte{ + 0x00, 0x00, + } + heartbeatResponseNoError_V1 = []byte{ + 0, 0, 0, 100, + 0, 0, + } + heartbeatResponseError_V1 = []byte{ + 0, 0, 0, 100, + 0, byte(ErrFencedInstancedId), + } func TestHeartbeatResponse(t *testing.T) { - response := new(HeartbeatResponse) - testVersionDecodable(t, "no error", response, heartbeatResponseNoError, 0) - if !errors.Is(response.Err, ErrNoError) { - t.Error("Decoding error failed: no error expected but found", response.Err) + tests := []struct { + CaseName string + Version int16 + MessageBytes []byte + Message *HeartbeatResponse + }{ + { + "v0_noErr", + 0, + heartbeatResponseNoError_V0, + &HeartbeatResponse{ + Version: 0, + Err: ErrNoError, + }, + }, + { + "v1_noErr", + 1, + heartbeatResponseNoError_V1, + &HeartbeatResponse{ + Version: 1, + Err: ErrNoError, + ThrottleTime: 100, + }, + }, + { + "v1_Err", + 1, + heartbeatResponseError_V1, + &HeartbeatResponse{ + Version: 1, + Err: ErrFencedInstancedId, + ThrottleTime: 100, + }, + }, + } + for _, c := range tests { + testEncodable(t, c.CaseName, c.Message, c.MessageBytes) + response := new(HeartbeatResponse) + testVersionDecodable(t, c.CaseName, response, c.MessageBytes, c.Version) + if !reflect.DeepEqual(c.Message, response) { + t.Errorf("case %s decode failed, expected:%+v got %+v", c.CaseName, c.Message, response) + } } } diff --git a/join_group_request.go b/join_group_request.go index 3734e82e40..432338cd59 100644 --- a/join_group_request.go +++ b/join_group_request.go @@ -30,6 +30,7 @@ type JoinGroupRequest struct { SessionTimeout int32 RebalanceTimeout int32 MemberId string + GroupInstanceId *string ProtocolType string GroupProtocols map[string][]byte // deprecated; use OrderedGroupProtocols OrderedGroupProtocols []*GroupProtocol @@ -46,6 +47,11 @@ func (r *JoinGroupRequest) encode(pe packetEncoder) error { if err := pe.putString(r.MemberId); err != nil { return err } + if r.Version >= 5 { + if err := pe.putNullableString(r.GroupInstanceId); err != nil { + return err + } + } if err := pe.putString(r.ProtocolType); err != nil { return err } @@ -101,6 +107,12 @@ func (r *JoinGroupRequest) decode(pd packetDecoder, version int16) (err error) { return } + if version >= 5 { + if r.GroupInstanceId, err = pd.getNullableString(); err != nil { + return + } + } + if r.ProtocolType, err = pd.getString(); err != nil { return } @@ -140,7 +152,9 @@ func (r *JoinGroupRequest) headerVersion() int16 { func (r *JoinGroupRequest) requiredVersion() KafkaVersion { switch r.Version { - case 2: + case 4, 5: + return V2_3_0_0 + case 2, 3: return V0_11_0_0 case 1: return V0_10_1_0 diff --git a/join_group_request_test.go b/join_group_request_test.go index a2e17f9802..2597b86694 100644 --- a/join_group_request_test.go +++ b/join_group_request_test.go @@ -1,6 +1,9 @@ package sarama -import "testing" +import ( + "reflect" + "testing" +) var ( joinGroupRequestV0_NoProtocols = []byte{ @@ -81,3 +84,58 @@ func TestJoinGroupRequestV1(t *testing.T) { request.GroupProtocols["one"] = []byte{0x01, 0x02, 0x03} testRequestDecode(t, "V1", request, packet) } + +var ( + joinGroupRequestV5 = []byte{ + 0, 9, 'T', 'e', 's', 't', 'G', 'r', 'o', 'u', 'p', // Group ID + 0, 0, 0, 100, // Session timeout + 0, 0, 0, 200, // Rebalance timeout + 0, 11, 'O', 'n', 'e', 'P', 'r', 'o', 't', 'o', 'c', 'o', 'l', // Member ID + 0, 3, 'g', 'i', 'd', // GroupInstanceId + 0, 8, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', // Protocol Type + 0, 0, 0, 1, // 1 group protocol + 0, 3, 'o', 'n', 'e', // Protocol name + 0, 0, 0, 3, 0x01, 0x02, 0x03, // protocol metadata + } +) + +func TestJoinGroupRequestV3plus(t *testing.T) { + groupInstanceId := "gid" + tests := []struct { + CaseName string + Version int16 + MessageBytes []byte + Message *JoinGroupRequest + }{ + { + "v5", + 5, + joinGroupRequestV5, + &JoinGroupRequest{ + Version: 5, + GroupId: "TestGroup", + SessionTimeout: 100, + RebalanceTimeout: 200, + MemberId: "OneProtocol", + GroupInstanceId: &groupInstanceId, + ProtocolType: "consumer", + GroupProtocols: map[string][]byte{ + "one": {1, 2, 3}, + }, + OrderedGroupProtocols: []*GroupProtocol{ + {Name: "one", Metadata: []byte{1, 2, 3}}, + }, + }, + }, + } + for _, c := range tests { + request := new(JoinGroupRequest) + testVersionDecodable(t, c.CaseName, request, c.MessageBytes, c.Version) + if !reflect.DeepEqual(c.Message, request) { + t.Errorf("case %s decode failed, expected:%+v got %+v", c.CaseName, c.Message, request) + } + // This is to avoid error check "cannot specify both GroupProtocols and OrderedGroupProtocols on JoinGroupRequest" + c.Message.GroupProtocols = nil + testEncodable(t, c.CaseName, c.Message, c.MessageBytes) + } +} diff --git a/join_group_response.go b/join_group_response.go index 54b0a45c28..16342a3a4e 100644 --- a/join_group_response.go +++ b/join_group_response.go @@ -8,17 +8,23 @@ type JoinGroupResponse struct { GroupProtocol string LeaderId string MemberId string - Members map[string][]byte + Members []GroupMember +} + +type GroupMember struct { + MemberId string + GroupInstanceId *string + Metadata []byte } func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error) { members := make(map[string]ConsumerGroupMemberMetadata, len(r.Members)) - for id, bin := range r.Members { + for _, member := range r.Members { meta := new(ConsumerGroupMemberMetadata) - if err := decode(bin, meta); err != nil { + if err := decode(member.Metadata, meta); err != nil { return nil, err } - members[id] = *meta + members[member.MemberId] = *meta } return members, nil } @@ -44,12 +50,16 @@ func (r *JoinGroupResponse) encode(pe packetEncoder) error { return err } - for memberId, memberMetadata := range r.Members { - if err := pe.putString(memberId); err != nil { + for _, member := range r.Members { + if err := pe.putString(member.MemberId); err != nil { return err } - - if err := pe.putBytes(memberMetadata); err != nil { + if r.Version >= 5 { + if err := pe.putNullableString(member.GroupInstanceId); err != nil { + return err + } + } + if err := pe.putBytes(member.Metadata); err != nil { return err } } @@ -97,19 +107,27 @@ func (r *JoinGroupResponse) decode(pd packetDecoder, version int16) (err error) return nil } - r.Members = make(map[string][]byte) + r.Members = make([]GroupMember, n) for i := 0; i < n; i++ { memberId, err := pd.getString() if err != nil { return err } + var groupInstanceId *string = nil + if r.Version >= 5 { + groupInstanceId, err = pd.getNullableString() + if err != nil { + return err + } + } + memberMetadata, err := pd.getBytes() if err != nil { return err } - r.Members[memberId] = memberMetadata + r.Members[i] = GroupMember{MemberId: memberId, GroupInstanceId: groupInstanceId, Metadata: memberMetadata} } return nil @@ -129,6 +147,8 @@ func (r *JoinGroupResponse) headerVersion() int16 { func (r *JoinGroupResponse) requiredVersion() KafkaVersion { switch r.Version { + case 3, 4, 5: + return V2_3_0_0 case 2: return V0_11_0_0 case 1: diff --git a/join_group_response_test.go b/join_group_response_test.go index 8ab26bb973..e163f07d16 100644 --- a/join_group_response_test.go +++ b/join_group_response_test.go @@ -112,8 +112,11 @@ func TestJoinGroupResponseV0(t *testing.T) { if len(response.Members) != 1 { t.Error("Decoding Members failed, found:", response.Members) } - if !reflect.DeepEqual(response.Members["foo"], []byte{0x01, 0x02, 0x03}) { - t.Error("Decoding foo member failed, found:", response.Members["foo"]) + if response.Members[0].MemberId != "foo" { + t.Error("Decoding MemberId failed, found:", response.Members[0].MemberId) + } + if !reflect.DeepEqual(response.Members[0].Metadata, []byte{0x01, 0x02, 0x03}) { + t.Error("Decoding foo member failed, found:", response.Members[0].Metadata) } } @@ -171,3 +174,54 @@ func TestJoinGroupResponseV2(t *testing.T) { t.Error("Decoding Members failed, found:", response.Members) } } + +var ( + joinGroupResponseV5 = []byte{ + 0, 0, 0, 100, // ThrottleTimeMs + 0x00, 0x00, // No error + 0x00, 0x01, 0x02, 0x03, // Generation ID + 0, 8, 'p', 'r', 'o', 't', 'o', 'c', 'o', 'l', // Protocol name chosen + 0, 3, 'f', 'o', 'o', // Leader ID + 0, 3, 'b', 'a', 'r', // Member ID + 0, 0, 0, 1, // One member info + 0, 3, 'm', 'i', 'd', // memeberId + 0, 3, 'g', 'i', 'd', // GroupInstanceId + 0, 0, 0, 3, 1, 2, 3, // Metadata + } +) + +func TestJoinGroupResponse3plus(t *testing.T) { + groupInstanceId := "gid" + tests := []struct { + CaseName string + Version int16 + MessageBytes []byte + Message *JoinGroupResponse + }{ + { + "v5", + 5, + joinGroupResponseV5, + &JoinGroupResponse{ + Version: 5, + ThrottleTime: 100, + Err: ErrNoError, + GenerationId: 0x00010203, + GroupProtocol: "protocol", + LeaderId: "foo", + MemberId: "bar", + Members: []GroupMember{ + {"mid", &groupInstanceId, []byte{1, 2, 3}}, + }, + }, + }, + } + for _, c := range tests { + response := new(JoinGroupResponse) + testVersionDecodable(t, c.CaseName, response, c.MessageBytes, c.Version) + if !reflect.DeepEqual(c.Message, response) { + t.Errorf("case %s decode failed, expected:%+v got %+v", c.CaseName, c.Message, response) + } + testEncodable(t, c.CaseName, c.Message, c.MessageBytes) + } +} diff --git a/leave_group_request.go b/leave_group_request.go index d7789b68db..741b7290a8 100644 --- a/leave_group_request.go +++ b/leave_group_request.go @@ -1,27 +1,69 @@ package sarama +type MemberIdentity struct { + MemberId string + GroupInstanceId *string +} + type LeaveGroupRequest struct { + Version int16 GroupId string - MemberId string + MemberId string // Removed in Version 3 + Members []MemberIdentity // Added in Version 3 } func (r *LeaveGroupRequest) encode(pe packetEncoder) error { if err := pe.putString(r.GroupId); err != nil { return err } - if err := pe.putString(r.MemberId); err != nil { - return err + if r.Version < 3 { + if err := pe.putString(r.MemberId); err != nil { + return err + } + } + if r.Version >= 3 { + if err := pe.putArrayLength(len(r.Members)); err != nil { + return err + } + for _, member := range r.Members { + if err := pe.putString(member.MemberId); err != nil { + return err + } + if err := pe.putNullableString(member.GroupInstanceId); err != nil { + return err + } + } } return nil } func (r *LeaveGroupRequest) decode(pd packetDecoder, version int16) (err error) { + r.Version = version if r.GroupId, err = pd.getString(); err != nil { return } - if r.MemberId, err = pd.getString(); err != nil { - return + if r.Version < 3 { + if r.MemberId, err = pd.getString(); err != nil { + return + } + } + if r.Version >= 3 { + memberCount, err := pd.getArrayLength() + if err != nil { + return err + } + r.Members = make([]MemberIdentity, memberCount) + for i := 0; i < memberCount; i++ { + memberIdentity := MemberIdentity{} + if memberIdentity.MemberId, err = pd.getString(); err != nil { + return err + } + if memberIdentity.GroupInstanceId, err = pd.getNullableString(); err != nil { + return err + } + r.Members[i] = memberIdentity + } } return nil @@ -32,7 +74,7 @@ func (r *LeaveGroupRequest) key() int16 { } func (r *LeaveGroupRequest) version() int16 { - return 0 + return r.Version } func (r *LeaveGroupRequest) headerVersion() int16 { @@ -40,5 +82,9 @@ func (r *LeaveGroupRequest) headerVersion() int16 { } func (r *LeaveGroupRequest) requiredVersion() KafkaVersion { + switch r.Version { + case 1, 2, 3: + return V2_3_0_0 + } return V0_9_0_0 } diff --git a/leave_group_request_test.go b/leave_group_request_test.go index b674e48b28..4b8d482f71 100644 --- a/leave_group_request_test.go +++ b/leave_group_request_test.go @@ -1,15 +1,63 @@ package sarama -import "testing" +import ( + "reflect" + "testing" +) -var basicLeaveGroupRequest = []byte{ - 0, 3, 'f', 'o', 'o', - 0, 3, 'b', 'a', 'r', -} +var ( + basicLeaveGroupRequestV0 = []byte{ + 0, 3, 'f', 'o', 'o', + 0, 3, 'b', 'a', 'r', + } + basicLeaveGroupRequestV3 = []byte{ + 0, 3, 'f', 'o', 'o', + 0, 0, 0, 2, // Two Member + 0, 4, 'm', 'i', 'd', '1', // MemberId + 255, 255, // GroupInstanceId nil + 0, 4, 'm', 'i', 'd', '2', // MemberId + 0, 3, 'g', 'i', 'd', // GroupInstanceId + } +) func TestLeaveGroupRequest(t *testing.T) { - request := new(LeaveGroupRequest) - request.GroupId = "foo" - request.MemberId = "bar" - testRequest(t, "basic", request, basicLeaveGroupRequest) + groupInstanceId := "gid" + tests := []struct { + CaseName string + Version int16 + MessageBytes []byte + Message *LeaveGroupRequest + }{ + { + "v0", + 0, + basicLeaveGroupRequestV0, + &LeaveGroupRequest{ + Version: 0, + GroupId: "foo", + MemberId: "bar", + }, + }, + { + "v3", + 3, + basicLeaveGroupRequestV3, + &LeaveGroupRequest{ + Version: 3, + GroupId: "foo", + Members: []MemberIdentity{ + {"mid1", nil}, + {"mid2", &groupInstanceId}, + }, + }, + }, + } + for _, c := range tests { + request := new(LeaveGroupRequest) + testVersionDecodable(t, c.CaseName, request, c.MessageBytes, c.Version) + if !reflect.DeepEqual(c.Message, request) { + t.Errorf("case %s decode failed, expected:%+v got %+v", c.CaseName, c.Message, request) + } + testEncodable(t, c.CaseName, c.Message, c.MessageBytes) + } } diff --git a/leave_group_response.go b/leave_group_response.go index 25f8d5eb36..18ed357e83 100644 --- a/leave_group_response.go +++ b/leave_group_response.go @@ -1,21 +1,73 @@ package sarama +type MemberResponse struct { + MemberId string + GroupInstanceId *string + Err KError +} type LeaveGroupResponse struct { - Err KError + Version int16 + ThrottleTime int32 + Err KError + Members []MemberResponse } func (r *LeaveGroupResponse) encode(pe packetEncoder) error { + if r.Version >= 1 { + pe.putInt32(r.ThrottleTime) + } pe.putInt16(int16(r.Err)) + if r.Version >= 3 { + if err := pe.putArrayLength(len(r.Members)); err != nil { + return err + } + for _, member := range r.Members { + if err := pe.putString(member.MemberId); err != nil { + return err + } + if err := pe.putNullableString(member.GroupInstanceId); err != nil { + return err + } + pe.putInt16(int16(member.Err)) + } + } return nil } func (r *LeaveGroupResponse) decode(pd packetDecoder, version int16) (err error) { + r.Version = version + if r.Version >= 1 { + if r.ThrottleTime, err = pd.getInt32(); err != nil { + return err + } + } kerr, err := pd.getInt16() if err != nil { return err } r.Err = KError(kerr) + if r.Version >= 3 { + membersLen, err := pd.getArrayLength() + if err != nil { + return err + } + r.Members = make([]MemberResponse, membersLen) + for i := 0; i < len(r.Members); i++ { + if r.Members[i].MemberId, err = pd.getString(); err != nil { + return err + } + if r.Members[i].GroupInstanceId, err = pd.getNullableString(); err != nil { + return err + } + if memberErr, err := pd.getInt16(); err != nil { + return err + } else { + r.Members[i].Err = KError(memberErr) + } + } + } + return nil } @@ -24,7 +76,7 @@ func (r *LeaveGroupResponse) key() int16 { } func (r *LeaveGroupResponse) version() int16 { - return 0 + return r.Version } func (r *LeaveGroupResponse) headerVersion() int16 { @@ -32,5 +84,9 @@ func (r *LeaveGroupResponse) headerVersion() int16 { } func (r *LeaveGroupResponse) requiredVersion() KafkaVersion { + switch r.Version { + case 1, 2, 3: + return V2_3_0_0 + } return V0_9_0_0 } diff --git a/leave_group_response_test.go b/leave_group_response_test.go index 7a499c24c5..306c721a0f 100644 --- a/leave_group_response_test.go +++ b/leave_group_response_test.go @@ -1,27 +1,87 @@ package sarama import ( - "errors" + "reflect" "testing" ) var ( - leaveGroupResponseNoError = []byte{0x00, 0x00} - leaveGroupResponseWithError = []byte{0, 25} + leaveGroupResponseV0NoError = []byte{0x00, 0x00} + leaveGroupResponseV0WithError = []byte{0, 25} + leaveGroupResponseV1NoError = []byte{ + 0, 0, 0, 100, // ThrottleTime + 0x00, 0x00, // Err + } + leaveGroupResponseV3NoError = []byte{ + 0, 0, 0, 100, // ThrottleTime + 0x00, 0x00, // Err + 0, 0, 0, 2, // Two Members + 0, 4, 'm', 'i', 'd', '1', // MemberId + 255, 255, // GroupInstanceId + 0, 0, // Err + 0, 4, 'm', 'i', 'd', '2', // MemberId + 0, 3, 'g', 'i', 'd', // GroupInstanceId + 0, 25, // Err + } ) func TestLeaveGroupResponse(t *testing.T) { - var response *LeaveGroupResponse - - response = new(LeaveGroupResponse) - testVersionDecodable(t, "no error", response, leaveGroupResponseNoError, 0) - if !errors.Is(response.Err, ErrNoError) { - t.Error("Decoding error failed: no error expected but found", response.Err) + groupInstanceId := "gid" + tests := []struct { + CaseName string + Version int16 + MessageBytes []byte + Message *LeaveGroupResponse + }{ + { + "v0-noErr", + 0, + leaveGroupResponseV0NoError, + &LeaveGroupResponse{ + Version: 0, + Err: ErrNoError, + }, + }, + { + "v0-Err", + 0, + leaveGroupResponseV0WithError, + &LeaveGroupResponse{ + Version: 0, + Err: ErrUnknownMemberId, + }, + }, + { + "v1-noErr", + 1, + leaveGroupResponseV1NoError, + &LeaveGroupResponse{ + Version: 1, + ThrottleTime: 100, + Err: ErrNoError, + }, + }, + { + "v3", + 3, + leaveGroupResponseV3NoError, + &LeaveGroupResponse{ + Version: 3, + ThrottleTime: 100, + Err: ErrNoError, + Members: []MemberResponse{ + {"mid1", nil, ErrNoError}, + {"mid2", &groupInstanceId, ErrUnknownMemberId}, + }, + }, + }, } - - response = new(LeaveGroupResponse) - testVersionDecodable(t, "with error", response, leaveGroupResponseWithError, 0) - if !errors.Is(response.Err, ErrUnknownMemberId) { - t.Error("Decoding error failed: ErrUnknownMemberId expected but found", response.Err) + for _, c := range tests { + response := new(LeaveGroupResponse) + testVersionDecodable(t, c.CaseName, response, c.MessageBytes, c.Version) + if !reflect.DeepEqual(c.Message, response) { + t.Errorf("case %s decode failed, expected:%+v got %+v", c.CaseName, c.Message, response) + } + testEncodable(t, c.CaseName, c.Message, c.MessageBytes) } } diff --git a/mockresponses.go b/mockresponses.go index dcc361738e..a9c8519a7d 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -1189,13 +1189,13 @@ type MockJoinGroupResponse struct { GroupProtocol string LeaderId string MemberId string - Members map[string][]byte + Members []GroupMember } func NewMockJoinGroupResponse(t TestReporter) *MockJoinGroupResponse { return &MockJoinGroupResponse{ t: t, - Members: make(map[string][]byte), + Members: make([]GroupMember, 0), } } @@ -1249,7 +1249,7 @@ func (m *MockJoinGroupResponse) SetMember(id string, meta *ConsumerGroupMemberMe if err != nil { panic(fmt.Sprintf("error encoding member metadata: %v", err)) } - m.Members[id] = bin + m.Members = append(m.Members, GroupMember{MemberId: id, Metadata: bin}) return m } diff --git a/offset_commit_request.go b/offset_commit_request.go index 9931cade51..5dd88220d9 100644 --- a/offset_commit_request.go +++ b/offset_commit_request.go @@ -13,9 +13,10 @@ const ReceiveTime int64 = -1 const GroupGenerationUndefined = -1 type offsetCommitRequestBlock struct { - offset int64 - timestamp int64 - metadata string + offset int64 + timestamp int64 + committedLeaderEpoch int32 + metadata string } func (b *offsetCommitRequestBlock) encode(pe packetEncoder, version int16) error { @@ -25,6 +26,9 @@ func (b *offsetCommitRequestBlock) encode(pe packetEncoder, version int16) error } else if b.timestamp != 0 { Logger.Println("Non-zero timestamp specified for OffsetCommitRequest not v1, it will be ignored") } + if version >= 6 { + pe.putInt32(b.committedLeaderEpoch) + } return pe.putString(b.metadata) } @@ -38,15 +42,22 @@ func (b *offsetCommitRequestBlock) decode(pd packetDecoder, version int16) (err return err } } + if version >= 6 { + if b.committedLeaderEpoch, err = pd.getInt32(); err != nil { + return err + } + } + b.metadata, err = pd.getString() return err } type OffsetCommitRequest struct { ConsumerGroup string - ConsumerGroupGeneration int32 // v1 or later - ConsumerID string // v1 or later - RetentionTime int64 // v2 or later + ConsumerGroupGeneration int32 // v1 or later + ConsumerID string // v1 or later + GroupInstanceId *string // v7 or later + RetentionTime int64 // v2 or later // Version can be: // - 0 (kafka 0.8.1 and later) @@ -54,12 +65,14 @@ type OffsetCommitRequest struct { // - 2 (kafka 0.9.0 and later) // - 3 (kafka 0.11.0 and later) // - 4 (kafka 2.0.0 and later) + // - 5&6 (kafka 2.1.0 and later) + // - 7 (kafka 2.3.0 and later) Version int16 blocks map[string]map[int32]*offsetCommitRequestBlock } func (r *OffsetCommitRequest) encode(pe packetEncoder) error { - if r.Version < 0 || r.Version > 4 { + if r.Version < 0 || r.Version > 7 { return PacketEncodingError{"invalid or unsupported OffsetCommitRequest version field"} } @@ -81,12 +94,19 @@ func (r *OffsetCommitRequest) encode(pe packetEncoder) error { } } - if r.Version >= 2 { + // Version 5 removes RetentionTime, which is now controlled only by a broker configuration. + if r.Version >= 2 && r.Version <= 4 { pe.putInt64(r.RetentionTime) } else if r.RetentionTime != 0 { Logger.Println("Non-zero RetentionTime specified for OffsetCommitRequest version <2, it will be ignored") } + if r.Version >= 7 { + if err := pe.putNullableString(r.GroupInstanceId); err != nil { + return err + } + } + if err := pe.putArrayLength(len(r.blocks)); err != nil { return err } @@ -123,12 +143,19 @@ func (r *OffsetCommitRequest) decode(pd packetDecoder, version int16) (err error } } - if r.Version >= 2 { + // Version 5 removes RetentionTime, which is now controlled only by a broker configuration. + if r.Version >= 2 && r.Version <= 4 { if r.RetentionTime, err = pd.getInt64(); err != nil { return err } } + if r.Version >= 7 { + if r.GroupInstanceId, err = pd.getNullableString(); err != nil { + return err + } + } + topicCount, err := pd.getArrayLength() if err != nil { return err @@ -184,12 +211,16 @@ func (r *OffsetCommitRequest) requiredVersion() KafkaVersion { return V0_11_0_0 case 4: return V2_0_0_0 + case 5, 6: + return V2_1_0_0 + case 7: + return V2_3_0_0 default: return MinVersion } } -func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string) { +func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, leaderEpoch int32, timestamp int64, metadata string) { if r.blocks == nil { r.blocks = make(map[string]map[int32]*offsetCommitRequestBlock) } @@ -198,7 +229,7 @@ func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset i r.blocks[topic] = make(map[int32]*offsetCommitRequestBlock) } - r.blocks[topic][partitionID] = &offsetCommitRequestBlock{offset, timestamp, metadata} + r.blocks[topic][partitionID] = &offsetCommitRequestBlock{offset, timestamp, leaderEpoch, metadata} } func (r *OffsetCommitRequest) Offset(topic string, partitionID int32) (int64, string, error) { diff --git a/offset_commit_request_test.go b/offset_commit_request_test.go index 06bbd40369..eba22bc8ff 100644 --- a/offset_commit_request_test.go +++ b/offset_commit_request_test.go @@ -2,6 +2,7 @@ package sarama import ( "fmt" + "reflect" "testing" ) @@ -69,7 +70,7 @@ func TestOffsetCommitRequestV0(t *testing.T) { request.ConsumerGroup = "foobar" testRequest(t, "no blocks v0", request, offsetCommitRequestNoBlocksV0) - request.AddBlock("topic", 0x5221, 0xDEADBEEF, 0, "metadata") + request.AddBlock("topic", 0x5221, 0xDEADBEEF, 0, 0, "metadata") testRequest(t, "one block v0", request, offsetCommitRequestOneBlockV0) } @@ -81,7 +82,7 @@ func TestOffsetCommitRequestV1(t *testing.T) { request.Version = 1 testRequest(t, "no blocks v1", request, offsetCommitRequestNoBlocksV1) - request.AddBlock("topic", 0x5221, 0xDEADBEEF, ReceiveTime, "metadata") + request.AddBlock("topic", 0x5221, 0xDEADBEEF, 0, ReceiveTime, "metadata") testRequest(t, "one block v1", request, offsetCommitRequestOneBlockV1) } @@ -95,7 +96,114 @@ func TestOffsetCommitRequestV2ToV4(t *testing.T) { request.Version = int16(version) testRequest(t, fmt.Sprintf("no blocks v%d", version), request, offsetCommitRequestNoBlocksV2) - request.AddBlock("topic", 0x5221, 0xDEADBEEF, 0, "metadata") + request.AddBlock("topic", 0x5221, 0xDEADBEEF, 0, 0, "metadata") testRequest(t, fmt.Sprintf("one block v%d", version), request, offsetCommitRequestOneBlockV2) } } + +var ( + offsetCommitRequestOneBlockV5 = []byte{ + 0, 3, 'f', 'o', 'o', // GroupId + 0x00, 0x00, 0x00, 0x01, // GenerationId + 0, 3, 'm', 'i', 'd', // MemberId + 0, 0, 0, 1, // One Topic + 0, 5, 't', 'o', 'p', 'i', 'c', // Name + 0, 0, 0, 1, // One Partition + 0, 0, 0, 1, // PartitionIndex + 0, 0, 0, 0, 0, 0, 0, 2, // CommittedOffset + 0, 4, 'm', 'e', 't', 'a', // CommittedMetadata + } + offsetCommitRequestOneBlockV6 = []byte{ + 0, 3, 'f', 'o', 'o', // GroupId + 0x00, 0x00, 0x00, 0x01, // GenerationId + 0, 3, 'm', 'i', 'd', // MemberId + 0, 0, 0, 1, // One Topic + 0, 5, 't', 'o', 'p', 'i', 'c', // Name + 0, 0, 0, 1, // One Partition + 0, 0, 0, 1, // PartitionIndex + 0, 0, 0, 0, 0, 0, 0, 2, // CommittedOffset + 0, 0, 0, 3, // CommittedEpoch + 0, 4, 'm', 'e', 't', 'a', // CommittedMetadata + } + offsetCommitRequestOneBlockV7 = []byte{ + 0, 3, 'f', 'o', 'o', // GroupId + 0x00, 0x00, 0x00, 0x01, // GenerationId + 0, 3, 'm', 'i', 'd', // MemberId + 0, 3, 'g', 'i', 'd', // MemberId + 0, 0, 0, 1, // One Topic + 0, 5, 't', 'o', 'p', 'i', 'c', // Name + 0, 0, 0, 1, // One Partition + 0, 0, 0, 1, // PartitionIndex + 0, 0, 0, 0, 0, 0, 0, 2, // CommittedOffset + 0, 0, 0, 3, // CommittedEpoch + 0, 4, 'm', 'e', 't', 'a', // CommittedMetadata + } +) + +func TestOffsetCommitRequestV5AndPlus(t *testing.T) { + groupInstanceId := "gid" + tests := []struct { + CaseName string + Version int16 + MessageBytes []byte + Message *OffsetCommitRequest + }{ + { + "v5", + 5, + offsetCommitRequestOneBlockV5, + &OffsetCommitRequest{ + Version: 5, + ConsumerGroup: "foo", + ConsumerGroupGeneration: 1, + ConsumerID: "mid", + blocks: map[string]map[int32]*offsetCommitRequestBlock{ + "topic": { + 1: &offsetCommitRequestBlock{offset: 2, metadata: "meta"}, + }, + }, + }, + }, + { + "v6", + 6, + offsetCommitRequestOneBlockV6, + &OffsetCommitRequest{ + Version: 6, + ConsumerGroup: "foo", + ConsumerGroupGeneration: 1, + ConsumerID: "mid", + blocks: map[string]map[int32]*offsetCommitRequestBlock{ + "topic": { + 1: &offsetCommitRequestBlock{offset: 2, metadata: "meta", committedLeaderEpoch: 3}, + }, + }, + }, + }, + { + "v7", + 7, + offsetCommitRequestOneBlockV7, + &OffsetCommitRequest{ + Version: 7, + ConsumerGroup: "foo", + ConsumerGroupGeneration: 1, + ConsumerID: "mid", + GroupInstanceId: &groupInstanceId, + blocks: map[string]map[int32]*offsetCommitRequestBlock{ + "topic": { + 1: &offsetCommitRequestBlock{offset: 2, metadata: "meta", committedLeaderEpoch: 3}, + }, + }, + }, + }, + } + for _, c := range tests { + request := new(OffsetCommitRequest) + testVersionDecodable(t, c.CaseName, request, c.MessageBytes, c.Version) + if !reflect.DeepEqual(c.Message, request) { + t.Errorf("case %s decode failed, expected:%+v got %+v", c.CaseName, c.Message, request) + } + testEncodable(t, c.CaseName, c.Message, c.MessageBytes) + } +} diff --git a/offset_commit_response.go b/offset_commit_response.go index 342260ef59..4bed269aa5 100644 --- a/offset_commit_response.go +++ b/offset_commit_response.go @@ -108,6 +108,8 @@ func (r *OffsetCommitResponse) requiredVersion() KafkaVersion { return V0_11_0_0 case 4: return V2_0_0_0 + case 5, 6, 7: + return V2_3_0_0 default: return MinVersion } diff --git a/offset_commit_response_test.go b/offset_commit_response_test.go index f35ca54ec5..c580503071 100644 --- a/offset_commit_response_test.go +++ b/offset_commit_response_test.go @@ -2,16 +2,83 @@ package sarama import ( "fmt" + "reflect" "testing" ) -var emptyOffsetCommitResponse = []byte{ - 0x00, 0x00, 0x00, 0x00, -} +var ( + emptyOffsetCommitResponseV0 = []byte{ + 0x00, 0x00, 0x00, 0x00, // Empty topic + } + noEmptyOffsetCommitResponseV0 = []byte{ + 0, 0, 0, 1, // Topic Len + 0, 5, 't', 'o', 'p', 'i', 'c', // Name + 0, 0, 0, 1, // Partition Len + 0, 0, 0, 3, // PartitionIndex + 0, 0, // ErrorCode + } + noEmptyOffsetCommitResponseV3 = []byte{ + 0, 0, 0, 100, // ThrottleTimeMs + 0, 0, 0, 1, // Topic Len + 0, 5, 't', 'o', 'p', 'i', 'c', // Name + 0, 0, 0, 1, // Partition Len + 0, 0, 0, 3, // PartitionIndex + 0, 0, // ErrorCode + } +) func TestEmptyOffsetCommitResponse(t *testing.T) { - response := OffsetCommitResponse{} - testResponse(t, "empty", &response, emptyOffsetCommitResponse) + //groupInstanceId := "gid" + tests := []struct { + CaseName string + Version int16 + MessageBytes []byte + Message *OffsetCommitResponse + }{ + { + "v0-empty", + 0, + emptyOffsetCommitResponseV0, + &OffsetCommitResponse{ + Version: 0, + }, + }, + { + "v0-two-partition", + 0, + noEmptyOffsetCommitResponseV0, + &OffsetCommitResponse{ + Version: 0, + Errors: map[string]map[int32]KError{ + "topic": { + 3: ErrNoError, + }, + }, + }, + }, + { + "v3", + 3, + noEmptyOffsetCommitResponseV3, + &OffsetCommitResponse{ + ThrottleTimeMs: 100, + Version: 3, + Errors: map[string]map[int32]KError{ + "topic": { + 3: ErrNoError, + }, + }, + }, + }, + } + for _, c := range tests { + response := new(OffsetCommitResponse) + testVersionDecodable(t, c.CaseName, response, c.MessageBytes, c.Version) + if !reflect.DeepEqual(c.Message, response) { + t.Errorf("case %s decode failed, expected:%+v got %+v", c.CaseName, c.Message, response) + } + testEncodable(t, c.CaseName, c.Message, c.MessageBytes) + } } func TestNormalOffsetCommitResponse(t *testing.T) { diff --git a/offset_manager.go b/offset_manager.go index 4f480a08b9..1ea15ff939 100644 --- a/offset_manager.go +++ b/offset_manager.go @@ -26,13 +26,15 @@ type OffsetManager interface { } type offsetManager struct { - client Client - conf *Config - group string - ticker *time.Ticker + client Client + conf *Config + group string + ticker *time.Ticker + sessionCanceler func() - memberID string - generation int32 + memberID string + groupInstanceId *string + generation int32 broker *Broker brokerLock sync.RWMutex @@ -48,10 +50,10 @@ type offsetManager struct { // NewOffsetManagerFromClient creates a new OffsetManager from the given client. // It is still necessary to call Close() on the underlying client when finished with the partition manager. func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) { - return newOffsetManagerFromClient(group, "", GroupGenerationUndefined, client) + return newOffsetManagerFromClient(group, "", GroupGenerationUndefined, client, nil) } -func newOffsetManagerFromClient(group, memberID string, generation int32, client Client) (*offsetManager, error) { +func newOffsetManagerFromClient(group, memberID string, generation int32, client Client, sessionCanceler func()) (*offsetManager, error) { // Check that we are not dealing with a closed Client before processing any other arguments if client.Closed() { return nil, ErrClosedClient @@ -59,10 +61,11 @@ func newOffsetManagerFromClient(group, memberID string, generation int32, client conf := client.Config() om := &offsetManager{ - client: client, - conf: conf, - group: group, - poms: make(map[string]map[int32]*partitionOffsetManager), + client: client, + conf: conf, + group: group, + poms: make(map[string]map[int32]*partitionOffsetManager), + sessionCanceler: sessionCanceler, memberID: memberID, generation: generation, @@ -70,6 +73,9 @@ func newOffsetManagerFromClient(group, memberID string, generation int32, client closing: make(chan none), closed: make(chan none), } + if conf.Consumer.Group.InstanceId != "" { + om.groupInstanceId = &conf.Consumer.Group.InstanceId + } if conf.Consumer.Offsets.AutoCommit.Enable { om.ticker = time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval) go withRecover(om.mainLoop) @@ -138,11 +144,11 @@ func (om *offsetManager) computeBackoff(retries int) time.Duration { } } -func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retries int) (int64, string, error) { +func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retries int) (int64, int32, string, error) { broker, err := om.coordinator() if err != nil { if retries <= 0 { - return 0, "", err + return 0, 0, "", err } return om.fetchInitialOffset(topic, partition, retries-1) } @@ -155,7 +161,7 @@ func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retri resp, err := broker.FetchOffset(req) if err != nil { if retries <= 0 { - return 0, "", err + return 0, 0, "", err } om.releaseCoordinator(broker) return om.fetchInitialOffset(topic, partition, retries-1) @@ -163,31 +169,31 @@ func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retri block := resp.GetBlock(topic, partition) if block == nil { - return 0, "", ErrIncompleteResponse + return 0, 0, "", ErrIncompleteResponse } switch block.Err { case ErrNoError: - return block.Offset, block.Metadata, nil + return block.Offset, block.LeaderEpoch, block.Metadata, nil case ErrNotCoordinatorForConsumer: if retries <= 0 { - return 0, "", block.Err + return 0, 0, "", block.Err } om.releaseCoordinator(broker) return om.fetchInitialOffset(topic, partition, retries-1) case ErrOffsetsLoadInProgress: if retries <= 0 { - return 0, "", block.Err + return 0, 0, "", block.Err } backoff := om.computeBackoff(retries) select { case <-om.closing: - return 0, "", block.Err + return 0, 0, "", block.Err case <-time.After(backoff): } return om.fetchInitialOffset(topic, partition, retries-1) default: - return 0, "", block.Err + return 0, 0, "", block.Err } } @@ -298,12 +304,17 @@ func (om *offsetManager) constructRequest() *OffsetCommitRequest { for _, pom := range topicManagers { pom.lock.Lock() if pom.dirty { - r.AddBlock(pom.topic, pom.partition, pom.offset, perPartitionTimestamp, pom.metadata) + r.AddBlock(pom.topic, pom.partition, pom.offset, pom.leaderEpoch, perPartitionTimestamp, pom.metadata) } pom.lock.Unlock() } } + if om.groupInstanceId != nil { + r.Version = 7 + r.GroupInstanceId = om.groupInstanceId + } + if len(r.blocks) > 0 { return r } @@ -346,6 +357,10 @@ func (om *offsetManager) handleResponse(broker *Broker, req *OffsetCommitRequest pom.handleError(err) case ErrOffsetsLoadInProgress: // nothing wrong but we didn't commit, we'll get it next time round + case ErrFencedInstancedId: + pom.handleError(err) + // TODO close the whole consumer for instacne fenced.... + om.tryCancelSession() case ErrUnknownTopicOrPartition: // let the user know *and* try redispatching - if topic-auto-create is // enabled, redispatching should trigger a metadata req and create the @@ -420,6 +435,12 @@ func (om *offsetManager) findPOM(topic string, partition int32) *partitionOffset return nil } +func (om *offsetManager) tryCancelSession() { + if om.sessionCanceler != nil { + om.sessionCanceler() + } +} + // Partition Offset Manager // PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close() @@ -476,9 +497,10 @@ type PartitionOffsetManager interface { } type partitionOffsetManager struct { - parent *offsetManager - topic string - partition int32 + parent *offsetManager + topic string + partition int32 + leaderEpoch int32 lock sync.Mutex offset int64 @@ -491,18 +513,19 @@ type partitionOffsetManager struct { } func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32) (*partitionOffsetManager, error) { - offset, metadata, err := om.fetchInitialOffset(topic, partition, om.conf.Metadata.Retry.Max) + offset, leaderEpoch, metadata, err := om.fetchInitialOffset(topic, partition, om.conf.Metadata.Retry.Max) if err != nil { return nil, err } return &partitionOffsetManager{ - parent: om, - topic: topic, - partition: partition, - errors: make(chan *ConsumerError, om.conf.ChannelBufferSize), - offset: offset, - metadata: metadata, + parent: om, + topic: topic, + partition: partition, + leaderEpoch: leaderEpoch, + errors: make(chan *ConsumerError, om.conf.ChannelBufferSize), + offset: offset, + metadata: metadata, }, nil } diff --git a/sync_group_request.go b/sync_group_request.go index ac6ecb13e0..4a27286fe9 100644 --- a/sync_group_request.go +++ b/sync_group_request.go @@ -1,9 +1,11 @@ package sarama type SyncGroupRequest struct { + Version int16 GroupId string GenerationId int32 MemberId string + GroupInstanceId *string GroupAssignments map[string][]byte } @@ -18,6 +20,12 @@ func (r *SyncGroupRequest) encode(pe packetEncoder) error { return err } + if r.Version >= 3 { + if err := pe.putNullableString(r.GroupInstanceId); err != nil { + return err + } + } + if err := pe.putArrayLength(len(r.GroupAssignments)); err != nil { return err } @@ -34,6 +42,8 @@ func (r *SyncGroupRequest) encode(pe packetEncoder) error { } func (r *SyncGroupRequest) decode(pd packetDecoder, version int16) (err error) { + r.Version = version + if r.GroupId, err = pd.getString(); err != nil { return } @@ -43,6 +53,11 @@ func (r *SyncGroupRequest) decode(pd packetDecoder, version int16) (err error) { if r.MemberId, err = pd.getString(); err != nil { return } + if r.Version >= 3 { + if r.GroupInstanceId, err = pd.getNullableString(); err != nil { + return + } + } n, err := pd.getArrayLength() if err != nil { @@ -74,7 +89,7 @@ func (r *SyncGroupRequest) key() int16 { } func (r *SyncGroupRequest) version() int16 { - return 0 + return r.Version } func (r *SyncGroupRequest) headerVersion() int16 { @@ -82,6 +97,10 @@ func (r *SyncGroupRequest) headerVersion() int16 { } func (r *SyncGroupRequest) requiredVersion() KafkaVersion { + switch { + case r.Version >= 3: + return V2_3_0_0 + } return V0_9_0_0 } diff --git a/sync_group_request_test.go b/sync_group_request_test.go index 3f537ef9fb..e3f8f6a0a1 100644 --- a/sync_group_request_test.go +++ b/sync_group_request_test.go @@ -1,6 +1,9 @@ package sarama -import "testing" +import ( + "reflect" + "testing" +) var ( emptySyncGroupRequest = []byte{ @@ -36,3 +39,49 @@ func TestSyncGroupRequest(t *testing.T) { request.AddGroupAssignment("baz", []byte("foo")) testRequest(t, "populated", request, populatedSyncGroupRequest) } + +var ( + populatedSyncGroupRequestV3 = []byte{ + 0, 3, 'f', 'o', 'o', // Group ID + 0x00, 0x01, 0x02, 0x03, // Generation ID + 0, 3, 'b', 'a', 'z', // Member ID + 0, 3, 'g', 'i', 'd', // GroupInstance ID + 0, 0, 0, 1, // one assignment + 0, 3, 'b', 'a', 'z', // Member ID + 0, 0, 0, 3, 'f', 'o', 'o', // Member assignment + } +) + +func TestSyncGroupRequestV3AndPlus(t *testing.T) { + groupInstanceId := "gid" + tests := []struct { + CaseName string + Version int16 + MessageBytes []byte + Message *SyncGroupRequest + }{ + { + "v3", + 3, + populatedSyncGroupRequestV3, + &SyncGroupRequest{ + Version: 3, + GroupId: "foo", + GenerationId: 0x00010203, + MemberId: "baz", + GroupInstanceId: &groupInstanceId, + GroupAssignments: map[string][]byte{ + "baz": []byte("foo"), + }, + }, + }, + } + for _, c := range tests { + request := new(SyncGroupRequest) + testVersionDecodable(t, c.CaseName, request, c.MessageBytes, c.Version) + if !reflect.DeepEqual(c.Message, request) { + t.Errorf("case %s decode failed, expected:%+v got %+v", c.CaseName, c.Message, request) + } + testEncodable(t, c.CaseName, c.Message, c.MessageBytes) + } +} diff --git a/sync_group_response.go b/sync_group_response.go index af019c42f9..2943f74040 100644 --- a/sync_group_response.go +++ b/sync_group_response.go @@ -1,6 +1,8 @@ package sarama type SyncGroupResponse struct { + Version int16 + ThrottleTime int32 Err KError MemberAssignment []byte } @@ -12,11 +14,20 @@ func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignmen } func (r *SyncGroupResponse) encode(pe packetEncoder) error { + if r.Version >= 1 { + pe.putInt32(r.ThrottleTime) + } pe.putInt16(int16(r.Err)) return pe.putBytes(r.MemberAssignment) } func (r *SyncGroupResponse) decode(pd packetDecoder, version int16) (err error) { + r.Version = version + if r.Version >= 1 { + if r.ThrottleTime, err = pd.getInt32(); err != nil { + return err + } + } kerr, err := pd.getInt16() if err != nil { return err @@ -33,7 +44,7 @@ func (r *SyncGroupResponse) key() int16 { } func (r *SyncGroupResponse) version() int16 { - return 0 + return r.Version } func (r *SyncGroupResponse) headerVersion() int16 { @@ -41,5 +52,9 @@ func (r *SyncGroupResponse) headerVersion() int16 { } func (r *SyncGroupResponse) requiredVersion() KafkaVersion { + switch r.Version { + case 1, 2, 3: + return V2_3_0_0 + } return V0_9_0_0 } diff --git a/sync_group_response_test.go b/sync_group_response_test.go index a059be4c8b..8943153f6a 100644 --- a/sync_group_response_test.go +++ b/sync_group_response_test.go @@ -1,41 +1,73 @@ package sarama import ( - "errors" "reflect" "testing" ) var ( - syncGroupResponseNoError = []byte{ + syncGroupResponseV0NoError = []byte{ 0x00, 0x00, // No error 0, 0, 0, 3, 0x01, 0x02, 0x03, // Member assignment data } - syncGroupResponseWithError = []byte{ + syncGroupResponseV0WithError = []byte{ 0, 27, // ErrRebalanceInProgress 0, 0, 0, 0, // No member assignment data } -) - -func TestSyncGroupResponse(t *testing.T) { - var response *SyncGroupResponse - response = new(SyncGroupResponse) - testVersionDecodable(t, "no error", response, syncGroupResponseNoError, 0) - if !errors.Is(response.Err, ErrNoError) { - t.Error("Decoding Err failed: no error expected but found", response.Err) - } - if !reflect.DeepEqual(response.MemberAssignment, []byte{0x01, 0x02, 0x03}) { - t.Error("Decoding MemberAssignment failed, found:", response.MemberAssignment) + syncGroupResponseV1NoError = []byte{ + 0, 0, 0, 100, // ThrottleTimeMs + 0x00, 0x00, // No error + 0, 0, 0, 3, 0x01, 0x02, 0x03, // Member assignment data } +) - response = new(SyncGroupResponse) - testVersionDecodable(t, "no error", response, syncGroupResponseWithError, 0) - if !errors.Is(response.Err, ErrRebalanceInProgress) { - t.Error("Decoding Err failed: ErrRebalanceInProgress expected but found", response.Err) +func TestSyncGroupResponse(t *testing.T) { + tests := []struct { + CaseName string + Version int16 + MessageBytes []byte + Message *SyncGroupResponse + }{ + { + "v0-noErr", + 0, + syncGroupResponseV0NoError, + &SyncGroupResponse{ + Version: 0, + Err: ErrNoError, + MemberAssignment: []byte{1, 2, 3}, + }, + }, + { + "v0-Err", + 0, + syncGroupResponseV0WithError, + &SyncGroupResponse{ + Version: 0, + Err: ErrRebalanceInProgress, + MemberAssignment: []byte{}, + }, + }, + { + "v1-noErr", + 1, + syncGroupResponseV1NoError, + &SyncGroupResponse{ + ThrottleTime: 100, + Version: 1, + Err: ErrNoError, + MemberAssignment: []byte{1, 2, 3}, + }, + }, } - if !reflect.DeepEqual(response.MemberAssignment, []byte{}) { - t.Error("Decoding MemberAssignment failed, found:", response.MemberAssignment) + for _, c := range tests { + response := new(SyncGroupResponse) + testVersionDecodable(t, c.CaseName, response, c.MessageBytes, c.Version) + if !reflect.DeepEqual(c.Message, response) { + t.Errorf("case %s decode failed, expected:%+v got %+v", c.CaseName, c.Message, response) + } + testEncodable(t, c.CaseName, c.Message, c.MessageBytes) } }