diff --git a/async_producer.go b/async_producer.go index 6d71a6d8f..1eff81cbf 100644 --- a/async_producer.go +++ b/async_producer.go @@ -1,6 +1,7 @@ package sarama import ( + "encoding/binary" "fmt" "sync" "time" @@ -119,6 +120,10 @@ type ProducerMessage struct { // StringEncoder and ByteEncoder. Value Encoder + // The headers are key-value pairs that are transparently passed + // by Kafka between producers and consumers. + Headers []RecordHeader + // This field is used to hold arbitrary data you wish to include so it // will be available when receiving on the Successes and Errors channels. // Sarama completely ignores this field and is only to be used for @@ -146,8 +151,16 @@ type ProducerMessage struct { const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc. -func (m *ProducerMessage) byteSize() int { - size := producerMessageOverhead +func (m *ProducerMessage) byteSize(version int) int { + var size int + if version >= 2 { + size = maximumRecordOverhead + for _, h := range m.Headers { + size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32 + } + } else { + size = producerMessageOverhead + } if m.Key != nil { size += m.Key.Length() } @@ -254,7 +267,11 @@ func (p *asyncProducer) dispatcher() { p.inFlight.Add(1) } - if msg.byteSize() > p.conf.Producer.MaxMessageBytes { + version := 1 + if p.conf.Version.IsAtLeast(V0_11_0_0) { + version = 2 + } + if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes { p.returnError(msg, ErrMessageSizeTooLarge) continue } diff --git a/mockresponses.go b/mockresponses.go index c9bf2dc43..9659757b7 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -408,7 +408,7 @@ func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KE func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder { req := reqBody.(*ProduceRequest) res := &ProduceResponse{} - for topic, partitions := range req.msgSets { + for topic, partitions := range req.records { for partition := range partitions { res.AddTopicPartition(topic, partition, mr.getError(topic, partition)) } diff --git a/packet_decoder.go b/packet_decoder.go index 387d12cb1..904e28074 100644 --- a/packet_decoder.go +++ b/packet_decoder.go @@ -17,6 +17,7 @@ type packetDecoder interface { getVarintBytes() ([]byte, error) getRawBytes(length int) ([]byte, error) getString() (string, error) + getNullableString() (*string, error) getInt32Array() ([]int32, error) getInt64Array() ([]int64, error) getStringArray() ([]string, error) diff --git a/packet_encoder.go b/packet_encoder.go index b356ab8e8..aecd2b80c 100644 --- a/packet_encoder.go +++ b/packet_encoder.go @@ -19,6 +19,7 @@ type packetEncoder interface { putVarintBytes(in []byte) error putRawBytes(in []byte) error putString(in string) error + putNullableString(in *string) error putStringArray(in []string) error putInt32Array(in []int32) error putInt64Array(in []int64) error diff --git a/prep_encoder.go b/prep_encoder.go index 3c890fcdc..d99cd71ad 100644 --- a/prep_encoder.go +++ b/prep_encoder.go @@ -71,6 +71,14 @@ func (pe *prepEncoder) putRawBytes(in []byte) error { return nil } +func (pe *prepEncoder) putNullableString(in *string) error { + if in == nil { + pe.length += 2 + return nil + } + return pe.putString(*in) +} + func (pe *prepEncoder) putString(in string) error { pe.length += 2 if len(in) > math.MaxInt16 { diff --git a/produce_request.go b/produce_request.go index 40dc80151..300984cef 100644 --- a/produce_request.go +++ b/produce_request.go @@ -21,19 +21,56 @@ const ( ) type ProduceRequest struct { - RequiredAcks RequiredAcks - Timeout int32 - Version int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10 - msgSets map[string]map[int32]*MessageSet + TransactionalID *string + RequiredAcks RequiredAcks + Timeout int32 + Version int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10, v3 requires Kafka 0.11 + records map[string]map[int32]Records +} + +func updateMsgSetMetrics(msgSet *MessageSet, compressionRatioMetric metrics.Histogram, + topicCompressionRatioMetric metrics.Histogram) int64 { + var topicRecordCount int64 + for _, messageBlock := range msgSet.Messages { + // Is this a fake "message" wrapping real messages? + if messageBlock.Msg.Set != nil { + topicRecordCount += int64(len(messageBlock.Msg.Set.Messages)) + } else { + // A single uncompressed message + topicRecordCount++ + } + // Better be safe than sorry when computing the compression ratio + if messageBlock.Msg.compressedSize != 0 { + compressionRatio := float64(len(messageBlock.Msg.Value)) / + float64(messageBlock.Msg.compressedSize) + // Histogram do not support decimal values, let's multiple it by 100 for better precision + intCompressionRatio := int64(100 * compressionRatio) + compressionRatioMetric.Update(intCompressionRatio) + topicCompressionRatioMetric.Update(intCompressionRatio) + } + } + return topicRecordCount +} + +func updateBatchMetrics(recordBatch *RecordBatch, compressionRatioMetric metrics.Histogram, + topicCompressionRatioMetric metrics.Histogram) int64 { + if recordBatch.compressedRecords != nil { + compressionRatio := int64(float64(recordBatch.recordsLen) / float64(len(recordBatch.compressedRecords)) * 100) + compressionRatioMetric.Update(compressionRatio) + topicCompressionRatioMetric.Update(compressionRatio) + } + + return int64(len(recordBatch.Records)) } func (r *ProduceRequest) encode(pe packetEncoder) error { + if r.Version >= 3 { + if err := pe.putNullableString(r.TransactionalID); err != nil { + return err + } + } pe.putInt16(int16(r.RequiredAcks)) pe.putInt32(r.Timeout) - err := pe.putArrayLength(len(r.msgSets)) - if err != nil { - return err - } metricRegistry := pe.metricRegistry() var batchSizeMetric metrics.Histogram var compressionRatioMetric metrics.Histogram @@ -41,9 +78,14 @@ func (r *ProduceRequest) encode(pe packetEncoder) error { batchSizeMetric = getOrRegisterHistogram("batch-size", metricRegistry) compressionRatioMetric = getOrRegisterHistogram("compression-ratio", metricRegistry) } - totalRecordCount := int64(0) - for topic, partitions := range r.msgSets { + + err := pe.putArrayLength(len(r.records)) + if err != nil { + return err + } + + for topic, partitions := range r.records { err = pe.putString(topic) if err != nil { return err @@ -57,11 +99,11 @@ func (r *ProduceRequest) encode(pe packetEncoder) error { if metricRegistry != nil { topicCompressionRatioMetric = getOrRegisterTopicHistogram("compression-ratio", topic, metricRegistry) } - for id, msgSet := range partitions { + for id, records := range partitions { startOffset := pe.offset() pe.putInt32(id) pe.push(&lengthField{}) - err = msgSet.encode(pe) + err = records.encode(pe) if err != nil { return err } @@ -70,23 +112,10 @@ func (r *ProduceRequest) encode(pe packetEncoder) error { return err } if metricRegistry != nil { - for _, messageBlock := range msgSet.Messages { - // Is this a fake "message" wrapping real messages? - if messageBlock.Msg.Set != nil { - topicRecordCount += int64(len(messageBlock.Msg.Set.Messages)) - } else { - // A single uncompressed message - topicRecordCount++ - } - // Better be safe than sorry when computing the compression ratio - if messageBlock.Msg.compressedSize != 0 { - compressionRatio := float64(len(messageBlock.Msg.Value)) / - float64(messageBlock.Msg.compressedSize) - // Histogram do not support decimal values, let's multiple it by 100 for better precision - intCompressionRatio := int64(100 * compressionRatio) - compressionRatioMetric.Update(intCompressionRatio) - topicCompressionRatioMetric.Update(intCompressionRatio) - } + if r.Version >= 3 { + topicRecordCount += updateBatchMetrics(records.recordBatch, compressionRatioMetric, topicCompressionRatioMetric) + } else { + topicRecordCount += updateMsgSetMetrics(records.msgSet, compressionRatioMetric, topicCompressionRatioMetric) } batchSize := int64(pe.offset() - startOffset) batchSizeMetric.Update(batchSize) @@ -108,6 +137,15 @@ func (r *ProduceRequest) encode(pe packetEncoder) error { } func (r *ProduceRequest) decode(pd packetDecoder, version int16) error { + r.Version = version + + if version >= 3 { + id, err := pd.getNullableString() + if err != nil { + return err + } + r.TransactionalID = id + } requiredAcks, err := pd.getInt16() if err != nil { return err @@ -123,7 +161,8 @@ func (r *ProduceRequest) decode(pd packetDecoder, version int16) error { if topicCount == 0 { return nil } - r.msgSets = make(map[string]map[int32]*MessageSet) + + r.records = make(map[string]map[int32]Records) for i := 0; i < topicCount; i++ { topic, err := pd.getString() if err != nil { @@ -133,28 +172,34 @@ func (r *ProduceRequest) decode(pd packetDecoder, version int16) error { if err != nil { return err } - r.msgSets[topic] = make(map[int32]*MessageSet) + r.records[topic] = make(map[int32]Records) + for j := 0; j < partitionCount; j++ { partition, err := pd.getInt32() if err != nil { return err } - messageSetSize, err := pd.getInt32() + size, err := pd.getInt32() if err != nil { return err } - msgSetDecoder, err := pd.getSubset(int(messageSetSize)) + recordsDecoder, err := pd.getSubset(int(size)) if err != nil { return err } - msgSet := &MessageSet{} - err = msgSet.decode(msgSetDecoder) - if err != nil { + var records Records + if version >= 3 { + records = newDefaultRecords(nil) + } else { + records = newLegacyRecords(nil) + } + if err := records.decode(recordsDecoder); err != nil { return err } - r.msgSets[topic][partition] = msgSet + r.records[topic][partition] = records } } + return nil } @@ -172,38 +217,41 @@ func (r *ProduceRequest) requiredVersion() KafkaVersion { return V0_9_0_0 case 2: return V0_10_0_0 + case 3: + return V0_11_0_0 default: return minVersion } } -func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) { - if r.msgSets == nil { - r.msgSets = make(map[string]map[int32]*MessageSet) +func (r *ProduceRequest) ensureRecords(topic string, partition int32) { + if r.records == nil { + r.records = make(map[string]map[int32]Records) } - if r.msgSets[topic] == nil { - r.msgSets[topic] = make(map[int32]*MessageSet) + if r.records[topic] == nil { + r.records[topic] = make(map[int32]Records) } +} - set := r.msgSets[topic][partition] +func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) { + r.ensureRecords(topic, partition) + set := r.records[topic][partition].msgSet if set == nil { set = new(MessageSet) - r.msgSets[topic][partition] = set + r.records[topic][partition] = newLegacyRecords(set) } set.addMessage(msg) } func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) { - if r.msgSets == nil { - r.msgSets = make(map[string]map[int32]*MessageSet) - } - - if r.msgSets[topic] == nil { - r.msgSets[topic] = make(map[int32]*MessageSet) - } + r.ensureRecords(topic, partition) + r.records[topic][partition] = newLegacyRecords(set) +} - r.msgSets[topic][partition] = set +func (r *ProduceRequest) AddBatch(topic string, partition int32, batch *RecordBatch) { + r.ensureRecords(topic, partition) + r.records[topic][partition] = newDefaultRecords(batch) } diff --git a/produce_request_test.go b/produce_request_test.go index 21f4ba5b1..be6459596 100644 --- a/produce_request_test.go +++ b/produce_request_test.go @@ -2,6 +2,7 @@ package sarama import ( "testing" + "time" ) var ( @@ -32,6 +33,41 @@ var ( 0x00, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE} + + produceRequestOneRecord = []byte{ + 0xFF, 0xFF, // Transaction ID + 0x01, 0x23, // Required Acks + 0x00, 0x00, 0x04, 0x44, // Timeout + 0x00, 0x00, 0x00, 0x01, // Number of Topics + 0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic + 0x00, 0x00, 0x00, 0x01, // Number of Partitions + 0x00, 0x00, 0x00, 0xAD, // Partition + 0x00, 0x00, 0x00, 0x52, // Records length + // recordBatch + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x46, + 0x00, 0x00, 0x00, 0x00, + 0x02, + 0x54, 0x79, 0x61, 0xFD, + 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x01, 0x58, 0x8D, 0xCD, 0x59, 0x38, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, + // record + 0x28, + 0x00, + 0x0A, + 0x00, + 0x08, 0x01, 0x02, 0x03, 0x04, + 0x06, 0x05, 0x06, 0x07, + 0x02, + 0x06, 0x08, 0x09, 0x0A, + 0x04, 0x0B, 0x0C, + } ) func TestProduceRequest(t *testing.T) { @@ -44,4 +80,24 @@ func TestProduceRequest(t *testing.T) { request.AddMessage("topic", 0xAD, &Message{Codec: CompressionNone, Key: nil, Value: []byte{0x00, 0xEE}}) testRequest(t, "one message", request, produceRequestOneMessage) + + request.Version = 3 + batch := &RecordBatch{ + Version: 2, + FirstTimestamp: time.Unix(1479847795, 0), + MaxTimestamp: time.Unix(0, 0), + Records: []*Record{{ + TimestampDelta: 5 * time.Millisecond, + Key: []byte{0x01, 0x02, 0x03, 0x04}, + Value: []byte{0x05, 0x06, 0x07}, + Headers: []*RecordHeader{{ + Key: []byte{0x08, 0x09, 0x0A}, + Value: []byte{0x0B, 0x0C}, + }}, + }}, + } + request.AddBatch("topic", 0xAD, batch) + packet := testRequestEncode(t, "one record", request, produceRequestOneRecord) + batch.Records[0].length.startOffset = 0 + testRequestDecode(t, "one record", request, packet) } diff --git a/produce_response.go b/produce_response.go index fc926161f..043c40f87 100644 --- a/produce_response.go +++ b/produce_response.go @@ -149,6 +149,8 @@ func (r *ProduceResponse) requiredVersion() KafkaVersion { return V0_9_0_0 case 2: return V0_10_0_0 + case 3: + return V0_11_0_0 default: return minVersion } diff --git a/produce_set.go b/produce_set.go index 158d9c475..9e1d50d3a 100644 --- a/produce_set.go +++ b/produce_set.go @@ -1,11 +1,14 @@ package sarama -import "time" +import ( + "encoding/binary" + "time" +) type partitionSet struct { - msgs []*ProducerMessage - setToSend *MessageSet - bufferBytes int + msgs []*ProducerMessage + recordsToSend Records + bufferBytes int } type produceSet struct { @@ -39,31 +42,64 @@ func (ps *produceSet) add(msg *ProducerMessage) error { } } + timestamp := msg.Timestamp + if msg.Timestamp.IsZero() { + timestamp = time.Now() + } + partitions := ps.msgs[msg.Topic] if partitions == nil { partitions = make(map[int32]*partitionSet) ps.msgs[msg.Topic] = partitions } + var size int + set := partitions[msg.Partition] if set == nil { - set = &partitionSet{setToSend: new(MessageSet)} + if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) { + batch := &RecordBatch{ + FirstTimestamp: timestamp, + Version: 2, + ProducerID: -1, /* No producer id */ + Codec: ps.parent.conf.Producer.Compression, + } + set = &partitionSet{recordsToSend: newDefaultRecords(batch)} + size = recordBatchOverhead + } else { + set = &partitionSet{recordsToSend: newLegacyRecords(new(MessageSet))} + } partitions[msg.Partition] = set } set.msgs = append(set.msgs, msg) - msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val} - if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) { - if msg.Timestamp.IsZero() { - msgToSend.Timestamp = time.Now() - } else { - msgToSend.Timestamp = msg.Timestamp + if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) { + // We are being conservative here to avoid having to prep encode the record + size += maximumRecordOverhead + rec := &Record{ + Key: key, + Value: val, + TimestampDelta: timestamp.Sub(set.recordsToSend.recordBatch.FirstTimestamp), + } + size += len(key) + len(val) + if len(msg.Headers) > 0 { + rec.Headers = make([]*RecordHeader, len(msg.Headers)) + for i, h := range msg.Headers { + rec.Headers[i] = &h + size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32 + } + } + set.recordsToSend.recordBatch.addRecord(rec) + } else { + msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val} + if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) { + msgToSend.Timestamp = timestamp + msgToSend.Version = 1 } - msgToSend.Version = 1 + set.recordsToSend.msgSet.addMessage(msgToSend) + size = producerMessageOverhead + len(key) + len(val) } - set.setToSend.addMessage(msgToSend) - size := producerMessageOverhead + len(key) + len(val) set.bufferBytes += size ps.bufferBytes += size ps.bufferCount++ @@ -79,17 +115,24 @@ func (ps *produceSet) buildRequest() *ProduceRequest { if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) { req.Version = 2 } + if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) { + req.Version = 3 + } for topic, partitionSet := range ps.msgs { for partition, set := range partitionSet { + if req.Version >= 3 { + req.AddBatch(topic, partition, set.recordsToSend.recordBatch) + continue + } if ps.parent.conf.Producer.Compression == CompressionNone { - req.AddSet(topic, partition, set.setToSend) + req.AddSet(topic, partition, set.recordsToSend.msgSet) } else { // When compression is enabled, the entire set for each partition is compressed // and sent as the payload of a single fake "message" with the appropriate codec // set and no key. When the server sees a message with a compression codec, it // decompresses the payload and treats the result as its message set. - payload, err := encode(set.setToSend, ps.parent.conf.MetricRegistry) + payload, err := encode(set.recordsToSend.msgSet, ps.parent.conf.MetricRegistry) if err != nil { Logger.Println(err) // if this happens, it's basically our fault. panic(err) @@ -98,11 +141,11 @@ func (ps *produceSet) buildRequest() *ProduceRequest { Codec: ps.parent.conf.Producer.Compression, Key: nil, Value: payload, - Set: set.setToSend, // Provide the underlying message set for accurate metrics + Set: set.recordsToSend.msgSet, // Provide the underlying message set for accurate metrics } if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) { compMsg.Version = 1 - compMsg.Timestamp = set.setToSend.Messages[0].Msg.Timestamp + compMsg.Timestamp = set.recordsToSend.msgSet.Messages[0].Msg.Timestamp } req.AddMessage(topic, partition, compMsg) } @@ -135,14 +178,19 @@ func (ps *produceSet) dropPartition(topic string, partition int32) []*ProducerMe } func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool { + version := 1 + if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) { + version = 2 + } + switch { // Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety. - case ps.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)): + case ps.bufferBytes+msg.byteSize(version) >= int(MaxRequestSize-(10*1024)): return true // Would we overflow the size-limit of a compressed message-batch for this partition? case ps.parent.conf.Producer.Compression != CompressionNone && ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil && - ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize() >= ps.parent.conf.Producer.MaxMessageBytes: + ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes: return true // Would we overflow simply in number of messages? case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages: diff --git a/produce_set_test.go b/produce_set_test.go index d016a10b7..0f96e8818 100644 --- a/produce_set_test.go +++ b/produce_set_test.go @@ -137,7 +137,7 @@ func TestProduceSetRequestBuilding(t *testing.T) { t.Error("Timeout not set properly") } - if len(req.msgSets) != 2 { + if len(req.records) != 2 { t.Error("Wrong number of topics in request") } } @@ -166,7 +166,7 @@ func TestProduceSetCompressedRequestBuilding(t *testing.T) { t.Error("Wrong request version") } - for _, msgBlock := range req.msgSets["t1"][0].Messages { + for _, msgBlock := range req.records["t1"][0].msgSet.Messages { msg := msgBlock.Msg err := msg.decodeSet() if err != nil { @@ -183,3 +183,40 @@ func TestProduceSetCompressedRequestBuilding(t *testing.T) { } } } + +func TestProduceSetV3RequestBuilding(t *testing.T) { + parent, ps := makeProduceSet() + parent.conf.Producer.RequiredAcks = WaitForAll + parent.conf.Producer.Timeout = 10 * time.Second + parent.conf.Version = V0_11_0_0 + + now := time.Now() + msg := &ProducerMessage{ + Topic: "t1", + Partition: 0, + Key: StringEncoder(TestMessage), + Value: StringEncoder(TestMessage), + Timestamp: now, + } + for i := 0; i < 10; i++ { + safeAddMessage(t, ps, msg) + msg.Timestamp = msg.Timestamp.Add(time.Second) + } + + req := ps.buildRequest() + + if req.Version != 3 { + t.Error("Wrong request version") + } + + batch := req.records["t1"][0].recordBatch + if batch.FirstTimestamp != now { + t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp) + } + for i := 0; i < 10; i++ { + rec := batch.Records[i] + if rec.TimestampDelta != time.Duration(i)*time.Second { + t.Errorf("Wrong timestamp delta: %v", rec.TimestampDelta) + } + } +} diff --git a/real_decoder.go b/real_decoder.go index 48ab8e86e..3ff8212ad 100644 --- a/real_decoder.go +++ b/real_decoder.go @@ -142,6 +142,15 @@ func (rd *realDecoder) getString() (string, error) { return tmpStr, nil } +func (rd *realDecoder) getNullableString() (*string, error) { + tmp, err := rd.getInt16() + if err != nil || tmp == -1 { + return nil, err + } + str, err := rd.getString() + return &str, err +} + func (rd *realDecoder) getInt32Array() ([]int32, error) { if rd.remaining() < 4 { rd.off = len(rd.raw) diff --git a/real_encoder.go b/real_encoder.go index 70e18edd8..51112e70c 100644 --- a/real_encoder.go +++ b/real_encoder.go @@ -77,6 +77,14 @@ func (re *realEncoder) putString(in string) error { return nil } +func (re *realEncoder) putNullableString(in *string) error { + if in == nil { + re.putInt16(-1) + return nil + } + return re.putString(*in) +} + func (re *realEncoder) putStringArray(in []string) error { err := re.putArrayLength(len(in)) if err != nil { diff --git a/record.go b/record.go index f6ab5b674..cded308cf 100644 --- a/record.go +++ b/record.go @@ -1,9 +1,13 @@ package sarama -import "time" +import ( + "encoding/binary" + "time" +) const ( - controlMask = 0x20 + controlMask = 0x20 + maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1 ) type RecordHeader struct { diff --git a/record_batch.go b/record_batch.go index d95f16092..3c148be58 100644 --- a/record_batch.go +++ b/record_batch.go @@ -191,6 +191,7 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) { return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", b.Codec)} } + b.recordsLen = len(recBuffer) err = decode(recBuffer, recordsArray(b.Records)) if err == ErrInsufficientData { b.PartialTrailingRecord = true diff --git a/record_test.go b/record_test.go index 38ae8cfa6..68824edee 100644 --- a/record_test.go +++ b/record_test.go @@ -81,6 +81,7 @@ var recordBatchTestCases = []struct { Value: []byte{11, 12}, }}, }}, + recordsLen: 21, }, encoded: []byte{ 0, 0, 0, 0, 0, 0, 0, 0, // First Offset @@ -127,6 +128,7 @@ var recordBatchTestCases = []struct { Value: []byte{11, 12}, }}, }}, + recordsLen: 21, }, encoded: []byte{ 0, 0, 0, 0, 0, 0, 0, 0, // First Offset @@ -179,6 +181,7 @@ var recordBatchTestCases = []struct { Value: []byte{11, 12}, }}, }}, + recordsLen: 21, }, encoded: []byte{ 0, 0, 0, 0, 0, 0, 0, 0, // First Offset @@ -213,6 +216,7 @@ var recordBatchTestCases = []struct { Value: []byte{11, 12}, }}, }}, + recordsLen: 21, }, encoded: []byte{ 0, 0, 0, 0, 0, 0, 0, 0, // First Offset