Skip to content

Commit

Permalink
Merge pull request #1015 from bobrik/offset-v3
Browse files Browse the repository at this point in the history
Set Record.OffsetDelta to avoid broker recompression
  • Loading branch information
eapache committed Dec 31, 2017
2 parents b1433c2 + 6899ef5 commit f196330
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 1 deletion.
4 changes: 4 additions & 0 deletions produce_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
for topic, partitionSet := range ps.msgs {
for partition, set := range partitionSet {
if req.Version >= 3 {
for i, record := range set.recordsToSend.recordBatch.Records {
record.OffsetDelta = int64(i)
}

req.AddBatch(topic, partition, set.recordsToSend.recordBatch)
continue
}
Expand Down
6 changes: 5 additions & 1 deletion produce_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func TestProduceSetCompressedRequestBuilding(t *testing.T) {
t.Error("Wrong compressed message version")
}
if compMsgBlock.Offset != int64(i) {
t.Error("Wrong relative inner offset")
t.Errorf("Wrong relative inner offset, expected %d, got %d", i, compMsgBlock.Offset)
}
}
if msg.Version != 1 {
Expand Down Expand Up @@ -237,6 +237,10 @@ func TestProduceSetV3RequestBuilding(t *testing.T) {
t.Errorf("Wrong timestamp delta: %v", rec.TimestampDelta)
}

if rec.OffsetDelta != int64(i) {
t.Errorf("Wrong relative inner offset, expected %d, got %d", i, rec.OffsetDelta)
}

for j, h := range batch.Records[i].Headers {
exp := fmt.Sprintf("header-%d", j+1)
if string(h.Key) != exp {
Expand Down

0 comments on commit f196330

Please sign in to comment.