diff --git a/consumer.go b/consumer.go index ff7318afd..5271e21de 100644 --- a/consumer.go +++ b/consumer.go @@ -488,21 +488,26 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu for _, msgBlock := range block.MsgSet.Messages { for _, msg := range msgBlock.Messages() { - if prelude && msg.Offset < child.offset { + offset := msg.Offset + if msg.Msg.Version >= 1 { + baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset + offset += baseOffset + } + if prelude && offset < child.offset { continue } prelude = false - if msg.Offset >= child.offset { + if offset >= child.offset { messages = append(messages, &ConsumerMessage{ Topic: child.topic, Partition: child.partition, Key: msg.Msg.Key, Value: msg.Msg.Value, - Offset: msg.Offset, + Offset: offset, Timestamp: msg.Msg.Timestamp, }) - child.offset = msg.Offset + 1 + child.offset = offset + 1 } else { incomplete = true }