diff --git a/produce_set.go b/produce_set.go index 61eb3f018..b9eac95ed 100644 --- a/produce_set.go +++ b/produce_set.go @@ -122,6 +122,10 @@ func (ps *produceSet) buildRequest() *ProduceRequest { for topic, partitionSet := range ps.msgs { for partition, set := range partitionSet { if req.Version >= 3 { + for i, record := range set.recordsToSend.recordBatch.Records { + record.OffsetDelta = int64(i) + } + req.AddBatch(topic, partition, set.recordsToSend.recordBatch) continue } diff --git a/produce_set_test.go b/produce_set_test.go index 39796cd4b..fee6e3fa1 100644 --- a/produce_set_test.go +++ b/produce_set_test.go @@ -179,7 +179,7 @@ func TestProduceSetCompressedRequestBuilding(t *testing.T) { t.Error("Wrong compressed message version") } if compMsgBlock.Offset != int64(i) { - t.Error("Wrong relative inner offset") + t.Errorf("Wrong relative inner offset, expected %d, got %d", i, compMsgBlock.Offset) } } if msg.Version != 1 { @@ -237,6 +237,10 @@ func TestProduceSetV3RequestBuilding(t *testing.T) { t.Errorf("Wrong timestamp delta: %v", rec.TimestampDelta) } + if rec.OffsetDelta != int64(i) { + t.Errorf("Wrong relative inner offset, expected %d, got %d", i, rec.OffsetDelta) + } + for j, h := range batch.Records[i].Headers { exp := fmt.Sprintf("header-%d", j+1) if string(h.Key) != exp {