Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Kafka 0.9 protocol additions #577

Merged
merged 11 commits into from
Dec 9, 2015
26 changes: 26 additions & 0 deletions describe_groups_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package sarama

type DescribeGroupsRequest struct {
Groups []string
}

func (r *DescribeGroupsRequest) encode(pe packetEncoder) error {
return pe.putStringArray(r.Groups)
}

func (r *DescribeGroupsRequest) decode(pd packetDecoder) (err error) {
r.Groups, err = pd.getStringArray()
return
}

func (r *DescribeGroupsRequest) key() int16 {
return 15
}

func (r *DescribeGroupsRequest) version() int16 {
return 0
}

func (r *DescribeGroupsRequest) AddGroup(group string) {
r.Groups = append(r.Groups, group)
}
34 changes: 34 additions & 0 deletions describe_groups_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package sarama

import "testing"

var (
emptyDescribeGroupsRequest = []byte{0, 0, 0, 0}

singleDescribeGroupsRequest = []byte{
0, 0, 0, 1, // 1 group
0, 3, 'f', 'o', 'o', // group name: foo
}

doubleDescribeGroupsRequest = []byte{
0, 0, 0, 2, // 2 groups
0, 3, 'f', 'o', 'o', // group name: foo
0, 3, 'b', 'a', 'r', // group name: foo
}
)

func TestDescribeGroupsRequest(t *testing.T) {
var request *DescribeGroupsRequest

request = new(DescribeGroupsRequest)
testRequest(t, "no groups", request, emptyDescribeGroupsRequest)

request = new(DescribeGroupsRequest)
request.AddGroup("foo")
testRequest(t, "one group", request, singleDescribeGroupsRequest)

request = new(DescribeGroupsRequest)
request.AddGroup("foo")
request.AddGroup("bar")
testRequest(t, "two groups", request, doubleDescribeGroupsRequest)
}
162 changes: 162 additions & 0 deletions describe_groups_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package sarama

type DescribeGroupsResponse struct {
Groups []*GroupDescription
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kinda/almost/sorta makes sense for this to be a map keyed by group ID, but not sure if it's worth it at this point

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would make decode rather more complex

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also like that this matches the request: you ask for a slice of groups, you get a slice of group descriptions back.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair

}

func (r *DescribeGroupsResponse) encode(pe packetEncoder) error {
if err := pe.putArrayLength(len(r.Groups)); err != nil {
return err
}

for _, groupDescription := range r.Groups {
if err := groupDescription.encode(pe); err != nil {
return err
}
}

return nil
}

func (r *DescribeGroupsResponse) decode(pd packetDecoder) (err error) {
n, err := pd.getArrayLength()
if err != nil {
return err
}

r.Groups = make([]*GroupDescription, n)
for i := 0; i < n; i++ {
r.Groups[i] = new(GroupDescription)
if err := r.Groups[i].decode(pd); err != nil {
return err
}
}

return nil
}

type GroupDescription struct {
Err KError
GroupId string
State string
ProtocolType string
Protocol string
Members map[string]*GroupMemberDescription
}

func (gd *GroupDescription) encode(pe packetEncoder) error {
pe.putInt16(int16(gd.Err))

if err := pe.putString(gd.GroupId); err != nil {
return err
}
if err := pe.putString(gd.State); err != nil {
return err
}
if err := pe.putString(gd.ProtocolType); err != nil {
return err
}
if err := pe.putString(gd.Protocol); err != nil {
return err
}

if err := pe.putArrayLength(len(gd.Members)); err != nil {
return err
}

for memberId, groupMemberDescription := range gd.Members {
if err := pe.putString(memberId); err != nil {
return err
}
if err := groupMemberDescription.encode(pe); err != nil {
return err
}
}

return nil
}

func (gd *GroupDescription) decode(pd packetDecoder) (err error) {
if kerr, err := pd.getInt16(); err != nil {
return err
} else {
gd.Err = KError(kerr)
}

if gd.GroupId, err = pd.getString(); err != nil {
return
}
if gd.State, err = pd.getString(); err != nil {
return
}
if gd.ProtocolType, err = pd.getString(); err != nil {
return
}
if gd.Protocol, err = pd.getString(); err != nil {
return
}

n, err := pd.getArrayLength()
if err != nil {
return err
}
if n == 0 {
return nil
}

gd.Members = make(map[string]*GroupMemberDescription)
for i := 0; i < n; i++ {
memberId, err := pd.getString()
if err != nil {
return err
}

gd.Members[memberId] = new(GroupMemberDescription)
if err := gd.Members[memberId].decode(pd); err != nil {
return err
}
}

return nil
}

type GroupMemberDescription struct {
ClientId string
ClientHost string
MemberMetadata []byte
MemberAssignment []byte
}

func (gmd *GroupMemberDescription) encode(pe packetEncoder) error {
if err := pe.putString(gmd.ClientId); err != nil {
return err
}
if err := pe.putString(gmd.ClientHost); err != nil {
return err
}
if err := pe.putBytes(gmd.MemberMetadata); err != nil {
return err
}
if err := pe.putBytes(gmd.MemberAssignment); err != nil {
return err
}

return nil
}

func (gmd *GroupMemberDescription) decode(pd packetDecoder) (err error) {
if gmd.ClientId, err = pd.getString(); err != nil {
return
}
if gmd.ClientHost, err = pd.getString(); err != nil {
return
}
if gmd.MemberMetadata, err = pd.getBytes(); err != nil {
return
}
if gmd.MemberAssignment, err = pd.getBytes(); err != nil {
return
}

return nil
}
91 changes: 91 additions & 0 deletions describe_groups_response_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package sarama

import (
"reflect"
"testing"
)

var (
describeGroupsResponseEmpty = []byte{
0, 0, 0, 0, // no groups
}

describeGroupsResponsePopulated = []byte{
0, 0, 0, 2, // 2 groups

0, 0, // no error
0, 3, 'f', 'o', 'o', // Group ID
0, 3, 'b', 'a', 'r', // State
0, 8, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', // ConsumerProtocol type
0, 3, 'b', 'a', 'z', // Protocol name
0, 0, 0, 1, // 1 member
0, 2, 'i', 'd', // Member ID
0, 6, 's', 'a', 'r', 'a', 'm', 'a', // Client ID
0, 9, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // Client Host
0, 0, 0, 3, 0x01, 0x02, 0x03, // MemberMetadata
0, 0, 0, 3, 0x04, 0x05, 0x06, // MemberAssignment

0, 30, // ErrGroupAuthorizationFailed
0, 0,
0, 0,
0, 0,
0, 0,
0, 0, 0, 0,
}
)

func TestDescribeGroupsResponse(t *testing.T) {
var response *DescribeGroupsResponse

response = new(DescribeGroupsResponse)
testDecodable(t, "empty", response, describeGroupsResponseEmpty)
if len(response.Groups) != 0 {
t.Error("Expected no groups")
}

response = new(DescribeGroupsResponse)
testDecodable(t, "populated", response, describeGroupsResponsePopulated)
if len(response.Groups) != 2 {
t.Error("Expected two groups")
}

group0 := response.Groups[0]
if group0.Err != ErrNoError {
t.Error("Unxpected groups[0].Err, found", group0.Err)
}
if group0.GroupId != "foo" {
t.Error("Unxpected groups[0].GroupId, found", group0.GroupId)
}
if group0.State != "bar" {
t.Error("Unxpected groups[0].State, found", group0.State)
}
if group0.ProtocolType != "consumer" {
t.Error("Unxpected groups[0].ProtocolType, found", group0.ProtocolType)
}
if group0.Protocol != "baz" {
t.Error("Unxpected groups[0].Protocol, found", group0.Protocol)
}
if len(group0.Members) != 1 {
t.Error("Unxpected groups[0].Members, found", group0.Members)
}
if group0.Members["id"].ClientId != "sarama" {
t.Error("Unxpected groups[0].Members[id].ClientId, found", group0.Members["id"].ClientId)
}
if group0.Members["id"].ClientHost != "localhost" {
t.Error("Unxpected groups[0].Members[id].ClientHost, found", group0.Members["id"].ClientHost)
}
if !reflect.DeepEqual(group0.Members["id"].MemberMetadata, []byte{0x01, 0x02, 0x03}) {
t.Error("Unxpected groups[0].Members[id].MemberMetadata, found", group0.Members["id"].MemberMetadata)
}
if !reflect.DeepEqual(group0.Members["id"].MemberAssignment, []byte{0x04, 0x05, 0x06}) {
t.Error("Unxpected groups[0].Members[id].MemberAssignment, found", group0.Members["id"].MemberAssignment)
}

group1 := response.Groups[1]
if group1.Err != ErrGroupAuthorizationFailed {
t.Error("Unxpected groups[1].Err, found", group0.Err)
}
if len(group1.Members) != 0 {
t.Error("Unxpected groups[1].Members, found", group0.Members)
}
}
33 changes: 33 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ const (
ErrMessageSetSizeTooLarge KError = 18
ErrNotEnoughReplicas KError = 19
ErrNotEnoughReplicasAfterAppend KError = 20
ErrInvalidRequiredAcks KError = 21
ErrIllegalGeneration KError = 22
ErrInconsistentGroupProtocol KError = 23
ErrInvalidGroupId KError = 24
ErrUnknownMemberId KError = 25
ErrInvalidSessionTimeout KError = 26
ErrRebalanceInProgress KError = 27
ErrInvalidCommitOffsetSize KError = 28
ErrTopicAuthorizationFailed KError = 29
ErrGroupAuthorizationFailed KError = 30
ErrClusterAuthorizationFailed KError = 31
)

func (err KError) Error() string {
Expand Down Expand Up @@ -140,6 +151,28 @@ func (err KError) Error() string {
return "kafka server: Messages are rejected since there are fewer in-sync replicas than required."
case ErrNotEnoughReplicasAfterAppend:
return "kafka server: Messages are written to the log, but to fewer in-sync replicas than required."
case ErrInvalidRequiredAcks:
return "kafka server: The number of required acks is invalid (should be either -1, 0, or 1)."
case ErrIllegalGeneration:
return "kafka server: The provided generation id is not the current generation."
case ErrInconsistentGroupProtocol:
return "kafka server: The provider group protocol type is incompatible with the other members."
case ErrInvalidGroupId:
return "kafka server: The provided group id was empty."
case ErrUnknownMemberId:
return "kafka server: The provided member is not known in the current generation."
case ErrInvalidSessionTimeout:
return "kafka server: The provided session timeout is outside the allowed range."
case ErrRebalanceInProgress:
return "kafka server: A rebalance for the group is in progress. Please re-join the group."
case ErrInvalidCommitOffsetSize:
return "kafka server: The provided commit metadata was too large."
case ErrTopicAuthorizationFailed:
return "kafka server: The client is not authorized to access this topic."
case ErrGroupAuthorizationFailed:
return "kafka server: The client is not authorized to access this group."
case ErrClusterAuthorizationFailed:
return "kafka server: The client is not authorized to send this request type."
}

return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err)
Expand Down
Loading