From 16da292b17b4a0c4750a38895b853522265ca68a Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Tue, 23 Aug 2016 11:43:00 -0400 Subject: [PATCH] Consumer: handle compressed relative offsets New message format does something weird with these. See https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets Fixes #720. Supercedes #721. Thanks to @dynamix for the first draft of the fix. --- consumer.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/consumer.go b/consumer.go index ff7318afd..5271e21de 100644 --- a/consumer.go +++ b/consumer.go @@ -488,21 +488,26 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu for _, msgBlock := range block.MsgSet.Messages { for _, msg := range msgBlock.Messages() { - if prelude && msg.Offset < child.offset { + offset := msg.Offset + if msg.Msg.Version >= 1 { + baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset + offset += baseOffset + } + if prelude && offset < child.offset { continue } prelude = false - if msg.Offset >= child.offset { + if offset >= child.offset { messages = append(messages, &ConsumerMessage{ Topic: child.topic, Partition: child.partition, Key: msg.Msg.Key, Value: msg.Msg.Value, - Offset: msg.Offset, + Offset: offset, Timestamp: msg.Msg.Timestamp, }) - child.offset = msg.Offset + 1 + child.offset = offset + 1 } else { incomplete = true }