diff --git a/describe_groups_request.go b/describe_groups_request.go new file mode 100644 index 000000000..c9426a6b7 --- /dev/null +++ b/describe_groups_request.go @@ -0,0 +1,26 @@ +package sarama + +type DescribeGroupsRequest struct { + Groups []string +} + +func (r *DescribeGroupsRequest) encode(pe packetEncoder) error { + return pe.putStringArray(r.Groups) +} + +func (r *DescribeGroupsRequest) decode(pd packetDecoder) (err error) { + r.Groups, err = pd.getStringArray() + return +} + +func (r *DescribeGroupsRequest) key() int16 { + return 15 +} + +func (r *DescribeGroupsRequest) version() int16 { + return 0 +} + +func (r *DescribeGroupsRequest) AddGroup(group string) { + r.Groups = append(r.Groups, group) +} diff --git a/describe_groups_request_test.go b/describe_groups_request_test.go new file mode 100644 index 000000000..7d45f3fee --- /dev/null +++ b/describe_groups_request_test.go @@ -0,0 +1,34 @@ +package sarama + +import "testing" + +var ( + emptyDescribeGroupsRequest = []byte{0, 0, 0, 0} + + singleDescribeGroupsRequest = []byte{ + 0, 0, 0, 1, // 1 group + 0, 3, 'f', 'o', 'o', // group name: foo + } + + doubleDescribeGroupsRequest = []byte{ + 0, 0, 0, 2, // 2 groups + 0, 3, 'f', 'o', 'o', // group name: foo + 0, 3, 'b', 'a', 'r', // group name: foo + } +) + +func TestDescribeGroupsRequest(t *testing.T) { + var request *DescribeGroupsRequest + + request = new(DescribeGroupsRequest) + testRequest(t, "no groups", request, emptyDescribeGroupsRequest) + + request = new(DescribeGroupsRequest) + request.AddGroup("foo") + testRequest(t, "one group", request, singleDescribeGroupsRequest) + + request = new(DescribeGroupsRequest) + request.AddGroup("foo") + request.AddGroup("bar") + testRequest(t, "two groups", request, doubleDescribeGroupsRequest) +} diff --git a/describe_groups_response.go b/describe_groups_response.go new file mode 100644 index 000000000..b4b32dd8b --- /dev/null +++ b/describe_groups_response.go @@ -0,0 +1,162 @@ +package sarama + +type DescribeGroupsResponse struct { + Groups []*GroupDescription +} + +func (r *DescribeGroupsResponse) encode(pe packetEncoder) error { + if err := pe.putArrayLength(len(r.Groups)); err != nil { + return err + } + + for _, groupDescription := range r.Groups { + if err := groupDescription.encode(pe); err != nil { + return err + } + } + + return nil +} + +func (r *DescribeGroupsResponse) decode(pd packetDecoder) (err error) { + n, err := pd.getArrayLength() + if err != nil { + return err + } + + r.Groups = make([]*GroupDescription, n) + for i := 0; i < n; i++ { + r.Groups[i] = new(GroupDescription) + if err := r.Groups[i].decode(pd); err != nil { + return err + } + } + + return nil +} + +type GroupDescription struct { + Err KError + GroupId string + State string + ProtocolType string + Protocol string + Members map[string]*GroupMemberDescription +} + +func (gd *GroupDescription) encode(pe packetEncoder) error { + pe.putInt16(int16(gd.Err)) + + if err := pe.putString(gd.GroupId); err != nil { + return err + } + if err := pe.putString(gd.State); err != nil { + return err + } + if err := pe.putString(gd.ProtocolType); err != nil { + return err + } + if err := pe.putString(gd.Protocol); err != nil { + return err + } + + if err := pe.putArrayLength(len(gd.Members)); err != nil { + return err + } + + for memberId, groupMemberDescription := range gd.Members { + if err := pe.putString(memberId); err != nil { + return err + } + if err := groupMemberDescription.encode(pe); err != nil { + return err + } + } + + return nil +} + +func (gd *GroupDescription) decode(pd packetDecoder) (err error) { + if kerr, err := pd.getInt16(); err != nil { + return err + } else { + gd.Err = KError(kerr) + } + + if gd.GroupId, err = pd.getString(); err != nil { + return + } + if gd.State, err = pd.getString(); err != nil { + return + } + if gd.ProtocolType, err = pd.getString(); err != nil { + return + } + if gd.Protocol, err = pd.getString(); err != nil { + return + } + + n, err := pd.getArrayLength() + if err != nil { + return err + } + if n == 0 { + return nil + } + + gd.Members = make(map[string]*GroupMemberDescription) + for i := 0; i < n; i++ { + memberId, err := pd.getString() + if err != nil { + return err + } + + gd.Members[memberId] = new(GroupMemberDescription) + if err := gd.Members[memberId].decode(pd); err != nil { + return err + } + } + + return nil +} + +type GroupMemberDescription struct { + ClientId string + ClientHost string + MemberMetadata []byte + MemberAssignment []byte +} + +func (gmd *GroupMemberDescription) encode(pe packetEncoder) error { + if err := pe.putString(gmd.ClientId); err != nil { + return err + } + if err := pe.putString(gmd.ClientHost); err != nil { + return err + } + if err := pe.putBytes(gmd.MemberMetadata); err != nil { + return err + } + if err := pe.putBytes(gmd.MemberAssignment); err != nil { + return err + } + + return nil +} + +func (gmd *GroupMemberDescription) decode(pd packetDecoder) (err error) { + if gmd.ClientId, err = pd.getString(); err != nil { + return + } + if gmd.ClientHost, err = pd.getString(); err != nil { + return + } + if gmd.MemberMetadata, err = pd.getBytes(); err != nil { + return + } + if gmd.MemberAssignment, err = pd.getBytes(); err != nil { + return + } + + return nil +} diff --git a/describe_groups_response_test.go b/describe_groups_response_test.go new file mode 100644 index 000000000..6f85f067e --- /dev/null +++ b/describe_groups_response_test.go @@ -0,0 +1,91 @@ +package sarama + +import ( + "reflect" + "testing" +) + +var ( + describeGroupsResponseEmpty = []byte{ + 0, 0, 0, 0, // no groups + } + + describeGroupsResponsePopulated = []byte{ + 0, 0, 0, 2, // 2 groups + + 0, 0, // no error + 0, 3, 'f', 'o', 'o', // Group ID + 0, 3, 'b', 'a', 'r', // State + 0, 8, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', // ConsumerProtocol type + 0, 3, 'b', 'a', 'z', // Protocol name + 0, 0, 0, 1, // 1 member + 0, 2, 'i', 'd', // Member ID + 0, 6, 's', 'a', 'r', 'a', 'm', 'a', // Client ID + 0, 9, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // Client Host + 0, 0, 0, 3, 0x01, 0x02, 0x03, // MemberMetadata + 0, 0, 0, 3, 0x04, 0x05, 0x06, // MemberAssignment + + 0, 30, // ErrGroupAuthorizationFailed + 0, 0, + 0, 0, + 0, 0, + 0, 0, + 0, 0, 0, 0, + } +) + +func TestDescribeGroupsResponse(t *testing.T) { + var response *DescribeGroupsResponse + + response = new(DescribeGroupsResponse) + testDecodable(t, "empty", response, describeGroupsResponseEmpty) + if len(response.Groups) != 0 { + t.Error("Expected no groups") + } + + response = new(DescribeGroupsResponse) + testDecodable(t, "populated", response, describeGroupsResponsePopulated) + if len(response.Groups) != 2 { + t.Error("Expected two groups") + } + + group0 := response.Groups[0] + if group0.Err != ErrNoError { + t.Error("Unxpected groups[0].Err, found", group0.Err) + } + if group0.GroupId != "foo" { + t.Error("Unxpected groups[0].GroupId, found", group0.GroupId) + } + if group0.State != "bar" { + t.Error("Unxpected groups[0].State, found", group0.State) + } + if group0.ProtocolType != "consumer" { + t.Error("Unxpected groups[0].ProtocolType, found", group0.ProtocolType) + } + if group0.Protocol != "baz" { + t.Error("Unxpected groups[0].Protocol, found", group0.Protocol) + } + if len(group0.Members) != 1 { + t.Error("Unxpected groups[0].Members, found", group0.Members) + } + if group0.Members["id"].ClientId != "sarama" { + t.Error("Unxpected groups[0].Members[id].ClientId, found", group0.Members["id"].ClientId) + } + if group0.Members["id"].ClientHost != "localhost" { + t.Error("Unxpected groups[0].Members[id].ClientHost, found", group0.Members["id"].ClientHost) + } + if !reflect.DeepEqual(group0.Members["id"].MemberMetadata, []byte{0x01, 0x02, 0x03}) { + t.Error("Unxpected groups[0].Members[id].MemberMetadata, found", group0.Members["id"].MemberMetadata) + } + if !reflect.DeepEqual(group0.Members["id"].MemberAssignment, []byte{0x04, 0x05, 0x06}) { + t.Error("Unxpected groups[0].Members[id].MemberAssignment, found", group0.Members["id"].MemberAssignment) + } + + group1 := response.Groups[1] + if group1.Err != ErrGroupAuthorizationFailed { + t.Error("Unxpected groups[1].Err, found", group0.Err) + } + if len(group1.Members) != 0 { + t.Error("Unxpected groups[1].Members, found", group0.Members) + } +} diff --git a/errors.go b/errors.go index 70f2b9bfd..a837087f1 100644 --- a/errors.go +++ b/errors.go @@ -92,6 +92,17 @@ const ( ErrMessageSetSizeTooLarge KError = 18 ErrNotEnoughReplicas KError = 19 ErrNotEnoughReplicasAfterAppend KError = 20 + ErrInvalidRequiredAcks KError = 21 + ErrIllegalGeneration KError = 22 + ErrInconsistentGroupProtocol KError = 23 + ErrInvalidGroupId KError = 24 + ErrUnknownMemberId KError = 25 + ErrInvalidSessionTimeout KError = 26 + ErrRebalanceInProgress KError = 27 + ErrInvalidCommitOffsetSize KError = 28 + ErrTopicAuthorizationFailed KError = 29 + ErrGroupAuthorizationFailed KError = 30 + ErrClusterAuthorizationFailed KError = 31 ) func (err KError) Error() string { @@ -140,6 +151,28 @@ func (err KError) Error() string { return "kafka server: Messages are rejected since there are fewer in-sync replicas than required." case ErrNotEnoughReplicasAfterAppend: return "kafka server: Messages are written to the log, but to fewer in-sync replicas than required." + case ErrInvalidRequiredAcks: + return "kafka server: The number of required acks is invalid (should be either -1, 0, or 1)." + case ErrIllegalGeneration: + return "kafka server: The provided generation id is not the current generation." + case ErrInconsistentGroupProtocol: + return "kafka server: The provider group protocol type is incompatible with the other members." + case ErrInvalidGroupId: + return "kafka server: The provided group id was empty." + case ErrUnknownMemberId: + return "kafka server: The provided member is not known in the current generation." + case ErrInvalidSessionTimeout: + return "kafka server: The provided session timeout is outside the allowed range." + case ErrRebalanceInProgress: + return "kafka server: A rebalance for the group is in progress. Please re-join the group." + case ErrInvalidCommitOffsetSize: + return "kafka server: The provided commit metadata was too large." + case ErrTopicAuthorizationFailed: + return "kafka server: The client is not authorized to access this topic." + case ErrGroupAuthorizationFailed: + return "kafka server: The client is not authorized to access this group." + case ErrClusterAuthorizationFailed: + return "kafka server: The client is not authorized to send this request type." } return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err) diff --git a/heartbeat_request.go b/heartbeat_request.go new file mode 100644 index 000000000..b89d290f1 --- /dev/null +++ b/heartbeat_request.go @@ -0,0 +1,43 @@ +package sarama + +type HeartbeatRequest struct { + GroupId string + GenerationId int32 + MemberId string +} + +func (r *HeartbeatRequest) encode(pe packetEncoder) error { + if err := pe.putString(r.GroupId); err != nil { + return err + } + + pe.putInt32(r.GenerationId) + + if err := pe.putString(r.MemberId); err != nil { + return err + } + + return nil +} + +func (r *HeartbeatRequest) decode(pd packetDecoder) (err error) { + if r.GroupId, err = pd.getString(); err != nil { + return + } + if r.GenerationId, err = pd.getInt32(); err != nil { + return + } + if r.MemberId, err = pd.getString(); err != nil { + return + } + + return nil +} + +func (r *HeartbeatRequest) key() int16 { + return 12 +} + +func (r *HeartbeatRequest) version() int16 { + return 0 +} diff --git a/heartbeat_request_test.go b/heartbeat_request_test.go new file mode 100644 index 000000000..da6cd18f5 --- /dev/null +++ b/heartbeat_request_test.go @@ -0,0 +1,21 @@ +package sarama + +import "testing" + +var ( + basicHeartbeatRequest = []byte{ + 0, 3, 'f', 'o', 'o', // Group ID + 0x00, 0x01, 0x02, 0x03, // Generatiuon ID + 0, 3, 'b', 'a', 'z', // Member ID + } +) + +func TestHeartbeatRequest(t *testing.T) { + var request *HeartbeatRequest + + request = new(HeartbeatRequest) + request.GroupId = "foo" + request.GenerationId = 66051 + request.MemberId = "baz" + testRequest(t, "basic", request, basicHeartbeatRequest) +} diff --git a/heartbeat_response.go b/heartbeat_response.go new file mode 100644 index 000000000..b48b8c1f3 --- /dev/null +++ b/heartbeat_response.go @@ -0,0 +1,20 @@ +package sarama + +type HeartbeatResponse struct { + Err KError +} + +func (r *HeartbeatResponse) encode(pe packetEncoder) error { + pe.putInt16(int16(r.Err)) + return nil +} + +func (r *HeartbeatResponse) decode(pd packetDecoder) error { + if kerr, err := pd.getInt16(); err != nil { + return err + } else { + r.Err = KError(kerr) + } + + return nil +} diff --git a/heartbeat_response_test.go b/heartbeat_response_test.go new file mode 100644 index 000000000..f8545781a --- /dev/null +++ b/heartbeat_response_test.go @@ -0,0 +1,18 @@ +package sarama + +import "testing" + +var ( + heartbeatResponseNoError = []byte{ + 0x00, 0x00} +) + +func TestHeartbeatResponse(t *testing.T) { + var response *HeartbeatResponse + + response = new(HeartbeatResponse) + testDecodable(t, "no error", response, heartbeatResponseNoError) + if response.Err != ErrNoError { + t.Error("Decoding error failed: no error expected but found", response.Err) + } +} diff --git a/join_group_request.go b/join_group_request.go new file mode 100644 index 000000000..8bb5ce826 --- /dev/null +++ b/join_group_request.go @@ -0,0 +1,94 @@ +package sarama + +type JoinGroupRequest struct { + GroupId string + SessionTimeout int32 + MemberId string + ProtocolType string + GroupProtocols map[string][]byte +} + +func (r *JoinGroupRequest) encode(pe packetEncoder) error { + if err := pe.putString(r.GroupId); err != nil { + return err + } + pe.putInt32(r.SessionTimeout) + if err := pe.putString(r.MemberId); err != nil { + return err + } + if err := pe.putString(r.ProtocolType); err != nil { + return err + } + + if err := pe.putArrayLength(len(r.GroupProtocols)); err != nil { + return err + } + for name, metadata := range r.GroupProtocols { + if err := pe.putString(name); err != nil { + return err + } + if err := pe.putBytes(metadata); err != nil { + return err + } + } + + return nil +} + +func (r *JoinGroupRequest) decode(pd packetDecoder) (err error) { + if r.GroupId, err = pd.getString(); err != nil { + return + } + + if r.SessionTimeout, err = pd.getInt32(); err != nil { + return + } + + if r.MemberId, err = pd.getString(); err != nil { + return + } + + if r.ProtocolType, err = pd.getString(); err != nil { + return + } + + n, err := pd.getArrayLength() + if err != nil { + return err + } + if n == 0 { + return nil + } + + r.GroupProtocols = make(map[string][]byte) + for i := 0; i < n; i++ { + name, err := pd.getString() + if err != nil { + return err + } + metadata, err := pd.getBytes() + if err != nil { + return err + } + + r.GroupProtocols[name] = metadata + } + + return nil +} + +func (r *JoinGroupRequest) key() int16 { + return 11 +} + +func (r *JoinGroupRequest) version() int16 { + return 0 +} + +func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) { + if r.GroupProtocols == nil { + r.GroupProtocols = make(map[string][]byte) + } + + r.GroupProtocols[name] = metadata +} diff --git a/join_group_request_test.go b/join_group_request_test.go new file mode 100644 index 000000000..8a6448c0e --- /dev/null +++ b/join_group_request_test.go @@ -0,0 +1,41 @@ +package sarama + +import "testing" + +var ( + joinGroupRequestNoProtocols = []byte{ + 0, 9, 'T', 'e', 's', 't', 'G', 'r', 'o', 'u', 'p', // Group ID + 0, 0, 0, 100, // Session timeout + 0, 0, // Member ID + 0, 8, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', // Protocol Type + 0, 0, 0, 0, // 0 protocol groups + } + + joinGroupRequestOneProtocol = []byte{ + 0, 9, 'T', 'e', 's', 't', 'G', 'r', 'o', 'u', 'p', // Group ID + 0, 0, 0, 100, // Session timeout + 0, 11, 'O', 'n', 'e', 'P', 'r', 'o', 't', 'o', 'c', 'o', 'l', // Member ID + 0, 8, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', // Protocol Type + 0, 0, 0, 1, // 1 group protocol + 0, 3, 'o', 'n', 'e', // Protocol name + 0, 0, 0, 3, 0x01, 0x02, 0x03, // protocol metadata + } +) + +func TestJoinGroupRequest(t *testing.T) { + var request *JoinGroupRequest + + request = new(JoinGroupRequest) + request.GroupId = "TestGroup" + request.SessionTimeout = 100 + request.ProtocolType = "consumer" + testRequest(t, "no protocols", request, joinGroupRequestNoProtocols) + + request = new(JoinGroupRequest) + request.GroupId = "TestGroup" + request.SessionTimeout = 100 + request.MemberId = "OneProtocol" + request.ProtocolType = "consumer" + request.AddGroupProtocol("one", []byte{0x01, 0x02, 0x03}) + testRequest(t, "one protocol", request, joinGroupRequestOneProtocol) +} diff --git a/join_group_response.go b/join_group_response.go new file mode 100644 index 000000000..037a9cd26 --- /dev/null +++ b/join_group_response.go @@ -0,0 +1,90 @@ +package sarama + +type JoinGroupResponse struct { + Err KError + GenerationId int32 + GroupProtocol string + LeaderId string + MemberId string + Members map[string][]byte +} + +func (r *JoinGroupResponse) encode(pe packetEncoder) error { + pe.putInt16(int16(r.Err)) + pe.putInt32(r.GenerationId) + + if err := pe.putString(r.GroupProtocol); err != nil { + return err + } + if err := pe.putString(r.LeaderId); err != nil { + return err + } + if err := pe.putString(r.MemberId); err != nil { + return err + } + + if err := pe.putArrayLength(len(r.Members)); err != nil { + return err + } + + for memberId, memberMetadata := range r.Members { + if err := pe.putString(memberId); err != nil { + return err + } + + if err := pe.putBytes(memberMetadata); err != nil { + return err + } + } + + return nil +} + +func (r *JoinGroupResponse) decode(pd packetDecoder) (err error) { + if kerr, err := pd.getInt16(); err != nil { + return err + } else { + r.Err = KError(kerr) + } + + if r.GenerationId, err = pd.getInt32(); err != nil { + return + } + + if r.GroupProtocol, err = pd.getString(); err != nil { + return + } + + if r.LeaderId, err = pd.getString(); err != nil { + return + } + + if r.MemberId, err = pd.getString(); err != nil { + return + } + + n, err := pd.getArrayLength() + if err != nil { + return err + } + if n == 0 { + return nil + } + + r.Members = make(map[string][]byte) + for i := 0; i < n; i++ { + memberId, err := pd.getString() + if err != nil { + return err + } + + memberMetadata, err := pd.getBytes() + if err != nil { + return err + } + + r.Members[memberId] = memberMetadata + } + + return nil +} diff --git a/join_group_response_test.go b/join_group_response_test.go new file mode 100644 index 000000000..560a726ff --- /dev/null +++ b/join_group_response_test.go @@ -0,0 +1,98 @@ +package sarama + +import ( + "reflect" + "testing" +) + +var ( + joinGroupResponseNoError = []byte{ + 0x00, 0x00, // No error + 0x00, 0x01, 0x02, 0x03, // Generation ID + 0, 8, 'p', 'r', 'o', 't', 'o', 'c', 'o', 'l', // Protocol name chosen + 0, 3, 'f', 'o', 'o', // Leader ID + 0, 3, 'b', 'a', 'r', // Member ID + 0, 0, 0, 0, // No member info + } + + joinGroupResponseWithError = []byte{ + 0, 23, // Error: inconsistent group protocol + 0x00, 0x00, 0x00, 0x00, // Generation ID + 0, 0, // Protocol name chosen + 0, 0, // Leader ID + 0, 0, // Member ID + 0, 0, 0, 0, // No member info + } + + joinGroupResponseLeader = []byte{ + 0x00, 0x00, // No error + 0x00, 0x01, 0x02, 0x03, // Generation ID + 0, 8, 'p', 'r', 'o', 't', 'o', 'c', 'o', 'l', // Protocol name chosen + 0, 3, 'f', 'o', 'o', // Leader ID + 0, 3, 'f', 'o', 'o', // Member ID == Leader ID + 0, 0, 0, 1, // 1 member + 0, 3, 'f', 'o', 'o', // Member ID + 0, 0, 0, 3, 0x01, 0x02, 0x03, // Member metadata + } +) + +func TestJoinGroupResponse(t *testing.T) { + var response *JoinGroupResponse + + response = new(JoinGroupResponse) + testDecodable(t, "no error", response, joinGroupResponseNoError) + if response.Err != ErrNoError { + t.Error("Decoding Err failed: no error expected but found", response.Err) + } + if response.GenerationId != 66051 { + t.Error("Decoding GenerationId failed, found:", response.GenerationId) + } + if response.LeaderId != "foo" { + t.Error("Decoding LeaderId failed, found:", response.LeaderId) + } + if response.MemberId != "bar" { + t.Error("Decoding MemberId failed, found:", response.MemberId) + } + if len(response.Members) != 0 { + t.Error("Decoding Members failed, found:", response.Members) + } + + response = new(JoinGroupResponse) + testDecodable(t, "with error", response, joinGroupResponseWithError) + if response.Err != ErrInconsistentGroupProtocol { + t.Error("Decoding Err failed: ErrInconsistentGroupProtocol expected but found", response.Err) + } + if response.GenerationId != 0 { + t.Error("Decoding GenerationId failed, found:", response.GenerationId) + } + if response.LeaderId != "" { + t.Error("Decoding LeaderId failed, found:", response.LeaderId) + } + if response.MemberId != "" { + t.Error("Decoding MemberId failed, found:", response.MemberId) + } + if len(response.Members) != 0 { + t.Error("Decoding Members failed, found:", response.Members) + } + + response = new(JoinGroupResponse) + testDecodable(t, "with error", response, joinGroupResponseLeader) + if response.Err != ErrNoError { + t.Error("Decoding Err failed: ErrNoError expected but found", response.Err) + } + if response.GenerationId != 66051 { + t.Error("Decoding GenerationId failed, found:", response.GenerationId) + } + if response.LeaderId != "foo" { + t.Error("Decoding LeaderId failed, found:", response.LeaderId) + } + if response.MemberId != "foo" { + t.Error("Decoding MemberId failed, found:", response.MemberId) + } + if len(response.Members) != 1 { + t.Error("Decoding Members failed, found:", response.Members) + } + if !reflect.DeepEqual(response.Members["foo"], []byte{0x01, 0x02, 0x03}) { + t.Error("Decoding foo member failed, found:", response.Members["foo"]) + } +} diff --git a/leave_group_request.go b/leave_group_request.go new file mode 100644 index 000000000..cdb4d14fd --- /dev/null +++ b/leave_group_request.go @@ -0,0 +1,36 @@ +package sarama + +type LeaveGroupRequest struct { + GroupId string + MemberId string +} + +func (r *LeaveGroupRequest) encode(pe packetEncoder) error { + if err := pe.putString(r.GroupId); err != nil { + return err + } + if err := pe.putString(r.MemberId); err != nil { + return err + } + + return nil +} + +func (r *LeaveGroupRequest) decode(pd packetDecoder) (err error) { + if r.GroupId, err = pd.getString(); err != nil { + return + } + if r.MemberId, err = pd.getString(); err != nil { + return + } + + return nil +} + +func (r *LeaveGroupRequest) key() int16 { + return 13 +} + +func (r *LeaveGroupRequest) version() int16 { + return 0 +} diff --git a/leave_group_request_test.go b/leave_group_request_test.go new file mode 100644 index 000000000..c1fed6d25 --- /dev/null +++ b/leave_group_request_test.go @@ -0,0 +1,19 @@ +package sarama + +import "testing" + +var ( + basicLeaveGroupRequest = []byte{ + 0, 3, 'f', 'o', 'o', + 0, 3, 'b', 'a', 'r', + } +) + +func TestLeaveGroupRequest(t *testing.T) { + var request *LeaveGroupRequest + + request = new(LeaveGroupRequest) + request.GroupId = "foo" + request.MemberId = "bar" + testRequest(t, "basic", request, basicLeaveGroupRequest) +} diff --git a/leave_group_response.go b/leave_group_response.go new file mode 100644 index 000000000..bad1dba2f --- /dev/null +++ b/leave_group_response.go @@ -0,0 +1,20 @@ +package sarama + +type LeaveGroupResponse struct { + Err KError +} + +func (r *LeaveGroupResponse) encode(pe packetEncoder) error { + pe.putInt16(int16(r.Err)) + return nil +} + +func (r *LeaveGroupResponse) decode(pd packetDecoder) (err error) { + if kerr, err := pd.getInt16(); err != nil { + return err + } else { + r.Err = KError(kerr) + } + + return nil +} diff --git a/leave_group_response_test.go b/leave_group_response_test.go new file mode 100644 index 000000000..0e9fb74b4 --- /dev/null +++ b/leave_group_response_test.go @@ -0,0 +1,24 @@ +package sarama + +import "testing" + +var ( + leaveGroupResponseNoError = []byte{0x00, 0x00} + leaveGroupResponseWithError = []byte{0, 25} +) + +func TestLeaveGroupResponse(t *testing.T) { + var response *LeaveGroupResponse + + response = new(LeaveGroupResponse) + testDecodable(t, "no error", response, leaveGroupResponseNoError) + if response.Err != ErrNoError { + t.Error("Decoding error failed: no error expected but found", response.Err) + } + + response = new(LeaveGroupResponse) + testDecodable(t, "with error", response, leaveGroupResponseWithError) + if response.Err != ErrUnknownMemberId { + t.Error("Decoding error failed: ErrUnknownMemberId expected but found", response.Err) + } +} diff --git a/list_groups_request.go b/list_groups_request.go new file mode 100644 index 000000000..4d74c2665 --- /dev/null +++ b/list_groups_request.go @@ -0,0 +1,20 @@ +package sarama + +type ListGroupsRequest struct { +} + +func (r *ListGroupsRequest) encode(pe packetEncoder) error { + return nil +} + +func (r *ListGroupsRequest) decode(pd packetDecoder) (err error) { + return nil +} + +func (r *ListGroupsRequest) key() int16 { + return 16 +} + +func (r *ListGroupsRequest) version() int16 { + return 0 +} diff --git a/list_groups_request_test.go b/list_groups_request_test.go new file mode 100644 index 000000000..2e977d9a5 --- /dev/null +++ b/list_groups_request_test.go @@ -0,0 +1,7 @@ +package sarama + +import "testing" + +func TestListGroupsRequest(t *testing.T) { + testRequest(t, "ListGroupsRequest", &ListGroupsRequest{}, []byte{}) +} diff --git a/list_groups_response.go b/list_groups_response.go new file mode 100644 index 000000000..2f5314902 --- /dev/null +++ b/list_groups_response.go @@ -0,0 +1,56 @@ +package sarama + +type ListGroupsResponse struct { + Err KError + Groups map[string]string +} + +func (r *ListGroupsResponse) encode(pe packetEncoder) error { + pe.putInt16(int16(r.Err)) + + if err := pe.putArrayLength(len(r.Groups)); err != nil { + return err + } + for groupId, protocolType := range r.Groups { + if err := pe.putString(groupId); err != nil { + return err + } + if err := pe.putString(protocolType); err != nil { + return err + } + } + + return nil +} + +func (r *ListGroupsResponse) decode(pd packetDecoder) error { + if kerr, err := pd.getInt16(); err != nil { + return err + } else { + r.Err = KError(kerr) + } + + n, err := pd.getArrayLength() + if err != nil { + return err + } + if n == 0 { + return nil + } + + r.Groups = make(map[string]string) + for i := 0; i < n; i++ { + groupId, err := pd.getString() + if err != nil { + return err + } + protocolType, err := pd.getString() + if err != nil { + return err + } + + r.Groups[groupId] = protocolType + } + + return nil +} diff --git a/list_groups_response_test.go b/list_groups_response_test.go new file mode 100644 index 000000000..626e7f9ab --- /dev/null +++ b/list_groups_response_test.go @@ -0,0 +1,58 @@ +package sarama + +import ( + "testing" +) + +var ( + listGroupsResponseEmpty = []byte{ + 0, 0, // no error + 0, 0, 0, 0, // no groups + } + + listGroupsResponseError = []byte{ + 0, 31, // no error + 0, 0, 0, 0, // ErrClusterAuthorizationFailed + } + + listGroupsResponseWithConsumer = []byte{ + 0, 0, // no error + 0, 0, 0, 1, // 1 group + 0, 3, 'f', 'o', 'o', // group name + 0, 8, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', // protocol type + } +) + +func TestListGroupsResponse(t *testing.T) { + var response *ListGroupsResponse + + response = new(ListGroupsResponse) + testDecodable(t, "no error", response, listGroupsResponseEmpty) + if response.Err != ErrNoError { + t.Error("Expected no gerror, found:", response.Err) + } + if len(response.Groups) != 0 { + t.Error("Expected no groups") + } + + response = new(ListGroupsResponse) + testDecodable(t, "no error", response, listGroupsResponseError) + if response.Err != ErrClusterAuthorizationFailed { + t.Error("Expected no gerror, found:", response.Err) + } + if len(response.Groups) != 0 { + t.Error("Expected no groups") + } + + response = new(ListGroupsResponse) + testDecodable(t, "no error", response, listGroupsResponseWithConsumer) + if response.Err != ErrNoError { + t.Error("Expected no gerror, found:", response.Err) + } + if len(response.Groups) != 1 { + t.Error("Expected one group") + } + if response.Groups["foo"] != "consumer" { + t.Error("Expected foo group to use consumer protocol") + } +} diff --git a/packet_decoder.go b/packet_decoder.go index 034222313..28670c0e6 100644 --- a/packet_decoder.go +++ b/packet_decoder.go @@ -16,6 +16,7 @@ type packetDecoder interface { getString() (string, error) getInt32Array() ([]int32, error) getInt64Array() ([]int64, error) + getStringArray() ([]string, error) // Subsets remaining() int diff --git a/packet_encoder.go b/packet_encoder.go index 2c5710938..0df6e24aa 100644 --- a/packet_encoder.go +++ b/packet_encoder.go @@ -15,6 +15,7 @@ type packetEncoder interface { putBytes(in []byte) error putRawBytes(in []byte) error putString(in string) error + putStringArray(in []string) error putInt32Array(in []int32) error putInt64Array(in []int64) error diff --git a/prep_encoder.go b/prep_encoder.go index 58fb4fc2c..8c6ba8502 100644 --- a/prep_encoder.go +++ b/prep_encoder.go @@ -66,6 +66,21 @@ func (pe *prepEncoder) putString(in string) error { return nil } +func (pe *prepEncoder) putStringArray(in []string) error { + err := pe.putArrayLength(len(in)) + if err != nil { + return err + } + + for _, str := range in { + if err := pe.putString(str); err != nil { + return err + } + } + + return nil +} + func (pe *prepEncoder) putInt32Array(in []int32) error { err := pe.putArrayLength(len(in)) if err != nil { diff --git a/real_decoder.go b/real_decoder.go index 235c8a80d..e3ea33104 100644 --- a/real_decoder.go +++ b/real_decoder.go @@ -181,6 +181,33 @@ func (rd *realDecoder) getInt64Array() ([]int64, error) { return ret, nil } +func (rd *realDecoder) getStringArray() ([]string, error) { + if rd.remaining() < 4 { + rd.off = len(rd.raw) + return nil, ErrInsufficientData + } + n := int(binary.BigEndian.Uint32(rd.raw[rd.off:])) + rd.off += 4 + + if n == 0 { + return nil, nil + } + + if n < 0 { + return nil, PacketDecodingError{"invalid array length"} + } + + ret := make([]string, n) + for i := range ret { + if str, err := rd.getString(); err != nil { + return nil, err + } else { + ret[i] = str + } + } + return ret, nil +} + // subsets func (rd *realDecoder) remaining() int { diff --git a/real_encoder.go b/real_encoder.go index b50f54bc5..076fdd0ca 100644 --- a/real_encoder.go +++ b/real_encoder.go @@ -61,6 +61,21 @@ func (re *realEncoder) putString(in string) error { return nil } +func (re *realEncoder) putStringArray(in []string) error { + err := re.putArrayLength(len(in)) + if err != nil { + return err + } + + for _, val := range in { + if err := re.putString(val); err != nil { + return err + } + } + + return nil +} + func (re *realEncoder) putInt32Array(in []int32) error { err := re.putArrayLength(len(in)) if err != nil { diff --git a/request.go b/request.go index d6d5cdfcd..b9f654ba2 100644 --- a/request.go +++ b/request.go @@ -95,6 +95,18 @@ func allocateBody(key, version int16) requestBody { return &OffsetFetchRequest{} case 10: return &ConsumerMetadataRequest{} + case 11: + return &JoinGroupRequest{} + case 12: + return &HeartbeatRequest{} + case 13: + return &LeaveGroupRequest{} + case 14: + return &SyncGroupRequest{} + case 15: + return &DescribeGroupsRequest{} + case 16: + return &ListGroupsRequest{} } return nil } diff --git a/request_test.go b/request_test.go index 69e8b4cbe..1e47292c1 100644 --- a/request_test.go +++ b/request_test.go @@ -48,7 +48,7 @@ func testRequest(t *testing.T, name string, rb requestBody, expected []byte) { if err != nil { t.Error(err) } else if !bytes.Equal(packet[headerSize:], expected) { - t.Error("Encoding", name, "failed\ngot ", packet, "\nwant", expected) + t.Error("Encoding", name, "failed\ngot ", packet[headerSize:], "\nwant", expected) } // Decoder request decoded, err := decodeRequest(bytes.NewReader(packet)) @@ -57,7 +57,7 @@ func testRequest(t *testing.T, name string, rb requestBody, expected []byte) { } else if decoded.correlationID != 123 || decoded.clientID != "foo" { t.Errorf("Decoded header is not valid: %v", decoded) } else if !reflect.DeepEqual(rb, decoded.body) { - t.Errorf("Decoded request does not match the encoded one\nencoded: %v\ndecoded: %v", rb, decoded) + t.Errorf("Decoded request does not match the encoded one\nencoded: %v\ndecoded: %v", rb, decoded.body) } } diff --git a/sync_group_request.go b/sync_group_request.go new file mode 100644 index 000000000..60be6f3f3 --- /dev/null +++ b/sync_group_request.go @@ -0,0 +1,86 @@ +package sarama + +type SyncGroupRequest struct { + GroupId string + GenerationId int32 + MemberId string + GroupAssignments map[string][]byte +} + +func (r *SyncGroupRequest) encode(pe packetEncoder) error { + if err := pe.putString(r.GroupId); err != nil { + return err + } + + pe.putInt32(r.GenerationId) + + if err := pe.putString(r.MemberId); err != nil { + return err + } + + if err := pe.putArrayLength(len(r.GroupAssignments)); err != nil { + return err + } + for memberId, memberAssignment := range r.GroupAssignments { + if err := pe.putString(memberId); err != nil { + return err + } + if err := pe.putBytes(memberAssignment); err != nil { + return err + } + } + + return nil +} + +func (r *SyncGroupRequest) decode(pd packetDecoder) (err error) { + if r.GroupId, err = pd.getString(); err != nil { + return + } + if r.GenerationId, err = pd.getInt32(); err != nil { + return + } + if r.MemberId, err = pd.getString(); err != nil { + return + } + + n, err := pd.getArrayLength() + if err != nil { + return err + } + if n == 0 { + return nil + } + + r.GroupAssignments = make(map[string][]byte) + for i := 0; i < n; i++ { + memberId, err := pd.getString() + if err != nil { + return err + } + memberAssignment, err := pd.getBytes() + if err != nil { + return err + } + + r.GroupAssignments[memberId] = memberAssignment + } + + return nil +} + +func (r *SyncGroupRequest) key() int16 { + return 14 +} + +func (r *SyncGroupRequest) version() int16 { + return 0 +} + +func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte) { + if r.GroupAssignments == nil { + r.GroupAssignments = make(map[string][]byte) + } + + r.GroupAssignments[memberId] = memberAssignment +} diff --git a/sync_group_request_test.go b/sync_group_request_test.go new file mode 100644 index 000000000..3f537ef9f --- /dev/null +++ b/sync_group_request_test.go @@ -0,0 +1,38 @@ +package sarama + +import "testing" + +var ( + emptySyncGroupRequest = []byte{ + 0, 3, 'f', 'o', 'o', // Group ID + 0x00, 0x01, 0x02, 0x03, // Generation ID + 0, 3, 'b', 'a', 'z', // Member ID + 0, 0, 0, 0, // no assignments + } + + populatedSyncGroupRequest = []byte{ + 0, 3, 'f', 'o', 'o', // Group ID + 0x00, 0x01, 0x02, 0x03, // Generation ID + 0, 3, 'b', 'a', 'z', // Member ID + 0, 0, 0, 1, // one assignment + 0, 3, 'b', 'a', 'z', // Member ID + 0, 0, 0, 3, 'f', 'o', 'o', // Member assignment + } +) + +func TestSyncGroupRequest(t *testing.T) { + var request *SyncGroupRequest + + request = new(SyncGroupRequest) + request.GroupId = "foo" + request.GenerationId = 66051 + request.MemberId = "baz" + testRequest(t, "empty", request, emptySyncGroupRequest) + + request = new(SyncGroupRequest) + request.GroupId = "foo" + request.GenerationId = 66051 + request.MemberId = "baz" + request.AddGroupAssignment("baz", []byte("foo")) + testRequest(t, "populated", request, populatedSyncGroupRequest) +} diff --git a/sync_group_response.go b/sync_group_response.go new file mode 100644 index 000000000..e10685ef8 --- /dev/null +++ b/sync_group_response.go @@ -0,0 +1,22 @@ +package sarama + +type SyncGroupResponse struct { + Err KError + MemberAssignment []byte +} + +func (r *SyncGroupResponse) encode(pe packetEncoder) error { + pe.putInt16(int16(r.Err)) + return pe.putBytes(r.MemberAssignment) +} + +func (r *SyncGroupResponse) decode(pd packetDecoder) (err error) { + if kerr, err := pd.getInt16(); err != nil { + return err + } else { + r.Err = KError(kerr) + } + + r.MemberAssignment, err = pd.getBytes() + return +} diff --git a/sync_group_response_test.go b/sync_group_response_test.go new file mode 100644 index 000000000..164908f37 --- /dev/null +++ b/sync_group_response_test.go @@ -0,0 +1,40 @@ +package sarama + +import ( + "reflect" + "testing" +) + +var ( + syncGroupResponseNoError = []byte{ + 0x00, 0x00, // No error + 0, 0, 0, 3, 0x01, 0x02, 0x03, // Member assignment data + } + + syncGroupResponseWithError = []byte{ + 0, 27, // ErrRebalanceInProgress + 0, 0, 0, 0, // No member assignment data + } +) + +func TestSyncGroupResponse(t *testing.T) { + var response *SyncGroupResponse + + response = new(SyncGroupResponse) + testDecodable(t, "no error", response, syncGroupResponseNoError) + if response.Err != ErrNoError { + t.Error("Decoding Err failed: no error expected but found", response.Err) + } + if !reflect.DeepEqual(response.MemberAssignment, []byte{0x01, 0x02, 0x03}) { + t.Error("Decoding MemberAssignment failed, found:", response.MemberAssignment) + } + + response = new(SyncGroupResponse) + testDecodable(t, "no error", response, syncGroupResponseWithError) + if response.Err != ErrRebalanceInProgress { + t.Error("Decoding Err failed: ErrRebalanceInProgress expected but found", response.Err) + } + if !reflect.DeepEqual(response.MemberAssignment, []byte{}) { + t.Error("Decoding MemberAssignment failed, found:", response.MemberAssignment) + } +}