diff --git a/consumer.go b/consumer.go
index 33d9d143f..258d1a570 100644
--- a/consumer.go
+++ b/consumer.go
@@ -487,9 +487,13 @@ func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMe
 	for _, msgBlock := range msgSet.Messages {
 		for _, msg := range msgBlock.Messages() {
 			offset := msg.Offset
+			timestamp := msg.Msg.Timestamp
 			if msg.Msg.Version >= 1 {
 				baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
 				offset += baseOffset
+				if msg.Msg.LogAppendTime {
+					timestamp = msgBlock.Msg.Timestamp
+				}
 			}
 			if offset < child.offset {
 				continue
@@ -500,7 +504,7 @@ func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMe
 				Key:            msg.Msg.Key,
 				Value:          msg.Msg.Value,
 				Offset:         offset,
-				Timestamp:      msg.Msg.Timestamp,
+				Timestamp:      timestamp,
 				BlockTimestamp: msgBlock.Msg.Timestamp,
 			})
 			child.offset = offset + 1
@@ -519,13 +523,17 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes
 		if offset < child.offset {
 			continue
 		}
+		timestamp := batch.FirstTimestamp.Add(rec.TimestampDelta)
+		if batch.LogAppendTime {
+			timestamp = batch.MaxTimestamp
+		}
 		messages = append(messages, &ConsumerMessage{
 			Topic:     child.topic,
 			Partition: child.partition,
 			Key:       rec.Key,
 			Value:     rec.Value,
 			Offset:    offset,
-			Timestamp: batch.FirstTimestamp.Add(rec.TimestampDelta),
+			Timestamp: timestamp,
 			Headers:   rec.Headers,
 		})
 		child.offset = offset + 1
@@ -787,6 +795,9 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
 		MinBytes:    bc.consumer.conf.Consumer.Fetch.Min,
 		MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
 	}
+	if bc.consumer.conf.Version.IsAtLeast(V0_9_0_0) {
+		request.Version = 1
+	}
 	if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
 		request.Version = 2
 	}
diff --git a/consumer_test.go b/consumer_test.go
index 4bd662908..3b753b1a5 100644
--- a/consumer_test.go
+++ b/consumer_test.go
@@ -985,6 +985,123 @@ func TestConsumerExpiryTicker(t *testing.T) {
 	broker0.Close()
 }
 
+func TestConsumerTimestamps(t *testing.T) {
+	now := time.Now().Truncate(time.Millisecond)
+	type testMessage struct {
+		key       Encoder
+		value     Encoder
+		offset    int64
+		timestamp time.Time
+	}
+	for _, d := range []struct {
+		kversion          KafkaVersion
+		logAppendTime     bool
+		messages          []testMessage
+		expectedTimestamp []time.Time
+	}{
+		{MinVersion, false, []testMessage{
+			{nil, testMsg, 1, now},
+			{nil, testMsg, 2, now},
+		}, []time.Time{{}, {}}},
+		{V0_9_0_0, false, []testMessage{
+			{nil, testMsg, 1, now},
+			{nil, testMsg, 2, now},
+		}, []time.Time{{}, {}}},
+		{V0_10_0_0, false, []testMessage{
+			{nil, testMsg, 1, now},
+			{nil, testMsg, 2, now},
+		}, []time.Time{{}, {}}},
+		{V0_10_2_1, false, []testMessage{
+			{nil, testMsg, 1, now.Add(time.Second)},
+			{nil, testMsg, 2, now.Add(2 * time.Second)},
+		}, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
+		{V0_10_2_1, true, []testMessage{
+			{nil, testMsg, 1, now.Add(time.Second)},
+			{nil, testMsg, 2, now.Add(2 * time.Second)},
+		}, []time.Time{now, now}},
+		{V0_11_0_0, false, []testMessage{
+			{nil, testMsg, 1, now.Add(time.Second)},
+			{nil, testMsg, 2, now.Add(2 * time.Second)},
+		}, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
+		{V0_11_0_0, true, []testMessage{
+			{nil, testMsg, 1, now.Add(time.Second)},
+			{nil, testMsg, 2, now.Add(2 * time.Second)},
+		}, []time.Time{now, now}},
+	} {
+		var fr *FetchResponse
+		var offsetResponseVersion int16
+		cfg := NewConfig()
+		cfg.Version = d.kversion
+		switch {
+		case d.kversion.IsAtLeast(V0_11_0_0):
+			offsetResponseVersion = 1
+			fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now}
+			for _, m := range d.messages {
+				fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp)
+			}
+			fr.SetLastOffsetDelta("my_topic", 0, 2)
+			fr.SetLastStableOffset("my_topic", 0, 2)
+		case d.kversion.IsAtLeast(V0_10_1_0):
+			offsetResponseVersion = 1
+			fr = &FetchResponse{Version: 3, LogAppendTime: d.logAppendTime, Timestamp: now}
+			for _, m := range d.messages {
+				fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 1)
+			}
+		default:
+			var version int16
+			switch {
+			case d.kversion.IsAtLeast(V0_10_0_0):
+				version = 2
+			case d.kversion.IsAtLeast(V0_9_0_0):
+				version = 1
+			}
+			fr = &FetchResponse{Version: version}
+			for _, m := range d.messages {
+				fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 0)
+			}
+		}
+
+		broker0 := NewMockBroker(t, 0)
+		broker0.SetHandlerByMap(map[string]MockResponse{
+			"MetadataRequest": NewMockMetadataResponse(t).
+				SetBroker(broker0.Addr(), broker0.BrokerID()).
+				SetLeader("my_topic", 0, broker0.BrokerID()),
+			"OffsetRequest": NewMockOffsetResponse(t).
+				SetVersion(offsetResponseVersion).
+				SetOffset("my_topic", 0, OffsetNewest, 1234).
+				SetOffset("my_topic", 0, OffsetOldest, 0),
+			"FetchRequest": NewMockSequence(fr),
+		})
+
+		master, err := NewConsumer([]string{broker0.Addr()}, cfg)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		consumer, err := master.ConsumePartition("my_topic", 0, 1)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		for i, ts := range d.expectedTimestamp {
+			select {
+			case msg := <-consumer.Messages():
+				assertMessageOffset(t, msg, int64(i)+1)
+				if msg.Timestamp != ts {
+					t.Errorf("Wrong timestamp (kversion:%v, logAppendTime:%v): got: %v, want: %v",
+						d.kversion, d.logAppendTime, msg.Timestamp, ts)
+				}
+			case err := <-consumer.Errors():
+				t.Fatal(err)
+			}
+		}
+
+		safeClose(t, consumer)
+		safeClose(t, master)
+		broker0.Close()
+	}
+}
+
 func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
 	if msg.Offset != expectedOffset {
 		t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)
diff --git a/fetch_response.go b/fetch_response.go
index 90acfc280..9df99c17e 100644
--- a/fetch_response.go
+++ b/fetch_response.go
@@ -186,9 +186,11 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error)
 }
 
 type FetchResponse struct {
-	Blocks       map[string]map[int32]*FetchResponseBlock
-	ThrottleTime time.Duration
-	Version      int16 // v1 requires 0.9+, v2 requires 0.10+
+	Blocks        map[string]map[int32]*FetchResponseBlock
+	ThrottleTime  time.Duration
+	Version       int16 // v1 requires 0.9+, v2 requires 0.10+
+	LogAppendTime bool
+	Timestamp     time.Time
 }
 
 func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
@@ -355,10 +357,13 @@ func encodeKV(key, value Encoder) ([]byte, []byte) {
 	return kb, vb
 }
 
-func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
+func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) {
 	frb := r.getOrCreateBlock(topic, partition)
 	kb, vb := encodeKV(key, value)
-	msg := &Message{Key: kb, Value: vb}
+	if r.LogAppendTime {
+		timestamp = r.Timestamp
+	}
+	msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version}
 	msgBlock := &MessageBlock{Msg: msg, Offset: offset}
 	if len(frb.RecordsSet) == 0 {
 		records := newLegacyRecords(&MessageSet{})
@@ -368,18 +373,26 @@ func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Enc
 	set.Messages = append(set.Messages, msgBlock)
 }
 
-func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
+func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) {
 	frb := r.getOrCreateBlock(topic, partition)
 	kb, vb := encodeKV(key, value)
-	rec := &Record{Key: kb, Value: vb, OffsetDelta: offset}
 	if len(frb.RecordsSet) == 0 {
-		records := newDefaultRecords(&RecordBatch{Version: 2})
+		records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
 		frb.RecordsSet = []*Records{&records}
 	}
 	batch := frb.RecordsSet[0].RecordBatch
+	rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
 	batch.addRecord(rec)
 }
 
+func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
+	r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
+}
+
+func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
+	r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{})
+}
+
 func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
 	frb := r.getOrCreateBlock(topic, partition)
 	if len(frb.RecordsSet) == 0 {
diff --git a/message.go b/message.go
index 51d3309c0..1b6a91cfc 100644
--- a/message.go
+++ b/message.go
@@ -5,12 +5,15 @@ import (
 	"time"
 )
 
-// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
-type CompressionCodec int8
-
 // The lowest 3 bits contain the compression codec used for the message
 const compressionCodecMask int8 = 0x07
 
+// Bit 3 set for "LogAppend" timestamps
+const timestampTypeMask = 0x08
+
+// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
+type CompressionCodec int8
+
 const (
 	CompressionNone   CompressionCodec = 0
 	CompressionGZIP   CompressionCodec = 1
@@ -36,6 +39,7 @@ const CompressionLevelDefault = -1000
 type Message struct {
 	Codec            CompressionCodec // codec used to compress the message contents
 	CompressionLevel int              // compression level
+	LogAppendTime    bool             // the used timestamp is LogAppendTime
 	Key              []byte           // the message key, may be nil
 	Value            []byte           // the message contents
 	Set              *MessageSet      // the message set a message might wrap
@@ -52,6 +56,9 @@ func (m *Message) encode(pe packetEncoder) error {
 	pe.putInt8(m.Version)
 
 	attributes := int8(m.Codec) & compressionCodecMask
+	if m.LogAppendTime {
+		attributes |= timestampTypeMask
+	}
 	pe.putInt8(attributes)
 
 	if m.Version >= 1 {
@@ -108,6 +115,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 		return err
 	}
 	m.Codec = CompressionCodec(attribute & compressionCodecMask)
+	m.LogAppendTime = attribute&timestampTypeMask == timestampTypeMask
 
 	if m.Version == 1 {
 		if err := (Timestamp{&m.Timestamp}).decode(pd); err != nil {
diff --git a/record_batch.go b/record_batch.go
index e0f183f7a..a36f7e629 100644
--- a/record_batch.go
+++ b/record_batch.go
@@ -36,6 +36,7 @@ type RecordBatch struct {
 	Codec                 CompressionCodec
 	CompressionLevel      int
 	Control               bool
+	LogAppendTime         bool
 	LastOffsetDelta       int32
 	FirstTimestamp        time.Time
 	MaxTimestamp          time.Time
@@ -120,6 +121,7 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
 	}
 	b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
 	b.Control = attributes&controlMask == controlMask
+	b.LogAppendTime = attributes&timestampTypeMask == timestampTypeMask
 
 	if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
 		return err
@@ -200,6 +202,9 @@ func (b *RecordBatch) computeAttributes() int16 {
 	if b.Control {
 		attr |= controlMask
 	}
+	if b.LogAppendTime {
+		attr |= timestampTypeMask
+	}
 	return attr
 }
 
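
Not part of the patch: a minimal consumer sketch showing how the change surfaces to callers, assuming a broker at localhost:9092 and a topic named my_topic (both placeholders). With Config.Version at V0_10_0_0 or newer the fetch request version added above is used, and when the topic is configured with message.timestamp.type=LogAppendTime the broker-assigned append time is what ends up in ConsumerMessage.Timestamp.

// Usage sketch only -- not part of this diff. Broker address, topic name and
// the chosen Kafka version below are placeholder assumptions.
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	// The consumer picks the fetch request version from Config.Version, so
	// timestamps are only returned for V0_10_0_0 and newer.
	cfg.Version = sarama.V0_10_2_1

	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	pc, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetOldest)
	if err != nil {
		log.Fatal(err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		// With message.timestamp.type=LogAppendTime on the topic, Timestamp is
		// the broker-assigned append time; otherwise it is the producer's
		// CreateTime (or the zero time on pre-0.10 message formats).
		log.Printf("offset=%d timestamp=%v", msg.Offset, msg.Timestamp)
	}
}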