Skip to content

Commit

Permalink
Tests for timestamps added
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Zimin committed Jan 17, 2019
1 parent f0e5fa7 commit d93518f
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 8 deletions.
102 changes: 102 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,108 @@ func TestConsumerExpiryTicker(t *testing.T) {
broker0.Close()
}

func TestConsumerTimestamps(t *testing.T) {
now := time.Now().Truncate(time.Millisecond)
type testMessage struct {
key Encoder
value Encoder
offset int64
timestamp time.Time
}
for _, d := range []struct {
kversion KafkaVersion
logAppendTime bool
messages []testMessage
expectedTimestamp []time.Time
}{
{V0_8_2_0, false, []testMessage{
{nil, testMsg, 1, now},
{nil, testMsg, 2, now},
}, []time.Time{{}, {}}},
{V0_10_2_1, false, []testMessage{
{nil, testMsg, 1, now.Add(time.Second)},
{nil, testMsg, 2, now.Add(2 * time.Second)},
}, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
{V0_10_2_1, true, []testMessage{
{nil, testMsg, 1, now.Add(time.Second)},
{nil, testMsg, 2, now.Add(2 * time.Second)},
}, []time.Time{now, now}},
{V0_11_0_0, false, []testMessage{
{nil, testMsg, 1, now.Add(time.Second)},
{nil, testMsg, 2, now.Add(2 * time.Second)},
}, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
{V0_11_0_0, true, []testMessage{
{nil, testMsg, 1, now.Add(time.Second)},
{nil, testMsg, 2, now.Add(2 * time.Second)},
}, []time.Time{now, now}},
} {
var fr *FetchResponse
var offsetResponseVersion int16
cfg := NewConfig()
cfg.Version = d.kversion
switch (d.kversion) {
default:
fr = &FetchResponse{}
for _, m := range d.messages {
fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp,0)
}
case V0_10_2_1:
offsetResponseVersion = 1
fr = &FetchResponse{Version: 3, LogAppendTime: d.logAppendTime, Timestamp: now}
for _, m := range d.messages {
fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 1)
}
case V0_11_0_0:
offsetResponseVersion = 1
fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now}
for _, m := range d.messages {
fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp)
}
fr.SetLastOffsetDelta("my_topic", 0, 2)
fr.SetLastStableOffset("my_topic", 0, 2)
}

broker0 := NewMockBroker(t, 0)
broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(offsetResponseVersion).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fr),
})

master, err := NewConsumer([]string{broker0.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}

consumer, err := master.ConsumePartition("my_topic", 0, 1)
if err != nil {
t.Fatal(err)
}

for i, ts := range d.expectedTimestamp {
select {
case msg := <-consumer.Messages():
assertMessageOffset(t, msg, int64(i)+1)
if msg.Timestamp != ts {
t.Errorf("Wrong timestamp (kversion:%v, logAppendTime:%v): got: %v, want: %v",
d.kversion, d.logAppendTime, msg.Timestamp, ts)
}
case err := <-consumer.Errors():
t.Fatal(err)
}
}

safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
}
}

func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
if msg.Offset != expectedOffset {
t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)
Expand Down
29 changes: 21 additions & 8 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,11 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error)
}

type FetchResponse struct {
Blocks map[string]map[int32]*FetchResponseBlock
ThrottleTime time.Duration
Version int16 // v1 requires 0.9+, v2 requires 0.10+
Blocks map[string]map[int32]*FetchResponseBlock
ThrottleTime time.Duration
Version int16 // v1 requires 0.9+, v2 requires 0.10+
LogAppendTime bool
Timestamp time.Time
}

func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
Expand Down Expand Up @@ -355,10 +357,13 @@ func encodeKV(key, value Encoder) ([]byte, []byte) {
return kb, vb
}

func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) {
frb := r.getOrCreateBlock(topic, partition)
kb, vb := encodeKV(key, value)
msg := &Message{Key: kb, Value: vb}
if r.LogAppendTime {
timestamp = r.Timestamp
}
msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version}
msgBlock := &MessageBlock{Msg: msg, Offset: offset}
if len(frb.RecordsSet) == 0 {
records := newLegacyRecords(&MessageSet{})
Expand All @@ -368,18 +373,26 @@ func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Enc
set.Messages = append(set.Messages, msgBlock)
}

func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) {
frb := r.getOrCreateBlock(topic, partition)
kb, vb := encodeKV(key, value)
rec := &Record{Key: kb, Value: vb, OffsetDelta: offset}
if len(frb.RecordsSet) == 0 {
records := newDefaultRecords(&RecordBatch{Version: 2})
records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
frb.RecordsSet = []*Records{&records}
}
batch := frb.RecordsSet[0].RecordBatch
rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
batch.addRecord(rec)
}

func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
}

func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{})
}

func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
frb := r.getOrCreateBlock(topic, partition)
if len(frb.RecordsSet) == 0 {
Expand Down
3 changes: 3 additions & 0 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ func (m *Message) encode(pe packetEncoder) error {
pe.putInt8(m.Version)

attributes := int8(m.Codec) & compressionCodecMask
if m.LogAppendTime {
attributes |= timestampTypeMask
}
pe.putInt8(attributes)

if m.Version >= 1 {
Expand Down
3 changes: 3 additions & 0 deletions record_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ func (b *RecordBatch) computeAttributes() int16 {
if b.Control {
attr |= controlMask
}
if b.LogAppendTime {
attr |= timestampTypeMask
}
return attr
}

Expand Down

0 comments on commit d93518f

Please sign in to comment.