Skip to content

Commit

Permalink
Merge pull request #973 from wladh/records
Browse files Browse the repository at this point in the history
Add implementation of Kafka 0.11 Records
  • Loading branch information
eapache authored Oct 31, 2017
2 parents 84ac271 + ff1f79c commit eca6c1c
Show file tree
Hide file tree
Showing 10 changed files with 913 additions and 13 deletions.
27 changes: 19 additions & 8 deletions length_field.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,35 @@ type varintLengthField struct {
length int64
}

func newVarintLengthField(pd packetDecoder) (*varintLengthField, error) {
n, err := pd.getVarint()
if err != nil {
return nil, err
}
return &varintLengthField{length: n}, nil
func (l *varintLengthField) decode(pd packetDecoder) error {
var err error
l.length, err = pd.getVarint()
return err
}

func (l *varintLengthField) saveOffset(in int) {
l.startOffset = in
}

func (l *varintLengthField) adjustLength(currOffset int) int {
oldFieldSize := l.reserveLength()
l.length = int64(currOffset - l.startOffset - oldFieldSize)

return l.reserveLength() - oldFieldSize
}

func (l *varintLengthField) reserveLength() int {
return 0
var tmp [binary.MaxVarintLen64]byte
return binary.PutVarint(tmp[:], l.length)
}

func (l *varintLengthField) run(curOffset int, buf []byte) error {
binary.PutVarint(buf[l.startOffset:], l.length)
return nil
}

func (l *varintLengthField) check(curOffset int, buf []byte) error {
if int64(curOffset-l.startOffset) != l.length {
if int64(curOffset-l.startOffset-l.reserveLength()) != l.length {
return PacketDecodingError{"length field invalid"}
}

Expand Down
9 changes: 9 additions & 0 deletions packet_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,12 @@ type pushDecoder interface {
// of data from the saved offset, and verify it based on the data between the saved offset and curOffset.
check(curOffset int, buf []byte) error
}

// dynamicPushDecoder extends the interface of pushDecoder for uses cases where the length of the
// fields itself is unknown until its value was decoded (for instance varint encoded length
// fields).
// During push, dynamicPushDecoder.decode() method will be called instead of reserveLength()
type dynamicPushDecoder interface {
pushDecoder
decoder
}
11 changes: 11 additions & 0 deletions packet_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,14 @@ type pushEncoder interface {
// of data to the saved offset, based on the data between the saved offset and curOffset.
run(curOffset int, buf []byte) error
}

// dynamicPushEncoder extends the interface of pushEncoder for uses cases where the length of the
// fields itself is unknown until its value was computed (for instance varint encoded length
// fields).
type dynamicPushEncoder interface {
pushEncoder

// Called during pop() to adjust the length of the field.
// It should return the difference in bytes between the last computed length and current length.
adjustLength(currOffset int) int
}
9 changes: 9 additions & 0 deletions prep_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
)

type prepEncoder struct {
stack []pushEncoder
length int
}

Expand Down Expand Up @@ -119,10 +120,18 @@ func (pe *prepEncoder) offset() int {
// stackable

func (pe *prepEncoder) push(in pushEncoder) {
in.saveOffset(pe.length)
pe.length += in.reserveLength()
pe.stack = append(pe.stack, in)
}

func (pe *prepEncoder) pop() error {
in := pe.stack[len(pe.stack)-1]
pe.stack = pe.stack[:len(pe.stack)-1]
if dpe, ok := in.(dynamicPushEncoder); ok {
pe.length += dpe.adjustLength(pe.length)
}

return nil
}

Expand Down
17 changes: 12 additions & 5 deletions real_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (rd *realDecoder) getArrayLength() (int, error) {
rd.off = len(rd.raw)
return -1, ErrInsufficientData
}
tmp := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
tmp := int(int32(binary.BigEndian.Uint32(rd.raw[rd.off:])))
rd.off += 4
if tmp > rd.remaining() {
rd.off = len(rd.raw)
Expand Down Expand Up @@ -260,10 +260,17 @@ func (rd *realDecoder) getRawBytes(length int) ([]byte, error) {
func (rd *realDecoder) push(in pushDecoder) error {
in.saveOffset(rd.off)

reserve := in.reserveLength()
if rd.remaining() < reserve {
rd.off = len(rd.raw)
return ErrInsufficientData
var reserve int
if dpd, ok := in.(dynamicPushDecoder); ok {
if err := dpd.decode(rd); err != nil {
return err
}
} else {
reserve = in.reserveLength()
if rd.remaining() < reserve {
rd.off = len(rd.raw)
return ErrInsufficientData
}
}

rd.stack = append(rd.stack, in)
Expand Down
105 changes: 105 additions & 0 deletions record.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package sarama

const (
controlMask = 0x20
)

type RecordHeader struct {
Key []byte
Value []byte
}

func (h *RecordHeader) encode(pe packetEncoder) error {
if err := pe.putVarintBytes(h.Key); err != nil {
return err
}
return pe.putVarintBytes(h.Value)
}

func (h *RecordHeader) decode(pd packetDecoder) (err error) {
if h.Key, err = pd.getVarintBytes(); err != nil {
return err
}

if h.Value, err = pd.getVarintBytes(); err != nil {
return err
}
return nil
}

type Record struct {
Attributes int8
TimestampDelta int64
OffsetDelta int64
Key []byte
Value []byte
Headers []*RecordHeader

length varintLengthField
}

func (r *Record) encode(pe packetEncoder) error {
pe.push(&r.length)
pe.putInt8(r.Attributes)
pe.putVarint(r.TimestampDelta)
pe.putVarint(r.OffsetDelta)
if err := pe.putVarintBytes(r.Key); err != nil {
return err
}
if err := pe.putVarintBytes(r.Value); err != nil {
return err
}
pe.putVarint(int64(len(r.Headers)))

for _, h := range r.Headers {
if err := h.encode(pe); err != nil {
return err
}
}

return pe.pop()
}

func (r *Record) decode(pd packetDecoder) (err error) {
if err = pd.push(&r.length); err != nil {
return err
}

if r.Attributes, err = pd.getInt8(); err != nil {
return err
}

if r.TimestampDelta, err = pd.getVarint(); err != nil {
return err
}

if r.OffsetDelta, err = pd.getVarint(); err != nil {
return err
}

if r.Key, err = pd.getVarintBytes(); err != nil {
return err
}

if r.Value, err = pd.getVarintBytes(); err != nil {
return err
}

numHeaders, err := pd.getVarint()
if err != nil {
return err
}

if numHeaders >= 0 {
r.Headers = make([]*RecordHeader, numHeaders)
}
for i := int64(0); i < numHeaders; i++ {
hdr := new(RecordHeader)
if err := hdr.decode(pd); err != nil {
return err
}
r.Headers[i] = hdr
}

return pd.pop()
}
Loading

0 comments on commit eca6c1c

Please sign in to comment.