From 95bd3cd047dcab60177b5f8db20f3b765e729363 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Thu, 16 Jun 2016 10:50:12 -0400 Subject: [PATCH] Plumb through v0.10 support for consumer/fetch --- broker.go | 15 +++++++++++++++ consumer.go | 5 +++++ fetch_request.go | 2 ++ fetch_response.go | 20 +++++++++++++++++++- message.go | 34 +++++++++++++++++++++------------- 5 files changed, 62 insertions(+), 14 deletions(-) diff --git a/broker.go b/broker.go index 992931bd0d..8b6aa07e14 100644 --- a/broker.go +++ b/broker.go @@ -231,6 +231,21 @@ func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) { } func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) { + switch request.version() { + case 0: + break + case 1: + if !b.conf.Version.IsAtLeast(V0_9_0_0) { + return nil, ErrUnsupportedVersion + } + case 2: + if !b.conf.Version.IsAtLeast(V0_10_0_0) { + return nil, ErrUnsupportedVersion + } + default: + return nil, ErrUnsupportedVersion + } + response := new(FetchResponse) err := b.sendAndReceive(request, response) diff --git a/consumer.go b/consumer.go index 8694527546..f6c9797662 100644 --- a/consumer.go +++ b/consumer.go @@ -14,6 +14,7 @@ type ConsumerMessage struct { Topic string Partition int32 Offset int64 + Timestamp time.Time // only set if kafka is version 0.10+ } // ConsumerError is what is provided to the user when an error occurs. @@ -489,6 +490,7 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu Key: msg.Msg.Key, Value: msg.Msg.Value, Offset: msg.Offset, + Timestamp: msg.Msg.Timestamp, }) child.offset = msg.Offset + 1 } else { @@ -682,6 +684,9 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) { MinBytes: bc.consumer.conf.Consumer.Fetch.Min, MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond), } + if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) { + request.Version = 2 + } for child := range bc.subscriptions { request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize) diff --git a/fetch_request.go b/fetch_request.go index bd4b403fec..02ba6e48b1 100644 --- a/fetch_request.go +++ b/fetch_request.go @@ -24,6 +24,7 @@ func (f *fetchRequestBlock) decode(pd packetDecoder) (err error) { type FetchRequest struct { MaxWaitTime int32 MinBytes int32 + Version int16 blocks map[string]map[int32]*fetchRequestBlock } @@ -56,6 +57,7 @@ func (f *FetchRequest) encode(pe packetEncoder) (err error) { } func (f *FetchRequest) decode(pd packetDecoder, version int16) (err error) { + f.Version = version if _, err = pd.getInt32(); err != nil { return err } diff --git a/fetch_response.go b/fetch_response.go index 13889d52b2..525b519224 100644 --- a/fetch_response.go +++ b/fetch_response.go @@ -1,5 +1,7 @@ package sarama +import "time" + type FetchResponseBlock struct { Err KError HighWaterMarkOffset int64 @@ -33,7 +35,9 @@ func (pr *FetchResponseBlock) decode(pd packetDecoder) (err error) { } type FetchResponse struct { - Blocks map[string]map[int32]*FetchResponseBlock + Blocks map[string]map[int32]*FetchResponseBlock + ThrottleTime time.Duration + Version int16 // v1 requires 0.9+, v2 requires 0.10+ } func (pr *FetchResponseBlock) encode(pe packetEncoder) (err error) { @@ -50,6 +54,16 @@ func (pr *FetchResponseBlock) encode(pe packetEncoder) (err error) { } func (fr *FetchResponse) decode(pd packetDecoder, version int16) (err error) { + fr.Version = version + + if fr.Version >= 1 { + throttle, err := pd.getInt64() + if err != nil { + return err + } + fr.ThrottleTime = time.Duration(throttle) + } + numTopics, err := pd.getArrayLength() if err != nil { return err @@ -88,6 +102,10 @@ func (fr *FetchResponse) decode(pd packetDecoder, version int16) (err error) { } func (fr *FetchResponse) encode(pe packetEncoder) (err error) { + if fr.Version >= 1 { + pe.putInt64(int64(fr.ThrottleTime)) + } + err = pe.putArrayLength(len(fr.Blocks)) if err != nil { return err diff --git a/message.go b/message.go index 55edc9d52f..de616c3c6f 100644 --- a/message.go +++ b/message.go @@ -5,6 +5,7 @@ import ( "compress/gzip" "fmt" "io/ioutil" + "time" "github.com/eapache/go-xerial-snappy" ) @@ -21,15 +22,13 @@ const ( CompressionSnappy CompressionCodec = 2 ) -// The spec just says: "This is a version id used to allow backwards compatible evolution of the message -// binary format." but it doesn't say what the current value is, so presumably 0... -const messageFormat int8 = 0 - type Message struct { - Codec CompressionCodec // codec used to compress the message contents - Key []byte // the message key, may be nil - Value []byte // the message contents - Set *MessageSet // the message set a message might wrap + Codec CompressionCodec // codec used to compress the message contents + Key []byte // the message key, may be nil + Value []byte // the message contents + Set *MessageSet // the message set a message might wrap + Version int8 // v1 requires Kafka 0.10 + Timestamp time.Time // the timestamp of the message (version 1+ only) compressedCache []byte } @@ -37,11 +36,15 @@ type Message struct { func (m *Message) encode(pe packetEncoder) error { pe.push(&crc32Field{}) - pe.putInt8(messageFormat) + pe.putInt8(m.Version) attributes := int8(m.Codec) & compressionCodecMask pe.putInt8(attributes) + if m.Version >= 1 { + pe.putInt64(m.Timestamp.Unix()) + } + err := pe.putBytes(m.Key) if err != nil { return err @@ -89,13 +92,10 @@ func (m *Message) decode(pd packetDecoder) (err error) { return err } - format, err := pd.getInt8() + m.Version, err = pd.getInt8() if err != nil { return err } - if format != messageFormat { - return PacketDecodingError{"unexpected messageFormat"} - } attribute, err := pd.getInt8() if err != nil { @@ -103,6 +103,14 @@ func (m *Message) decode(pd packetDecoder) (err error) { } m.Codec = CompressionCodec(attribute & compressionCodecMask) + if m.Version >= 1 { + timestamp, err := pd.getInt64() + if err != nil { + return err + } + m.Timestamp = time.Unix(timestamp, 0) + } + m.Key, err = pd.getBytes() if err != nil { return err