Skip to content

Commit

Permalink
Remove interface for ConsumerGroupProtocol.
Browse files Browse the repository at this point in the history
  • Loading branch information
wvanbergen committed Dec 7, 2015
1 parent c536fb3 commit 3bb8696
Showing 1 changed file with 23 additions and 41 deletions.
64 changes: 23 additions & 41 deletions join_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ type JoinGroupRequest struct {
SessionTimeout int32
MemberId string
ProtocolType string
GroupProtocols []GroupProtocol
GroupProtocols []*GroupProtocol
}

func (r *JoinGroupRequest) encode(pe packetEncoder) error {
Expand All @@ -24,7 +24,7 @@ func (r *JoinGroupRequest) encode(pe packetEncoder) error {
return err
}
for _, groupProtocol := range r.GroupProtocols {
if err := groupProtocol.encodeGroupProtocol(pe); err != nil {
if err := groupProtocol.encode(pe); err != nil {
return err
}
}
Expand Down Expand Up @@ -53,23 +53,17 @@ func (r *JoinGroupRequest) decode(pd packetDecoder) (err error) {
return
}

switch r.ProtocolType {
case "consumer":
n, err := pd.getArrayLength()
if err != nil {
return err
}
n, err := pd.getArrayLength()
if err != nil {
return err
}

r.GroupProtocols = make([]GroupProtocol, n)
for i := 0; i < n; i++ {
r.GroupProtocols[i] = new(ConsumerGroupProtocol)
if err := r.GroupProtocols[i].decodeGroupProtocol(pd); err != nil {
return nil
}
r.GroupProtocols = make([]*GroupProtocol, n)
for i := 0; i < n; i++ {
r.GroupProtocols[i] = new(GroupProtocol)
if err := r.GroupProtocols[i].decode(pd); err != nil {
return err
}

default:
return ErrUnknownGroupProtocol
}

return nil
Expand All @@ -83,42 +77,30 @@ func (r *JoinGroupRequest) version() int16 {
return 0
}

type GroupProtocol interface {
encodeGroupProtocol(packetEncoder) error
decodeGroupProtocol(packetDecoder) error
type GroupProtocol struct {
ProtocolName string
ProtocolMetadata []byte
}

type ConsumerGroupProtocol struct {
ProtocolName string
Version int16
Subscription []string
UserData []byte
}

func (cgp *ConsumerGroupProtocol) encodeGroupProtocol(pe packetEncoder) error {
if err := pe.putString(cgp.ProtocolName); err != nil {
func (gp *GroupProtocol) encode(pe packetEncoder) error {
if err := pe.putString(gp.ProtocolName); err != nil {
return err
}
pe.putInt16(cgp.Version)
if err := pe.putStringArray(cgp.Subscription); err != nil {
if err := pe.putBytes(gp.ProtocolMetadata); err != nil {
return err
}
return pe.putBytes(cgp.UserData)

return nil
}

func (cgp *ConsumerGroupProtocol) decodeGroupProtocol(pd packetDecoder) (err error) {
if cgp.ProtocolName, err = pd.getString(); err != nil {
func (gp *GroupProtocol) decode(pd packetDecoder) (err error) {
if gp.ProtocolName, err = pd.getString(); err != nil {
return
}

if cgp.Version, err = pd.getInt16(); err != nil {
if gp.ProtocolMetadata, err = pd.getBytes(); err != nil {
return
}

if cgp.Subscription, err = pd.getStringArray(); err != nil {
return
}
return nil

cgp.UserData, err = pd.getBytes()
return
}

0 comments on commit 3bb8696

Please sign in to comment.