Skip to content

Commit

Permalink
Plumb through v0.10 support for consumer/fetch
Browse files Browse the repository at this point in the history
  • Loading branch information
eapache committed Jun 16, 2016
1 parent 19422bf commit 95bd3cd
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 14 deletions.
15 changes: 15 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,21 @@ func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
}

func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
switch request.version() {
case 0:
break
case 1:
if !b.conf.Version.IsAtLeast(V0_9_0_0) {
return nil, ErrUnsupportedVersion
}
case 2:
if !b.conf.Version.IsAtLeast(V0_10_0_0) {
return nil, ErrUnsupportedVersion
}
default:
return nil, ErrUnsupportedVersion
}

response := new(FetchResponse)

err := b.sendAndReceive(request, response)
Expand Down
5 changes: 5 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type ConsumerMessage struct {
Topic string
Partition int32
Offset int64
Timestamp time.Time // only set if kafka is version 0.10+
}

// ConsumerError is what is provided to the user when an error occurs.
Expand Down Expand Up @@ -489,6 +490,7 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
Key: msg.Msg.Key,
Value: msg.Msg.Value,
Offset: msg.Offset,
Timestamp: msg.Msg.Timestamp,
})
child.offset = msg.Offset + 1
} else {
Expand Down Expand Up @@ -682,6 +684,9 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
MinBytes: bc.consumer.conf.Consumer.Fetch.Min,
MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
}
if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
request.Version = 2
}

for child := range bc.subscriptions {
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
Expand Down
2 changes: 2 additions & 0 deletions fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func (f *fetchRequestBlock) decode(pd packetDecoder) (err error) {
type FetchRequest struct {
MaxWaitTime int32
MinBytes int32
Version int16
blocks map[string]map[int32]*fetchRequestBlock
}

Expand Down Expand Up @@ -56,6 +57,7 @@ func (f *FetchRequest) encode(pe packetEncoder) (err error) {
}

func (f *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
f.Version = version
if _, err = pd.getInt32(); err != nil {
return err
}
Expand Down
20 changes: 19 additions & 1 deletion fetch_response.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sarama

import "time"

type FetchResponseBlock struct {
Err KError
HighWaterMarkOffset int64
Expand Down Expand Up @@ -33,7 +35,9 @@ func (pr *FetchResponseBlock) decode(pd packetDecoder) (err error) {
}

type FetchResponse struct {
Blocks map[string]map[int32]*FetchResponseBlock
Blocks map[string]map[int32]*FetchResponseBlock
ThrottleTime time.Duration
Version int16 // v1 requires 0.9+, v2 requires 0.10+
}

func (pr *FetchResponseBlock) encode(pe packetEncoder) (err error) {
Expand All @@ -50,6 +54,16 @@ func (pr *FetchResponseBlock) encode(pe packetEncoder) (err error) {
}

func (fr *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
fr.Version = version

if fr.Version >= 1 {
throttle, err := pd.getInt64()
if err != nil {
return err
}
fr.ThrottleTime = time.Duration(throttle)
}

numTopics, err := pd.getArrayLength()
if err != nil {
return err
Expand Down Expand Up @@ -88,6 +102,10 @@ func (fr *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
}

func (fr *FetchResponse) encode(pe packetEncoder) (err error) {
if fr.Version >= 1 {
pe.putInt64(int64(fr.ThrottleTime))
}

err = pe.putArrayLength(len(fr.Blocks))
if err != nil {
return err
Expand Down
34 changes: 21 additions & 13 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"compress/gzip"
"fmt"
"io/ioutil"
"time"

"github.com/eapache/go-xerial-snappy"
)
Expand All @@ -21,27 +22,29 @@ const (
CompressionSnappy CompressionCodec = 2
)

// The spec just says: "This is a version id used to allow backwards compatible evolution of the message
// binary format." but it doesn't say what the current value is, so presumably 0...
const messageFormat int8 = 0

type Message struct {
Codec CompressionCodec // codec used to compress the message contents
Key []byte // the message key, may be nil
Value []byte // the message contents
Set *MessageSet // the message set a message might wrap
Codec CompressionCodec // codec used to compress the message contents
Key []byte // the message key, may be nil
Value []byte // the message contents
Set *MessageSet // the message set a message might wrap
Version int8 // v1 requires Kafka 0.10
Timestamp time.Time // the timestamp of the message (version 1+ only)

compressedCache []byte
}

func (m *Message) encode(pe packetEncoder) error {
pe.push(&crc32Field{})

pe.putInt8(messageFormat)
pe.putInt8(m.Version)

attributes := int8(m.Codec) & compressionCodecMask
pe.putInt8(attributes)

if m.Version >= 1 {
pe.putInt64(m.Timestamp.Unix())
}

err := pe.putBytes(m.Key)
if err != nil {
return err
Expand Down Expand Up @@ -89,20 +92,25 @@ func (m *Message) decode(pd packetDecoder) (err error) {
return err
}

format, err := pd.getInt8()
m.Version, err = pd.getInt8()
if err != nil {
return err
}
if format != messageFormat {
return PacketDecodingError{"unexpected messageFormat"}
}

attribute, err := pd.getInt8()
if err != nil {
return err
}
m.Codec = CompressionCodec(attribute & compressionCodecMask)

if m.Version >= 1 {
timestamp, err := pd.getInt64()
if err != nil {
return err
}
m.Timestamp = time.Unix(timestamp, 0)
}

m.Key, err = pd.getBytes()
if err != nil {
return err
Expand Down

0 comments on commit 95bd3cd

Please sign in to comment.