Skip to content

Commit

Permalink
Merge pull request #577 from Shopify/kafka_09_protocol
Browse files Browse the repository at this point in the history
Implement Kafka 0.9 protocol additions
  • Loading branch information
wvanbergen committed Dec 9, 2015
2 parents c36adfa + 756801d commit 159e999
Show file tree
Hide file tree
Showing 32 changed files with 1,270 additions and 2 deletions.
26 changes: 26 additions & 0 deletions describe_groups_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package sarama

type DescribeGroupsRequest struct {
Groups []string
}

func (r *DescribeGroupsRequest) encode(pe packetEncoder) error {
return pe.putStringArray(r.Groups)
}

func (r *DescribeGroupsRequest) decode(pd packetDecoder) (err error) {
r.Groups, err = pd.getStringArray()
return
}

func (r *DescribeGroupsRequest) key() int16 {
return 15
}

func (r *DescribeGroupsRequest) version() int16 {
return 0
}

func (r *DescribeGroupsRequest) AddGroup(group string) {
r.Groups = append(r.Groups, group)
}
34 changes: 34 additions & 0 deletions describe_groups_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package sarama

import "testing"

var (
emptyDescribeGroupsRequest = []byte{0, 0, 0, 0}

singleDescribeGroupsRequest = []byte{
0, 0, 0, 1, // 1 group
0, 3, 'f', 'o', 'o', // group name: foo
}

doubleDescribeGroupsRequest = []byte{
0, 0, 0, 2, // 2 groups
0, 3, 'f', 'o', 'o', // group name: foo
0, 3, 'b', 'a', 'r', // group name: foo
}
)

func TestDescribeGroupsRequest(t *testing.T) {
var request *DescribeGroupsRequest

request = new(DescribeGroupsRequest)
testRequest(t, "no groups", request, emptyDescribeGroupsRequest)

request = new(DescribeGroupsRequest)
request.AddGroup("foo")
testRequest(t, "one group", request, singleDescribeGroupsRequest)

request = new(DescribeGroupsRequest)
request.AddGroup("foo")
request.AddGroup("bar")
testRequest(t, "two groups", request, doubleDescribeGroupsRequest)
}
162 changes: 162 additions & 0 deletions describe_groups_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package sarama

type DescribeGroupsResponse struct {
Groups []*GroupDescription
}

func (r *DescribeGroupsResponse) encode(pe packetEncoder) error {
if err := pe.putArrayLength(len(r.Groups)); err != nil {
return err
}

for _, groupDescription := range r.Groups {
if err := groupDescription.encode(pe); err != nil {
return err
}
}

return nil
}

func (r *DescribeGroupsResponse) decode(pd packetDecoder) (err error) {
n, err := pd.getArrayLength()
if err != nil {
return err
}

r.Groups = make([]*GroupDescription, n)
for i := 0; i < n; i++ {
r.Groups[i] = new(GroupDescription)
if err := r.Groups[i].decode(pd); err != nil {
return err
}
}

return nil
}

type GroupDescription struct {
Err KError
GroupId string
State string
ProtocolType string
Protocol string
Members map[string]*GroupMemberDescription
}

func (gd *GroupDescription) encode(pe packetEncoder) error {
pe.putInt16(int16(gd.Err))

if err := pe.putString(gd.GroupId); err != nil {
return err
}
if err := pe.putString(gd.State); err != nil {
return err
}
if err := pe.putString(gd.ProtocolType); err != nil {
return err
}
if err := pe.putString(gd.Protocol); err != nil {
return err
}

if err := pe.putArrayLength(len(gd.Members)); err != nil {
return err
}

for memberId, groupMemberDescription := range gd.Members {
if err := pe.putString(memberId); err != nil {
return err
}
if err := groupMemberDescription.encode(pe); err != nil {
return err
}
}

return nil
}

func (gd *GroupDescription) decode(pd packetDecoder) (err error) {
if kerr, err := pd.getInt16(); err != nil {
return err
} else {
gd.Err = KError(kerr)
}

if gd.GroupId, err = pd.getString(); err != nil {
return
}
if gd.State, err = pd.getString(); err != nil {
return
}
if gd.ProtocolType, err = pd.getString(); err != nil {
return
}
if gd.Protocol, err = pd.getString(); err != nil {
return
}

n, err := pd.getArrayLength()
if err != nil {
return err
}
if n == 0 {
return nil
}

gd.Members = make(map[string]*GroupMemberDescription)
for i := 0; i < n; i++ {
memberId, err := pd.getString()
if err != nil {
return err
}

gd.Members[memberId] = new(GroupMemberDescription)
if err := gd.Members[memberId].decode(pd); err != nil {
return err
}
}

return nil
}

type GroupMemberDescription struct {
ClientId string
ClientHost string
MemberMetadata []byte
MemberAssignment []byte
}

func (gmd *GroupMemberDescription) encode(pe packetEncoder) error {
if err := pe.putString(gmd.ClientId); err != nil {
return err
}
if err := pe.putString(gmd.ClientHost); err != nil {
return err
}
if err := pe.putBytes(gmd.MemberMetadata); err != nil {
return err
}
if err := pe.putBytes(gmd.MemberAssignment); err != nil {
return err
}

return nil
}

func (gmd *GroupMemberDescription) decode(pd packetDecoder) (err error) {
if gmd.ClientId, err = pd.getString(); err != nil {
return
}
if gmd.ClientHost, err = pd.getString(); err != nil {
return
}
if gmd.MemberMetadata, err = pd.getBytes(); err != nil {
return
}
if gmd.MemberAssignment, err = pd.getBytes(); err != nil {
return
}

return nil
}
91 changes: 91 additions & 0 deletions describe_groups_response_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package sarama

import (
"reflect"
"testing"
)

var (
describeGroupsResponseEmpty = []byte{
0, 0, 0, 0, // no groups
}

describeGroupsResponsePopulated = []byte{
0, 0, 0, 2, // 2 groups

0, 0, // no error
0, 3, 'f', 'o', 'o', // Group ID
0, 3, 'b', 'a', 'r', // State
0, 8, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', // ConsumerProtocol type
0, 3, 'b', 'a', 'z', // Protocol name
0, 0, 0, 1, // 1 member
0, 2, 'i', 'd', // Member ID
0, 6, 's', 'a', 'r', 'a', 'm', 'a', // Client ID
0, 9, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // Client Host
0, 0, 0, 3, 0x01, 0x02, 0x03, // MemberMetadata
0, 0, 0, 3, 0x04, 0x05, 0x06, // MemberAssignment

0, 30, // ErrGroupAuthorizationFailed
0, 0,
0, 0,
0, 0,
0, 0,
0, 0, 0, 0,
}
)

func TestDescribeGroupsResponse(t *testing.T) {
var response *DescribeGroupsResponse

response = new(DescribeGroupsResponse)
testDecodable(t, "empty", response, describeGroupsResponseEmpty)
if len(response.Groups) != 0 {
t.Error("Expected no groups")
}

response = new(DescribeGroupsResponse)
testDecodable(t, "populated", response, describeGroupsResponsePopulated)
if len(response.Groups) != 2 {
t.Error("Expected two groups")
}

group0 := response.Groups[0]
if group0.Err != ErrNoError {
t.Error("Unxpected groups[0].Err, found", group0.Err)
}
if group0.GroupId != "foo" {
t.Error("Unxpected groups[0].GroupId, found", group0.GroupId)
}
if group0.State != "bar" {
t.Error("Unxpected groups[0].State, found", group0.State)
}
if group0.ProtocolType != "consumer" {
t.Error("Unxpected groups[0].ProtocolType, found", group0.ProtocolType)
}
if group0.Protocol != "baz" {
t.Error("Unxpected groups[0].Protocol, found", group0.Protocol)
}
if len(group0.Members) != 1 {
t.Error("Unxpected groups[0].Members, found", group0.Members)
}
if group0.Members["id"].ClientId != "sarama" {
t.Error("Unxpected groups[0].Members[id].ClientId, found", group0.Members["id"].ClientId)
}
if group0.Members["id"].ClientHost != "localhost" {
t.Error("Unxpected groups[0].Members[id].ClientHost, found", group0.Members["id"].ClientHost)
}
if !reflect.DeepEqual(group0.Members["id"].MemberMetadata, []byte{0x01, 0x02, 0x03}) {
t.Error("Unxpected groups[0].Members[id].MemberMetadata, found", group0.Members["id"].MemberMetadata)
}
if !reflect.DeepEqual(group0.Members["id"].MemberAssignment, []byte{0x04, 0x05, 0x06}) {
t.Error("Unxpected groups[0].Members[id].MemberAssignment, found", group0.Members["id"].MemberAssignment)
}

group1 := response.Groups[1]
if group1.Err != ErrGroupAuthorizationFailed {
t.Error("Unxpected groups[1].Err, found", group0.Err)
}
if len(group1.Members) != 0 {
t.Error("Unxpected groups[1].Members, found", group0.Members)
}
}
33 changes: 33 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ const (
ErrMessageSetSizeTooLarge KError = 18
ErrNotEnoughReplicas KError = 19
ErrNotEnoughReplicasAfterAppend KError = 20
ErrInvalidRequiredAcks KError = 21
ErrIllegalGeneration KError = 22
ErrInconsistentGroupProtocol KError = 23
ErrInvalidGroupId KError = 24
ErrUnknownMemberId KError = 25
ErrInvalidSessionTimeout KError = 26
ErrRebalanceInProgress KError = 27
ErrInvalidCommitOffsetSize KError = 28
ErrTopicAuthorizationFailed KError = 29
ErrGroupAuthorizationFailed KError = 30
ErrClusterAuthorizationFailed KError = 31
)

func (err KError) Error() string {
Expand Down Expand Up @@ -140,6 +151,28 @@ func (err KError) Error() string {
return "kafka server: Messages are rejected since there are fewer in-sync replicas than required."
case ErrNotEnoughReplicasAfterAppend:
return "kafka server: Messages are written to the log, but to fewer in-sync replicas than required."
case ErrInvalidRequiredAcks:
return "kafka server: The number of required acks is invalid (should be either -1, 0, or 1)."
case ErrIllegalGeneration:
return "kafka server: The provided generation id is not the current generation."
case ErrInconsistentGroupProtocol:
return "kafka server: The provider group protocol type is incompatible with the other members."
case ErrInvalidGroupId:
return "kafka server: The provided group id was empty."
case ErrUnknownMemberId:
return "kafka server: The provided member is not known in the current generation."
case ErrInvalidSessionTimeout:
return "kafka server: The provided session timeout is outside the allowed range."
case ErrRebalanceInProgress:
return "kafka server: A rebalance for the group is in progress. Please re-join the group."
case ErrInvalidCommitOffsetSize:
return "kafka server: The provided commit metadata was too large."
case ErrTopicAuthorizationFailed:
return "kafka server: The client is not authorized to access this topic."
case ErrGroupAuthorizationFailed:
return "kafka server: The client is not authorized to access this group."
case ErrClusterAuthorizationFailed:
return "kafka server: The client is not authorized to send this request type."
}

return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err)
Expand Down
Loading

0 comments on commit 159e999

Please sign in to comment.