Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support LogAppend timestamps #1258

Merged
merged 2 commits into from
Feb 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,9 +487,13 @@ func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMe
for _, msgBlock := range msgSet.Messages {
for _, msg := range msgBlock.Messages() {
offset := msg.Offset
timestamp := msg.Msg.Timestamp
if msg.Msg.Version >= 1 {
baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
offset += baseOffset
if msg.Msg.LogAppendTime {
timestamp = msgBlock.Msg.Timestamp
}
}
if offset < child.offset {
continue
Expand All @@ -500,7 +504,7 @@ func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMe
Key: msg.Msg.Key,
Value: msg.Msg.Value,
Offset: offset,
Timestamp: msg.Msg.Timestamp,
Timestamp: timestamp,
BlockTimestamp: msgBlock.Msg.Timestamp,
})
child.offset = offset + 1
Expand All @@ -519,13 +523,17 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes
if offset < child.offset {
continue
}
timestamp := batch.FirstTimestamp.Add(rec.TimestampDelta)
if batch.LogAppendTime {
timestamp = batch.MaxTimestamp
}
messages = append(messages, &ConsumerMessage{
Topic: child.topic,
Partition: child.partition,
Key: rec.Key,
Value: rec.Value,
Offset: offset,
Timestamp: batch.FirstTimestamp.Add(rec.TimestampDelta),
Timestamp: timestamp,
Headers: rec.Headers,
})
child.offset = offset + 1
Expand Down Expand Up @@ -787,6 +795,9 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
MinBytes: bc.consumer.conf.Consumer.Fetch.Min,
MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
}
if bc.consumer.conf.Version.IsAtLeast(V0_9_0_0) {
request.Version = 1
}
if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
request.Version = 2
}
Expand Down
117 changes: 117 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,123 @@ func TestConsumerExpiryTicker(t *testing.T) {
broker0.Close()
}

func TestConsumerTimestamps(t *testing.T) {
now := time.Now().Truncate(time.Millisecond)
type testMessage struct {
key Encoder
value Encoder
offset int64
timestamp time.Time
}
for _, d := range []struct {
kversion KafkaVersion
logAppendTime bool
messages []testMessage
expectedTimestamp []time.Time
}{
{MinVersion, false, []testMessage{
{nil, testMsg, 1, now},
{nil, testMsg, 2, now},
}, []time.Time{{}, {}}},
{V0_9_0_0, false, []testMessage{
{nil, testMsg, 1, now},
{nil, testMsg, 2, now},
}, []time.Time{{}, {}}},
{V0_10_0_0, false, []testMessage{
{nil, testMsg, 1, now},
{nil, testMsg, 2, now},
}, []time.Time{{}, {}}},
{V0_10_2_1, false, []testMessage{
{nil, testMsg, 1, now.Add(time.Second)},
{nil, testMsg, 2, now.Add(2 * time.Second)},
}, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
{V0_10_2_1, true, []testMessage{
{nil, testMsg, 1, now.Add(time.Second)},
{nil, testMsg, 2, now.Add(2 * time.Second)},
}, []time.Time{now, now}},
{V0_11_0_0, false, []testMessage{
{nil, testMsg, 1, now.Add(time.Second)},
{nil, testMsg, 2, now.Add(2 * time.Second)},
}, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
{V0_11_0_0, true, []testMessage{

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you include other supported versions here?

https://github.com/bai/sarama/blob/master/utils.go#L163

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have adjusted the test to cover all versions for which the fetch format is different (from https://github.com/bai/sarama/blob/0a21d90df4f6266fdf28d603e5ef91f2426c362a/fetch_request.go#L141)

{nil, testMsg, 1, now.Add(time.Second)},
{nil, testMsg, 2, now.Add(2 * time.Second)},
}, []time.Time{now, now}},
} {
var fr *FetchResponse
var offsetResponseVersion int16
cfg := NewConfig()
cfg.Version = d.kversion
switch {
case d.kversion.IsAtLeast(V0_11_0_0):
offsetResponseVersion = 1
fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now}
for _, m := range d.messages {
fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp)
}
fr.SetLastOffsetDelta("my_topic", 0, 2)
fr.SetLastStableOffset("my_topic", 0, 2)
case d.kversion.IsAtLeast(V0_10_1_0):
offsetResponseVersion = 1
fr = &FetchResponse{Version: 3, LogAppendTime: d.logAppendTime, Timestamp: now}
for _, m := range d.messages {
fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 1)
}
default:
var version int16
switch {
case d.kversion.IsAtLeast(V0_10_0_0):
version = 2
case d.kversion.IsAtLeast(V0_9_0_0):
version = 1
}
fr = &FetchResponse{Version: version}
for _, m := range d.messages {
fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 0)
}
}

broker0 := NewMockBroker(t, 0)
broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(offsetResponseVersion).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fr),
})

master, err := NewConsumer([]string{broker0.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}

consumer, err := master.ConsumePartition("my_topic", 0, 1)
if err != nil {
t.Fatal(err)
}

for i, ts := range d.expectedTimestamp {
select {
case msg := <-consumer.Messages():
assertMessageOffset(t, msg, int64(i)+1)
if msg.Timestamp != ts {
t.Errorf("Wrong timestamp (kversion:%v, logAppendTime:%v): got: %v, want: %v",
d.kversion, d.logAppendTime, msg.Timestamp, ts)
}
case err := <-consumer.Errors():
t.Fatal(err)
}
}

safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
}
}

func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
if msg.Offset != expectedOffset {
t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)
Expand Down
29 changes: 21 additions & 8 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,11 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error)
}

type FetchResponse struct {
Blocks map[string]map[int32]*FetchResponseBlock
ThrottleTime time.Duration
Version int16 // v1 requires 0.9+, v2 requires 0.10+
Blocks map[string]map[int32]*FetchResponseBlock
ThrottleTime time.Duration
Version int16 // v1 requires 0.9+, v2 requires 0.10+
LogAppendTime bool
Timestamp time.Time
}

func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
Expand Down Expand Up @@ -355,10 +357,13 @@ func encodeKV(key, value Encoder) ([]byte, []byte) {
return kb, vb
}

func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and the ones below are exported functions, so this change is going to be a breaking change. Let's see how we can handle them. @bai do you have an idea?

Copy link

@sam-obeid sam-obeid Feb 4, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll need to keep both exported functions, and make one delegate to the other!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@varun06, I think I did preserve the original API call by reimplementing the original function using this one (https://github.com/Shopify/sarama/pull/1258/files/e1eda41894f763094e70b74f6dbf958c4cbbd315#diff-be942b2617e64413fb251f24e4bd4ea2R388). Same for AddRecord.

Am I missing something?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name change is going to break things for folks who are using the exported method "AddMessage".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The “AddMessage” method is still there (check the link I provided above). Also note that there are multiple tests that would have broken otherwise.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies. Do we also have the AddRecord method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, “AddRecord” is also kept.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool beans. Let's get it going then. @bai can we merge this please?

frb := r.getOrCreateBlock(topic, partition)
kb, vb := encodeKV(key, value)
msg := &Message{Key: kb, Value: vb}
if r.LogAppendTime {
timestamp = r.Timestamp
}
msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version}
msgBlock := &MessageBlock{Msg: msg, Offset: offset}
if len(frb.RecordsSet) == 0 {
records := newLegacyRecords(&MessageSet{})
Expand All @@ -368,18 +373,26 @@ func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Enc
set.Messages = append(set.Messages, msgBlock)
}

func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) {
frb := r.getOrCreateBlock(topic, partition)
kb, vb := encodeKV(key, value)
rec := &Record{Key: kb, Value: vb, OffsetDelta: offset}
if len(frb.RecordsSet) == 0 {
records := newDefaultRecords(&RecordBatch{Version: 2})
records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
frb.RecordsSet = []*Records{&records}
}
batch := frb.RecordsSet[0].RecordBatch
rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
batch.addRecord(rec)
}

// AddMessage adds a message with the given key, value and offset to the
// response block for topic/partition. It is kept for backward compatibility
// and simply delegates to AddMessageWithTimestamp, passing the zero time.Time
// as the timestamp and 0 as the message version.
func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
}

// AddRecord adds a record with the given key, value and offset to the
// response block for topic/partition. It is kept for backward compatibility
// and simply delegates to AddRecordWithTimestamp, passing the zero time.Time
// as the record timestamp.
func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{})
}

func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
frb := r.getOrCreateBlock(topic, partition)
if len(frb.RecordsSet) == 0 {
Expand Down
14 changes: 11 additions & 3 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import (
"time"
)

// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
type CompressionCodec int8

// The lowest 3 bits contain the compression codec used for the message
const compressionCodecMask int8 = 0x07

// Bit 3 set for "LogAppend" timestamps
const timestampTypeMask = 0x08

// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
type CompressionCodec int8

const (
CompressionNone CompressionCodec = 0
CompressionGZIP CompressionCodec = 1
Expand All @@ -36,6 +39,7 @@ const CompressionLevelDefault = -1000
type Message struct {
Codec CompressionCodec // codec used to compress the message contents
CompressionLevel int // compression level
LogAppendTime bool // the used timestamp is LogAppendTime
Key []byte // the message key, may be nil
Value []byte // the message contents
Set *MessageSet // the message set a message might wrap
Expand All @@ -52,6 +56,9 @@ func (m *Message) encode(pe packetEncoder) error {
pe.putInt8(m.Version)

attributes := int8(m.Codec) & compressionCodecMask
if m.LogAppendTime {
attributes |= timestampTypeMask
}
pe.putInt8(attributes)

if m.Version >= 1 {
Expand Down Expand Up @@ -108,6 +115,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
return err
}
m.Codec = CompressionCodec(attribute & compressionCodecMask)
m.LogAppendTime = attribute&timestampTypeMask == timestampTypeMask

if m.Version == 1 {
if err := (Timestamp{&m.Timestamp}).decode(pd); err != nil {
Expand Down
5 changes: 5 additions & 0 deletions record_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type RecordBatch struct {
Codec CompressionCodec
CompressionLevel int
Control bool
LogAppendTime bool
LastOffsetDelta int32
FirstTimestamp time.Time
MaxTimestamp time.Time
Expand Down Expand Up @@ -120,6 +121,7 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
}
b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
b.Control = attributes&controlMask == controlMask
b.LogAppendTime = attributes&timestampTypeMask == timestampTypeMask

if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
return err
Expand Down Expand Up @@ -200,6 +202,9 @@ func (b *RecordBatch) computeAttributes() int16 {
if b.Control {
attr |= controlMask
}
if b.LogAppendTime {
attr |= timestampTypeMask
}
return attr
}

Expand Down