Skip to content

Commit

Permalink
Add SyncGroup request and response pair.
Browse files Browse the repository at this point in the history
  • Loading branch information
wvanbergen committed Dec 7, 2015
1 parent d8137e6 commit c536fb3
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 0 deletions.
93 changes: 93 additions & 0 deletions sync_group_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package sarama

type SyncGroupRequest struct {
GroupId string
GenerationId string
MemberId string
GroupAssignments []*GroupAssignment
}

func (r *SyncGroupRequest) encode(pe packetEncoder) error {
if err := pe.putString(r.GroupId); err != nil {
return err
}
if err := pe.putString(r.GenerationId); err != nil {
return err
}
if err := pe.putString(r.MemberId); err != nil {
return err
}

if err := pe.putArrayLength(len(r.GroupAssignments)); err != nil {
return err
}
for _, groupAssignment := range r.GroupAssignments {
if err := groupAssignment.encode(pe); err != nil {
return err
}
}

return nil
}

func (r *SyncGroupRequest) decode(pd packetDecoder) (err error) {
if r.GroupId, err = pd.getString(); err != nil {
return
}
if r.GenerationId, err = pd.getString(); err != nil {
return
}
if r.MemberId, err = pd.getString(); err != nil {
return
}

n, err := pd.getArrayLength()
if err != nil {
return err
}

r.GroupAssignments = make([]*GroupAssignment, n)
for i := 0; i < n; i++ {
r.GroupAssignments[i] = new(GroupAssignment)
if err := r.GroupAssignments[i].decode(pd); err != nil {
return err
}
}

return nil
}

func (r *SyncGroupRequest) key() int16 {
return 14
}

func (r *SyncGroupRequest) version() int16 {
return 0
}

type GroupAssignment struct {
MemberId string
MemberAssignment []byte
}

func (gd *GroupAssignment) encode(pe packetEncoder) error {
if err := pe.putString(gd.MemberId); err != nil {
return err
}
if err := pe.putBytes(gd.MemberAssignment); err != nil {
return err
}

return nil
}

func (gd *GroupAssignment) decode(pd packetDecoder) (err error) {
if gd.MemberId, err = pd.getString(); err != nil {
return
}
if gd.MemberAssignment, err = pd.getBytes(); err != nil {
return
}

return nil
}
22 changes: 22 additions & 0 deletions sync_group_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package sarama

type SyncGroupResponse struct {
Err KError
MemberAssignment []byte
}

func (r *SyncGroupResponse) encode(pe packetEncoder) error {
pe.putInt16(int16(r.Err))
return pe.putBytes(r.MemberAssignment)
}

func (r *SyncGroupResponse) decode(pd packetDecoder) (err error) {
if kerr, err := pd.getInt16(); err != nil {
return err
} else {
r.Err = KError(kerr)
}

r.MemberAssignment, err = pd.getBytes()
return
}

0 comments on commit c536fb3

Please sign in to comment.