Skip to content

Commit

Permalink
Merge pull request #735 from Shopify/consumer/compressed-relative-off…
Browse files Browse the repository at this point in the history
…sets

Consumer: handle compressed relative offsets
  • Loading branch information
eapache authored Aug 30, 2016
2 parents 9489511 + 16da292 commit e0c3cfa
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,21 +488,26 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
for _, msgBlock := range block.MsgSet.Messages {

for _, msg := range msgBlock.Messages() {
if prelude && msg.Offset < child.offset {
offset := msg.Offset
if msg.Msg.Version >= 1 {
baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
offset += baseOffset
}
if prelude && offset < child.offset {
continue
}
prelude = false

if msg.Offset >= child.offset {
if offset >= child.offset {
messages = append(messages, &ConsumerMessage{
Topic: child.topic,
Partition: child.partition,
Key: msg.Msg.Key,
Value: msg.Msg.Value,
Offset: msg.Offset,
Offset: offset,
Timestamp: msg.Msg.Timestamp,
})
child.offset = msg.Offset + 1
child.offset = offset + 1
} else {
incomplete = true
}
Expand Down

0 comments on commit e0c3cfa

Please sign in to comment.