Commit b305748

Merge pull request #976 from wladh/producer
Add producer support for Kafka 0.11 records
eapache authored Nov 3, 2017
2 parents f12be49 + 5fd60c2 commit b305748
Showing 15 changed files with 324 additions and 80 deletions.
23 changes: 20 additions & 3 deletions async_producer.go
@@ -1,6 +1,7 @@
package sarama

import (
"encoding/binary"
"fmt"
"sync"
"time"
@@ -119,6 +120,10 @@ type ProducerMessage struct {
// StringEncoder and ByteEncoder.
Value Encoder

// The headers are key-value pairs that are transparently passed
// by Kafka between producers and consumers.
Headers []RecordHeader

// This field is used to hold arbitrary data you wish to include so it
// will be available when receiving on the Successes and Errors channels.
// Sarama completely ignores this field and is only to be used for
@@ -146,8 +151,16 @@ type ProducerMessage struct {

const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.

func (m *ProducerMessage) byteSize() int {
size := producerMessageOverhead
func (m *ProducerMessage) byteSize(version int) int {
var size int
if version >= 2 {
size = maximumRecordOverhead
for _, h := range m.Headers {
size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32
}
} else {
size = producerMessageOverhead
}
if m.Key != nil {
size += m.Key.Length()
}
@@ -254,7 +267,11 @@ func (p *asyncProducer) dispatcher() {
p.inFlight.Add(1)
}

if msg.byteSize() > p.conf.Producer.MaxMessageBytes {
version := 1
if p.conf.Version.IsAtLeast(V0_11_0_0) {
version = 2
}
if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
p.returnError(msg, ErrMessageSizeTooLarge)
continue
}
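For context, here is a minimal sketch of how a caller might use the new Headers field, assuming a broker running Kafka 0.11 or newer; the broker address and topic name are placeholders. Declaring config.Version as at least V0_11_0_0 is what moves the producer onto the version-2 sizing path above, so headers only take effect there.

package main

import (
	"log"

	"github.com/Shopify/sarama" // import path at the time of this commit
)

func main() {
	config := sarama.NewConfig()
	// Records (and therefore headers) use the v2 message format, so the
	// declared broker version must be at least 0.11.0.0.
	config.Version = sarama.V0_11_0_0
	config.Producer.Return.Successes = true // required by SyncProducer

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// Headers are opaque key-value byte pairs carried next to the key
	// and value, and passed through untouched to consumers.
	msg := &sarama.ProducerMessage{
		Topic: "example-topic",
		Value: sarama.StringEncoder("hello"),
		Headers: []sarama.RecordHeader{
			{Key: []byte("trace-id"), Value: []byte("abc123")},
		},
	}
	if _, _, err = producer.SendMessage(msg); err != nil {
		log.Fatal(err)
	}
}

For sizing, byteSize charges each header len(h.Key)+len(h.Value) plus 2*binary.MaxVarintLen32, a worst-case allowance for the two varint length prefixes in the v2 record encoding.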
2 changes: 1 addition & 1 deletion mockresponses.go
@@ -408,7 +408,7 @@ func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KE
func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder {
req := reqBody.(*ProduceRequest)
res := &ProduceResponse{}
for topic, partitions := range req.msgSets {
for topic, partitions := range req.records {
for partition := range partitions {
res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
}
1 change: 1 addition & 0 deletions packet_decoder.go
@@ -17,6 +17,7 @@ type packetDecoder interface {
getVarintBytes() ([]byte, error)
getRawBytes(length int) ([]byte, error)
getString() (string, error)
getNullableString() (*string, error)
getInt32Array() ([]int32, error)
getInt64Array() ([]int64, error)
getStringArray() ([]string, error)
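In the Kafka protocol a nullable string is an int16 length prefix followed by that many bytes, with a length of -1 denoting null. As a hedged sketch, the new getNullableString could be layered on the interface's existing primitives like this; getNullableStringVia is a hypothetical helper, and the concrete decoder in this commit may differ.

// getNullableStringVia is a hypothetical helper, not part of this commit:
// it decodes the int16 length prefix, treating a length of -1 as null.
func getNullableStringVia(pd packetDecoder) (*string, error) {
	n, err := pd.getInt16()
	if err != nil || n == -1 {
		return nil, err
	}
	raw, err := pd.getRawBytes(int(n))
	if err != nil {
		return nil, err
	}
	s := string(raw)
	return &s, nil
}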
1 change: 1 addition & 0 deletions packet_encoder.go
@@ -19,6 +19,7 @@ type packetEncoder interface {
putVarintBytes(in []byte) error
putRawBytes(in []byte) error
putString(in string) error
putNullableString(in *string) error
putStringArray(in []string) error
putInt32Array(in []int32) error
putInt64Array(in []int64) error
8 changes: 8 additions & 0 deletions prep_encoder.go
@@ -71,6 +71,14 @@ func (pe *prepEncoder) putRawBytes(in []byte) error {
return nil
}

func (pe *prepEncoder) putNullableString(in *string) error {
if in == nil {
pe.length += 2
return nil
}
return pe.putString(*in)
}

func (pe *prepEncoder) putString(in string) error {
pe.length += 2
if len(in) > math.MaxInt16 {
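The prepEncoder only measures sizes, which is why a nil string still reserves two bytes: null is transmitted as an int16 length of -1. The runtime half of the encoder pair is not shown in this diff; a plausible counterpart sketch, assuming the realEncoder's usual putInt16/putString primitives:

// A plausible runtime counterpart (not part of this diff): nil is written
// as an int16 length of -1, matching the two bytes prepEncoder reserves.
func (re *realEncoder) putNullableString(in *string) error {
	if in == nil {
		re.putInt16(-1)
		return nil
	}
	return re.putString(*in)
}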
152 changes: 100 additions & 52 deletions produce_request.go
@@ -21,29 +21,71 @@ const (
)

type ProduceRequest struct {
RequiredAcks RequiredAcks
Timeout int32
Version int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10
msgSets map[string]map[int32]*MessageSet
TransactionalID *string
RequiredAcks RequiredAcks
Timeout int32
Version int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10, v3 requires Kafka 0.11
records map[string]map[int32]Records
}

func updateMsgSetMetrics(msgSet *MessageSet, compressionRatioMetric metrics.Histogram,
topicCompressionRatioMetric metrics.Histogram) int64 {
var topicRecordCount int64
for _, messageBlock := range msgSet.Messages {
// Is this a fake "message" wrapping real messages?
if messageBlock.Msg.Set != nil {
topicRecordCount += int64(len(messageBlock.Msg.Set.Messages))
} else {
// A single uncompressed message
topicRecordCount++
}
// Better safe than sorry when computing the compression ratio
if messageBlock.Msg.compressedSize != 0 {
compressionRatio := float64(len(messageBlock.Msg.Value)) /
float64(messageBlock.Msg.compressedSize)
// Histograms do not support decimal values, so multiply by 100 for better precision
intCompressionRatio := int64(100 * compressionRatio)
compressionRatioMetric.Update(intCompressionRatio)
topicCompressionRatioMetric.Update(intCompressionRatio)
}
}
return topicRecordCount
}

func updateBatchMetrics(recordBatch *RecordBatch, compressionRatioMetric metrics.Histogram,
topicCompressionRatioMetric metrics.Histogram) int64 {
if recordBatch.compressedRecords != nil {
compressionRatio := int64(float64(recordBatch.recordsLen) / float64(len(recordBatch.compressedRecords)) * 100)
compressionRatioMetric.Update(compressionRatio)
topicCompressionRatioMetric.Update(compressionRatio)
}

return int64(len(recordBatch.Records))
}

func (r *ProduceRequest) encode(pe packetEncoder) error {
if r.Version >= 3 {
if err := pe.putNullableString(r.TransactionalID); err != nil {
return err
}
}
pe.putInt16(int16(r.RequiredAcks))
pe.putInt32(r.Timeout)
err := pe.putArrayLength(len(r.msgSets))
if err != nil {
return err
}
metricRegistry := pe.metricRegistry()
var batchSizeMetric metrics.Histogram
var compressionRatioMetric metrics.Histogram
if metricRegistry != nil {
batchSizeMetric = getOrRegisterHistogram("batch-size", metricRegistry)
compressionRatioMetric = getOrRegisterHistogram("compression-ratio", metricRegistry)
}

totalRecordCount := int64(0)
for topic, partitions := range r.msgSets {

err := pe.putArrayLength(len(r.records))
if err != nil {
return err
}

for topic, partitions := range r.records {
err = pe.putString(topic)
if err != nil {
return err
@@ -57,11 +99,11 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
if metricRegistry != nil {
topicCompressionRatioMetric = getOrRegisterTopicHistogram("compression-ratio", topic, metricRegistry)
}
for id, msgSet := range partitions {
for id, records := range partitions {
startOffset := pe.offset()
pe.putInt32(id)
pe.push(&lengthField{})
err = msgSet.encode(pe)
err = records.encode(pe)
if err != nil {
return err
}
@@ -70,23 +112,10 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
return err
}
if metricRegistry != nil {
for _, messageBlock := range msgSet.Messages {
// Is this a fake "message" wrapping real messages?
if messageBlock.Msg.Set != nil {
topicRecordCount += int64(len(messageBlock.Msg.Set.Messages))
} else {
// A single uncompressed message
topicRecordCount++
}
// Better safe than sorry when computing the compression ratio
if messageBlock.Msg.compressedSize != 0 {
compressionRatio := float64(len(messageBlock.Msg.Value)) /
float64(messageBlock.Msg.compressedSize)
// Histograms do not support decimal values, so multiply by 100 for better precision
intCompressionRatio := int64(100 * compressionRatio)
compressionRatioMetric.Update(intCompressionRatio)
topicCompressionRatioMetric.Update(intCompressionRatio)
}
if r.Version >= 3 {
topicRecordCount += updateBatchMetrics(records.recordBatch, compressionRatioMetric, topicCompressionRatioMetric)
} else {
topicRecordCount += updateMsgSetMetrics(records.msgSet, compressionRatioMetric, topicCompressionRatioMetric)
}
batchSize := int64(pe.offset() - startOffset)
batchSizeMetric.Update(batchSize)
@@ -108,6 +137,15 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
}

func (r *ProduceRequest) decode(pd packetDecoder, version int16) error {
r.Version = version

if version >= 3 {
id, err := pd.getNullableString()
if err != nil {
return err
}
r.TransactionalID = id
}
requiredAcks, err := pd.getInt16()
if err != nil {
return err
@@ -123,7 +161,8 @@ func (r *ProduceRequest) decode(pd packetDecoder, version int16) error {
if topicCount == 0 {
return nil
}
r.msgSets = make(map[string]map[int32]*MessageSet)

r.records = make(map[string]map[int32]Records)
for i := 0; i < topicCount; i++ {
topic, err := pd.getString()
if err != nil {
@@ -133,28 +172,34 @@ func (r *ProduceRequest) decode(pd packetDecoder, version int16) error {
if err != nil {
return err
}
r.msgSets[topic] = make(map[int32]*MessageSet)
r.records[topic] = make(map[int32]Records)

for j := 0; j < partitionCount; j++ {
partition, err := pd.getInt32()
if err != nil {
return err
}
messageSetSize, err := pd.getInt32()
size, err := pd.getInt32()
if err != nil {
return err
}
msgSetDecoder, err := pd.getSubset(int(messageSetSize))
recordsDecoder, err := pd.getSubset(int(size))
if err != nil {
return err
}
msgSet := &MessageSet{}
err = msgSet.decode(msgSetDecoder)
if err != nil {
var records Records
if version >= 3 {
records = newDefaultRecords(nil)
} else {
records = newLegacyRecords(nil)
}
if err := records.decode(recordsDecoder); err != nil {
return err
}
r.msgSets[topic][partition] = msgSet
r.records[topic][partition] = records
}
}

return nil
}

@@ -172,38 +217,41 @@ func (r *ProduceRequest) requiredVersion() KafkaVersion {
return V0_9_0_0
case 2:
return V0_10_0_0
case 3:
return V0_11_0_0
default:
return minVersion
}
}

func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
if r.msgSets == nil {
r.msgSets = make(map[string]map[int32]*MessageSet)
func (r *ProduceRequest) ensureRecords(topic string, partition int32) {
if r.records == nil {
r.records = make(map[string]map[int32]Records)
}

if r.msgSets[topic] == nil {
r.msgSets[topic] = make(map[int32]*MessageSet)
if r.records[topic] == nil {
r.records[topic] = make(map[int32]Records)
}
}

set := r.msgSets[topic][partition]
func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
r.ensureRecords(topic, partition)
set := r.records[topic][partition].msgSet

if set == nil {
set = new(MessageSet)
r.msgSets[topic][partition] = set
r.records[topic][partition] = newLegacyRecords(set)
}

set.addMessage(msg)
}

func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) {
if r.msgSets == nil {
r.msgSets = make(map[string]map[int32]*MessageSet)
}

if r.msgSets[topic] == nil {
r.msgSets[topic] = make(map[int32]*MessageSet)
}
r.ensureRecords(topic, partition)
r.records[topic][partition] = newLegacyRecords(set)
}

r.msgSets[topic][partition] = set
func (r *ProduceRequest) AddBatch(topic string, partition int32, batch *RecordBatch) {
r.ensureRecords(topic, partition)
r.records[topic][partition] = newDefaultRecords(batch)
}
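
Taken together, AddMessage keeps older protocol versions on the legacy MessageSet path while the new AddBatch feeds a v2 RecordBatch for protocol v3 and above. The following sketch contrasts the two entry points; buildExampleRequests is a hypothetical illustration and its field values are assumptions, not anything this diff prescribes.

// buildExampleRequests is a hypothetical illustration, not part of this
// commit; the field values are assumptions.
func buildExampleRequests() (*ProduceRequest, *ProduceRequest) {
	// Protocol v3 (Kafka 0.11+): records travel as a RecordBatch.
	v3 := &ProduceRequest{Version: 3, RequiredAcks: WaitForLocal, Timeout: 10000}
	batch := &RecordBatch{
		Version: 2, // the v2 record format introduced in Kafka 0.11
		Records: []*Record{{Value: []byte("hello")}},
	}
	v3.AddBatch("example-topic", 0, batch)

	// Earlier protocol versions stay on the legacy MessageSet path.
	v2 := &ProduceRequest{Version: 2, RequiredAcks: WaitForLocal, Timeout: 10000}
	v2.AddMessage("example-topic", 0, &Message{Value: []byte("hello")})

	return v3, v2
}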