Skip to content

Commit

Permalink
Merge pull request #681 from Shopify/consumer-timestamps
Browse files Browse the repository at this point in the history
Plumb through v0.10 support for consumer/fetch
  • Loading branch information
eapache authored Jun 20, 2016
2 parents 7e6290f + 10d06c2 commit 2979d3c
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 21 deletions.
5 changes: 5 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type ConsumerMessage struct {
Topic string
Partition int32
Offset int64
Timestamp time.Time // only set if kafka is version 0.10+
}

// ConsumerError is what is provided to the user when an error occurs.
Expand Down Expand Up @@ -489,6 +490,7 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
Key: msg.Msg.Key,
Value: msg.Msg.Value,
Offset: msg.Offset,
Timestamp: msg.Msg.Timestamp,
})
child.offset = msg.Offset + 1
} else {
Expand Down Expand Up @@ -682,6 +684,9 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
MinBytes: bc.consumer.conf.Consumer.Fetch.Min,
MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
}
if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
request.Version = 2
}

for child := range bc.subscriptions {
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
Expand Down
13 changes: 11 additions & 2 deletions fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func (f *fetchRequestBlock) decode(pd packetDecoder) (err error) {
type FetchRequest struct {
MaxWaitTime int32
MinBytes int32
Version int16
blocks map[string]map[int32]*fetchRequestBlock
}

Expand Down Expand Up @@ -56,6 +57,7 @@ func (f *FetchRequest) encode(pe packetEncoder) (err error) {
}

func (f *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
f.Version = version
if _, err = pd.getInt32(); err != nil {
return err
}
Expand Down Expand Up @@ -103,11 +105,18 @@ func (f *FetchRequest) key() int16 {
}

func (f *FetchRequest) version() int16 {
return 0
return f.Version
}

func (r *FetchRequest) requiredVersion() KafkaVersion {
return minVersion
switch r.Version {
case 1:
return V0_9_0_0
case 2:
return V0_10_0_0
default:
return minVersion
}
}

func (f *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) {
Expand Down
31 changes: 28 additions & 3 deletions fetch_response.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sarama

import "time"

type FetchResponseBlock struct {
Err KError
HighWaterMarkOffset int64
Expand Down Expand Up @@ -33,7 +35,9 @@ func (pr *FetchResponseBlock) decode(pd packetDecoder) (err error) {
}

type FetchResponse struct {
Blocks map[string]map[int32]*FetchResponseBlock
Blocks map[string]map[int32]*FetchResponseBlock
ThrottleTime time.Duration
Version int16 // v1 requires 0.9+, v2 requires 0.10+
}

func (pr *FetchResponseBlock) encode(pe packetEncoder) (err error) {
Expand All @@ -50,6 +54,16 @@ func (pr *FetchResponseBlock) encode(pe packetEncoder) (err error) {
}

func (fr *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
fr.Version = version

if fr.Version >= 1 {
throttle, err := pd.getInt32()
if err != nil {
return err
}
fr.ThrottleTime = time.Duration(throttle) * time.Millisecond
}

numTopics, err := pd.getArrayLength()
if err != nil {
return err
Expand Down Expand Up @@ -88,6 +102,10 @@ func (fr *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
}

func (fr *FetchResponse) encode(pe packetEncoder) (err error) {
if fr.Version >= 1 {
pe.putInt32(int32(fr.ThrottleTime / time.Millisecond))
}

err = pe.putArrayLength(len(fr.Blocks))
if err != nil {
return err
Expand Down Expand Up @@ -121,11 +139,18 @@ func (r *FetchResponse) key() int16 {
}

func (r *FetchResponse) version() int16 {
return 0
return r.Version
}

func (r *FetchResponse) requiredVersion() KafkaVersion {
return minVersion
switch r.Version {
case 1:
return V0_9_0_0
case 2:
return V0_10_0_0
default:
return minVersion
}
}

func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
Expand Down
34 changes: 21 additions & 13 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"compress/gzip"
"fmt"
"io/ioutil"
"time"

"github.com/eapache/go-xerial-snappy"
)
Expand All @@ -21,27 +22,29 @@ const (
CompressionSnappy CompressionCodec = 2
)

// The spec just says: "This is a version id used to allow backwards compatible evolution of the message
// binary format." but it doesn't say what the current value is, so presumably 0...
const messageFormat int8 = 0

type Message struct {
Codec CompressionCodec // codec used to compress the message contents
Key []byte // the message key, may be nil
Value []byte // the message contents
Set *MessageSet // the message set a message might wrap
Codec CompressionCodec // codec used to compress the message contents
Key []byte // the message key, may be nil
Value []byte // the message contents
Set *MessageSet // the message set a message might wrap
Version int8 // v1 requires Kafka 0.10
Timestamp time.Time // the timestamp of the message (version 1+ only)

compressedCache []byte
}

func (m *Message) encode(pe packetEncoder) error {
pe.push(&crc32Field{})

pe.putInt8(messageFormat)
pe.putInt8(m.Version)

attributes := int8(m.Codec) & compressionCodecMask
pe.putInt8(attributes)

if m.Version >= 1 {
pe.putInt64(m.Timestamp.UnixNano() / int64(time.Millisecond))
}

err := pe.putBytes(m.Key)
if err != nil {
return err
Expand Down Expand Up @@ -89,20 +92,25 @@ func (m *Message) decode(pd packetDecoder) (err error) {
return err
}

format, err := pd.getInt8()
m.Version, err = pd.getInt8()
if err != nil {
return err
}
if format != messageFormat {
return PacketDecodingError{"unexpected messageFormat"}
}

attribute, err := pd.getInt8()
if err != nil {
return err
}
m.Codec = CompressionCodec(attribute & compressionCodecMask)

if m.Version >= 1 {
millis, err := pd.getInt64()
if err != nil {
return err
}
m.Timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
}

m.Key, err = pd.getBytes()
if err != nil {
return err
Expand Down
11 changes: 8 additions & 3 deletions produce_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (pr *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err err
if millis, err := pd.getInt64(); err != nil {
return err
} else {
pr.Timestamp = time.Unix(millis/1000, millis%1000)
pr.Timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
}
}

Expand All @@ -34,7 +34,7 @@ func (pr *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err err
type ProduceResponse struct {
Blocks map[string]map[int32]*ProduceResponseBlock
Version int16
ThrottleTime int32 // only provided if Version >= 1
ThrottleTime time.Duration // only provided if Version >= 1
}

func (pr *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
Expand Down Expand Up @@ -75,8 +75,10 @@ func (pr *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
}

if pr.Version >= 1 {
if pr.ThrottleTime, err = pd.getInt32(); err != nil {
if millis, err := pd.getInt32(); err != nil {
return err
} else {
pr.ThrottleTime = time.Duration(millis) * time.Millisecond
}
}

Expand All @@ -103,6 +105,9 @@ func (pr *ProduceResponse) encode(pe packetEncoder) error {
pe.putInt64(prb.Offset)
}
}
if pr.Version >= 1 {
pe.putInt32(int32(pr.ThrottleTime / time.Millisecond))
}
return nil
}

Expand Down

0 comments on commit 2979d3c

Please sign in to comment.