Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add implementation of Kafka 0.11 Records #973

Merged
merged 17 commits into from
Oct 31, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions length_field.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,35 @@ type varintLengthField struct {
length int64
}

func newVarintLengthField(pd packetDecoder) (*varintLengthField, error) {
n, err := pd.getVarint()
if err != nil {
return nil, err
}
return &varintLengthField{length: n}, nil
func (l *varintLengthField) decode(pd packetDecoder) error {
var err error
l.length, err = pd.getVarint()
return err
}

func (l *varintLengthField) saveOffset(in int) {
l.startOffset = in
}

func (l *varintLengthField) adjustLength(currOffset int) int {
oldFieldSize := l.reserveLength()
l.length = int64(currOffset - l.startOffset - oldFieldSize)

return l.reserveLength() - oldFieldSize
}

func (l *varintLengthField) reserveLength() int {
return 0
var tmp [binary.MaxVarintLen64]byte
return binary.PutVarint(tmp[:], l.length)
}

func (l *varintLengthField) run(curOffset int, buf []byte) error {
binary.PutVarint(buf[l.startOffset:], l.length)
return nil
}

func (l *varintLengthField) check(curOffset int, buf []byte) error {
if int64(curOffset-l.startOffset) != l.length {
if int64(curOffset-l.startOffset-l.reserveLength()) != l.length {
return PacketDecodingError{"length field invalid"}
}

Expand Down
9 changes: 9 additions & 0 deletions packet_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,12 @@ type pushDecoder interface {
// of data from the saved offset, and verify it based on the data between the saved offset and curOffset.
check(curOffset int, buf []byte) error
}

// dynamicPushDecoder extends the interface of pushDecoder for uses cases where the length of the
// fields itself is unknown until its value was decoded (for instance varint encoded length
// fields).
// During push, dynamicPushDecoder.decode() method will be called instead of reserveLength()
type dynamicPushDecoder interface {
pushDecoder
decoder
}
11 changes: 11 additions & 0 deletions packet_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,14 @@ type pushEncoder interface {
// of data to the saved offset, based on the data between the saved offset and curOffset.
run(curOffset int, buf []byte) error
}

// dynamicPushEncoder extends the interface of pushEncoder for uses cases where the length of the
// fields itself is unknown until its value was computed (for instance varint encoded length
// fields).
type dynamicPushEncoder interface {
pushEncoder

// Called during pop() to adjust the length of the field.
// It should return the difference in bytes between the last computed length and current length.
adjustLength(currOffset int) int
}
9 changes: 9 additions & 0 deletions prep_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
)

type prepEncoder struct {
stack []pushEncoder
length int
}

Expand Down Expand Up @@ -119,10 +120,18 @@ func (pe *prepEncoder) offset() int {
// stackable

func (pe *prepEncoder) push(in pushEncoder) {
in.saveOffset(pe.length)
pe.length += in.reserveLength()
pe.stack = append(pe.stack, in)
}

func (pe *prepEncoder) pop() error {
in := pe.stack[len(pe.stack)-1]
pe.stack = pe.stack[:len(pe.stack)-1]
if dpe, ok := in.(dynamicPushEncoder); ok {
pe.length += dpe.adjustLength(pe.length)
}

return nil
}

Expand Down
17 changes: 12 additions & 5 deletions real_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (rd *realDecoder) getArrayLength() (int, error) {
rd.off = len(rd.raw)
return -1, ErrInsufficientData
}
tmp := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
tmp := int(int32(binary.BigEndian.Uint32(rd.raw[rd.off:])))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why cast to int32 and then int?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we want to convert it to a signed value, so we're converting it to its signed counterpart (int usually 64 bits on 64bit platforms, so the sign conversion won't happen).

rd.off += 4
if tmp > rd.remaining() {
rd.off = len(rd.raw)
Expand Down Expand Up @@ -260,10 +260,17 @@ func (rd *realDecoder) getRawBytes(length int) ([]byte, error) {
func (rd *realDecoder) push(in pushDecoder) error {
in.saveOffset(rd.off)

reserve := in.reserveLength()
if rd.remaining() < reserve {
rd.off = len(rd.raw)
return ErrInsufficientData
var reserve int
if dpd, ok := in.(dynamicPushDecoder); ok {
if err := dpd.decode(rd); err != nil {
return err
}
} else {
reserve = in.reserveLength()
if rd.remaining() < reserve {
rd.off = len(rd.raw)
return ErrInsufficientData
}
}

rd.stack = append(rd.stack, in)
Expand Down
105 changes: 105 additions & 0 deletions record.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package sarama

const (
controlMask = 0x20
)

type RecordHeader struct {
Key []byte
Value []byte
}

func (h *RecordHeader) encode(pe packetEncoder) error {
if err := pe.putVarintBytes(h.Key); err != nil {
return err
}
return pe.putVarintBytes(h.Value)
}

func (h *RecordHeader) decode(pd packetDecoder) (err error) {
if h.Key, err = pd.getVarintBytes(); err != nil {
return err
}

if h.Value, err = pd.getVarintBytes(); err != nil {
return err
}
return nil
}

type Record struct {
Attributes int8
TimestampDelta int64
OffsetDelta int64
Key []byte
Value []byte
Headers []*RecordHeader

length varintLengthField
}

func (r *Record) encode(pe packetEncoder) error {
pe.push(&r.length)
pe.putInt8(r.Attributes)
pe.putVarint(r.TimestampDelta)
pe.putVarint(r.OffsetDelta)
if err := pe.putVarintBytes(r.Key); err != nil {
return err
}
if err := pe.putVarintBytes(r.Value); err != nil {
return err
}
pe.putVarint(int64(len(r.Headers)))

for _, h := range r.Headers {
if err := h.encode(pe); err != nil {
return err
}
}

return pe.pop()
}

func (r *Record) decode(pd packetDecoder) (err error) {
if err = pd.push(&r.length); err != nil {
return err
}

if r.Attributes, err = pd.getInt8(); err != nil {
return err
}

if r.TimestampDelta, err = pd.getVarint(); err != nil {
return err
}

if r.OffsetDelta, err = pd.getVarint(); err != nil {
return err
}

if r.Key, err = pd.getVarintBytes(); err != nil {
return err
}

if r.Value, err = pd.getVarintBytes(); err != nil {
return err
}

numHeaders, err := pd.getVarint()
if err != nil {
return err
}

if numHeaders >= 0 {
r.Headers = make([]*RecordHeader, numHeaders)
}
for i := int64(0); i < numHeaders; i++ {
hdr := new(RecordHeader)
if err := hdr.decode(pd); err != nil {
return err
}
r.Headers[i] = hdr
}

return pd.pop()
}
Loading