From 5fd60c2be0d4eaf97c800319c0e3602b9a13c452 Mon Sep 17 00:00:00 2001 From: Vlad Hanciuta Date: Tue, 24 Oct 2017 15:22:40 +0100 Subject: [PATCH] Add producer support for Kafka 0.11 Records This changeset introduces support for producing messages in the new Kafka 0.11 Record format (including attaching headers). It doesn't support transactions or idempotent messages. --- async_producer.go | 23 +++++- mockresponses.go | 2 +- produce_request.go | 152 ++++++++++++++++++++++++++-------------- produce_request_test.go | 56 +++++++++++++++ produce_response.go | 2 + produce_set.go | 88 +++++++++++++++++------ produce_set_test.go | 41 ++++++++++- record.go | 8 ++- record_batch.go | 1 + record_test.go | 4 ++ 10 files changed, 297 insertions(+), 80 deletions(-) diff --git a/async_producer.go b/async_producer.go index 6d71a6d8f..1eff81cbf 100644 --- a/async_producer.go +++ b/async_producer.go @@ -1,6 +1,7 @@ package sarama import ( + "encoding/binary" "fmt" "sync" "time" @@ -119,6 +120,10 @@ type ProducerMessage struct { // StringEncoder and ByteEncoder. Value Encoder + // The headers are key-value pairs that are transparently passed + // by Kafka between producers and consumers. + Headers []RecordHeader + // This field is used to hold arbitrary data you wish to include so it // will be available when receiving on the Successes and Errors channels. // Sarama completely ignores this field and is only to be used for @@ -146,8 +151,16 @@ type ProducerMessage struct { const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc. -func (m *ProducerMessage) byteSize() int { - size := producerMessageOverhead +func (m *ProducerMessage) byteSize(version int) int { + var size int + if version >= 2 { + size = maximumRecordOverhead + for _, h := range m.Headers { + size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32 + } + } else { + size = producerMessageOverhead + } if m.Key != nil { size += m.Key.Length() } @@ -254,7 +267,11 @@ func (p *asyncProducer) dispatcher() { p.inFlight.Add(1) } - if msg.byteSize() > p.conf.Producer.MaxMessageBytes { + version := 1 + if p.conf.Version.IsAtLeast(V0_11_0_0) { + version = 2 + } + if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes { p.returnError(msg, ErrMessageSizeTooLarge) continue } diff --git a/mockresponses.go b/mockresponses.go index c9bf2dc43..9659757b7 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -408,7 +408,7 @@ func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KE func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder { req := reqBody.(*ProduceRequest) res := &ProduceResponse{} - for topic, partitions := range req.msgSets { + for topic, partitions := range req.records { for partition := range partitions { res.AddTopicPartition(topic, partition, mr.getError(topic, partition)) } diff --git a/produce_request.go b/produce_request.go index 40dc80151..300984cef 100644 --- a/produce_request.go +++ b/produce_request.go @@ -21,19 +21,56 @@ const ( ) type ProduceRequest struct { - RequiredAcks RequiredAcks - Timeout int32 - Version int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10 - msgSets map[string]map[int32]*MessageSet + TransactionalID *string + RequiredAcks RequiredAcks + Timeout int32 + Version int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10, v3 requires Kafka 0.11 + records map[string]map[int32]Records +} + +func updateMsgSetMetrics(msgSet *MessageSet, compressionRatioMetric metrics.Histogram, + topicCompressionRatioMetric metrics.Histogram) int64 { + var topicRecordCount int64 + for _, messageBlock := range msgSet.Messages { + // Is this a fake "message" wrapping real messages? + if messageBlock.Msg.Set != nil { + topicRecordCount += int64(len(messageBlock.Msg.Set.Messages)) + } else { + // A single uncompressed message + topicRecordCount++ + } + // Better be safe than sorry when computing the compression ratio + if messageBlock.Msg.compressedSize != 0 { + compressionRatio := float64(len(messageBlock.Msg.Value)) / + float64(messageBlock.Msg.compressedSize) + // Histogram do not support decimal values, let's multiple it by 100 for better precision + intCompressionRatio := int64(100 * compressionRatio) + compressionRatioMetric.Update(intCompressionRatio) + topicCompressionRatioMetric.Update(intCompressionRatio) + } + } + return topicRecordCount +} + +func updateBatchMetrics(recordBatch *RecordBatch, compressionRatioMetric metrics.Histogram, + topicCompressionRatioMetric metrics.Histogram) int64 { + if recordBatch.compressedRecords != nil { + compressionRatio := int64(float64(recordBatch.recordsLen) / float64(len(recordBatch.compressedRecords)) * 100) + compressionRatioMetric.Update(compressionRatio) + topicCompressionRatioMetric.Update(compressionRatio) + } + + return int64(len(recordBatch.Records)) } func (r *ProduceRequest) encode(pe packetEncoder) error { + if r.Version >= 3 { + if err := pe.putNullableString(r.TransactionalID); err != nil { + return err + } + } pe.putInt16(int16(r.RequiredAcks)) pe.putInt32(r.Timeout) - err := pe.putArrayLength(len(r.msgSets)) - if err != nil { - return err - } metricRegistry := pe.metricRegistry() var batchSizeMetric metrics.Histogram var compressionRatioMetric metrics.Histogram @@ -41,9 +78,14 @@ func (r *ProduceRequest) encode(pe packetEncoder) error { batchSizeMetric = getOrRegisterHistogram("batch-size", metricRegistry) compressionRatioMetric = getOrRegisterHistogram("compression-ratio", metricRegistry) } - totalRecordCount := int64(0) - for topic, partitions := range r.msgSets { + + err := pe.putArrayLength(len(r.records)) + if err != nil { + return err + } + + for topic, partitions := range r.records { err = pe.putString(topic) if err != nil { return err @@ -57,11 +99,11 @@ func (r *ProduceRequest) encode(pe packetEncoder) error { if metricRegistry != nil { topicCompressionRatioMetric = getOrRegisterTopicHistogram("compression-ratio", topic, metricRegistry) } - for id, msgSet := range partitions { + for id, records := range partitions { startOffset := pe.offset() pe.putInt32(id) pe.push(&lengthField{}) - err = msgSet.encode(pe) + err = records.encode(pe) if err != nil { return err } @@ -70,23 +112,10 @@ func (r *ProduceRequest) encode(pe packetEncoder) error { return err } if metricRegistry != nil { - for _, messageBlock := range msgSet.Messages { - // Is this a fake "message" wrapping real messages? - if messageBlock.Msg.Set != nil { - topicRecordCount += int64(len(messageBlock.Msg.Set.Messages)) - } else { - // A single uncompressed message - topicRecordCount++ - } - // Better be safe than sorry when computing the compression ratio - if messageBlock.Msg.compressedSize != 0 { - compressionRatio := float64(len(messageBlock.Msg.Value)) / - float64(messageBlock.Msg.compressedSize) - // Histogram do not support decimal values, let's multiple it by 100 for better precision - intCompressionRatio := int64(100 * compressionRatio) - compressionRatioMetric.Update(intCompressionRatio) - topicCompressionRatioMetric.Update(intCompressionRatio) - } + if r.Version >= 3 { + topicRecordCount += updateBatchMetrics(records.recordBatch, compressionRatioMetric, topicCompressionRatioMetric) + } else { + topicRecordCount += updateMsgSetMetrics(records.msgSet, compressionRatioMetric, topicCompressionRatioMetric) } batchSize := int64(pe.offset() - startOffset) batchSizeMetric.Update(batchSize) @@ -108,6 +137,15 @@ func (r *ProduceRequest) encode(pe packetEncoder) error { } func (r *ProduceRequest) decode(pd packetDecoder, version int16) error { + r.Version = version + + if version >= 3 { + id, err := pd.getNullableString() + if err != nil { + return err + } + r.TransactionalID = id + } requiredAcks, err := pd.getInt16() if err != nil { return err @@ -123,7 +161,8 @@ func (r *ProduceRequest) decode(pd packetDecoder, version int16) error { if topicCount == 0 { return nil } - r.msgSets = make(map[string]map[int32]*MessageSet) + + r.records = make(map[string]map[int32]Records) for i := 0; i < topicCount; i++ { topic, err := pd.getString() if err != nil { @@ -133,28 +172,34 @@ func (r *ProduceRequest) decode(pd packetDecoder, version int16) error { if err != nil { return err } - r.msgSets[topic] = make(map[int32]*MessageSet) + r.records[topic] = make(map[int32]Records) + for j := 0; j < partitionCount; j++ { partition, err := pd.getInt32() if err != nil { return err } - messageSetSize, err := pd.getInt32() + size, err := pd.getInt32() if err != nil { return err } - msgSetDecoder, err := pd.getSubset(int(messageSetSize)) + recordsDecoder, err := pd.getSubset(int(size)) if err != nil { return err } - msgSet := &MessageSet{} - err = msgSet.decode(msgSetDecoder) - if err != nil { + var records Records + if version >= 3 { + records = newDefaultRecords(nil) + } else { + records = newLegacyRecords(nil) + } + if err := records.decode(recordsDecoder); err != nil { return err } - r.msgSets[topic][partition] = msgSet + r.records[topic][partition] = records } } + return nil } @@ -172,38 +217,41 @@ func (r *ProduceRequest) requiredVersion() KafkaVersion { return V0_9_0_0 case 2: return V0_10_0_0 + case 3: + return V0_11_0_0 default: return minVersion } } -func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) { - if r.msgSets == nil { - r.msgSets = make(map[string]map[int32]*MessageSet) +func (r *ProduceRequest) ensureRecords(topic string, partition int32) { + if r.records == nil { + r.records = make(map[string]map[int32]Records) } - if r.msgSets[topic] == nil { - r.msgSets[topic] = make(map[int32]*MessageSet) + if r.records[topic] == nil { + r.records[topic] = make(map[int32]Records) } +} - set := r.msgSets[topic][partition] +func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) { + r.ensureRecords(topic, partition) + set := r.records[topic][partition].msgSet if set == nil { set = new(MessageSet) - r.msgSets[topic][partition] = set + r.records[topic][partition] = newLegacyRecords(set) } set.addMessage(msg) } func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) { - if r.msgSets == nil { - r.msgSets = make(map[string]map[int32]*MessageSet) - } - - if r.msgSets[topic] == nil { - r.msgSets[topic] = make(map[int32]*MessageSet) - } + r.ensureRecords(topic, partition) + r.records[topic][partition] = newLegacyRecords(set) +} - r.msgSets[topic][partition] = set +func (r *ProduceRequest) AddBatch(topic string, partition int32, batch *RecordBatch) { + r.ensureRecords(topic, partition) + r.records[topic][partition] = newDefaultRecords(batch) } diff --git a/produce_request_test.go b/produce_request_test.go index 21f4ba5b1..be6459596 100644 --- a/produce_request_test.go +++ b/produce_request_test.go @@ -2,6 +2,7 @@ package sarama import ( "testing" + "time" ) var ( @@ -32,6 +33,41 @@ var ( 0x00, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE} + + produceRequestOneRecord = []byte{ + 0xFF, 0xFF, // Transaction ID + 0x01, 0x23, // Required Acks + 0x00, 0x00, 0x04, 0x44, // Timeout + 0x00, 0x00, 0x00, 0x01, // Number of Topics + 0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic + 0x00, 0x00, 0x00, 0x01, // Number of Partitions + 0x00, 0x00, 0x00, 0xAD, // Partition + 0x00, 0x00, 0x00, 0x52, // Records length + // recordBatch + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x46, + 0x00, 0x00, 0x00, 0x00, + 0x02, + 0x54, 0x79, 0x61, 0xFD, + 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x01, 0x58, 0x8D, 0xCD, 0x59, 0x38, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, + // record + 0x28, + 0x00, + 0x0A, + 0x00, + 0x08, 0x01, 0x02, 0x03, 0x04, + 0x06, 0x05, 0x06, 0x07, + 0x02, + 0x06, 0x08, 0x09, 0x0A, + 0x04, 0x0B, 0x0C, + } ) func TestProduceRequest(t *testing.T) { @@ -44,4 +80,24 @@ func TestProduceRequest(t *testing.T) { request.AddMessage("topic", 0xAD, &Message{Codec: CompressionNone, Key: nil, Value: []byte{0x00, 0xEE}}) testRequest(t, "one message", request, produceRequestOneMessage) + + request.Version = 3 + batch := &RecordBatch{ + Version: 2, + FirstTimestamp: time.Unix(1479847795, 0), + MaxTimestamp: time.Unix(0, 0), + Records: []*Record{{ + TimestampDelta: 5 * time.Millisecond, + Key: []byte{0x01, 0x02, 0x03, 0x04}, + Value: []byte{0x05, 0x06, 0x07}, + Headers: []*RecordHeader{{ + Key: []byte{0x08, 0x09, 0x0A}, + Value: []byte{0x0B, 0x0C}, + }}, + }}, + } + request.AddBatch("topic", 0xAD, batch) + packet := testRequestEncode(t, "one record", request, produceRequestOneRecord) + batch.Records[0].length.startOffset = 0 + testRequestDecode(t, "one record", request, packet) } diff --git a/produce_response.go b/produce_response.go index fc926161f..043c40f87 100644 --- a/produce_response.go +++ b/produce_response.go @@ -149,6 +149,8 @@ func (r *ProduceResponse) requiredVersion() KafkaVersion { return V0_9_0_0 case 2: return V0_10_0_0 + case 3: + return V0_11_0_0 default: return minVersion } diff --git a/produce_set.go b/produce_set.go index 158d9c475..9e1d50d3a 100644 --- a/produce_set.go +++ b/produce_set.go @@ -1,11 +1,14 @@ package sarama -import "time" +import ( + "encoding/binary" + "time" +) type partitionSet struct { - msgs []*ProducerMessage - setToSend *MessageSet - bufferBytes int + msgs []*ProducerMessage + recordsToSend Records + bufferBytes int } type produceSet struct { @@ -39,31 +42,64 @@ func (ps *produceSet) add(msg *ProducerMessage) error { } } + timestamp := msg.Timestamp + if msg.Timestamp.IsZero() { + timestamp = time.Now() + } + partitions := ps.msgs[msg.Topic] if partitions == nil { partitions = make(map[int32]*partitionSet) ps.msgs[msg.Topic] = partitions } + var size int + set := partitions[msg.Partition] if set == nil { - set = &partitionSet{setToSend: new(MessageSet)} + if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) { + batch := &RecordBatch{ + FirstTimestamp: timestamp, + Version: 2, + ProducerID: -1, /* No producer id */ + Codec: ps.parent.conf.Producer.Compression, + } + set = &partitionSet{recordsToSend: newDefaultRecords(batch)} + size = recordBatchOverhead + } else { + set = &partitionSet{recordsToSend: newLegacyRecords(new(MessageSet))} + } partitions[msg.Partition] = set } set.msgs = append(set.msgs, msg) - msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val} - if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) { - if msg.Timestamp.IsZero() { - msgToSend.Timestamp = time.Now() - } else { - msgToSend.Timestamp = msg.Timestamp + if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) { + // We are being conservative here to avoid having to prep encode the record + size += maximumRecordOverhead + rec := &Record{ + Key: key, + Value: val, + TimestampDelta: timestamp.Sub(set.recordsToSend.recordBatch.FirstTimestamp), + } + size += len(key) + len(val) + if len(msg.Headers) > 0 { + rec.Headers = make([]*RecordHeader, len(msg.Headers)) + for i, h := range msg.Headers { + rec.Headers[i] = &h + size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32 + } + } + set.recordsToSend.recordBatch.addRecord(rec) + } else { + msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val} + if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) { + msgToSend.Timestamp = timestamp + msgToSend.Version = 1 } - msgToSend.Version = 1 + set.recordsToSend.msgSet.addMessage(msgToSend) + size = producerMessageOverhead + len(key) + len(val) } - set.setToSend.addMessage(msgToSend) - size := producerMessageOverhead + len(key) + len(val) set.bufferBytes += size ps.bufferBytes += size ps.bufferCount++ @@ -79,17 +115,24 @@ func (ps *produceSet) buildRequest() *ProduceRequest { if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) { req.Version = 2 } + if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) { + req.Version = 3 + } for topic, partitionSet := range ps.msgs { for partition, set := range partitionSet { + if req.Version >= 3 { + req.AddBatch(topic, partition, set.recordsToSend.recordBatch) + continue + } if ps.parent.conf.Producer.Compression == CompressionNone { - req.AddSet(topic, partition, set.setToSend) + req.AddSet(topic, partition, set.recordsToSend.msgSet) } else { // When compression is enabled, the entire set for each partition is compressed // and sent as the payload of a single fake "message" with the appropriate codec // set and no key. When the server sees a message with a compression codec, it // decompresses the payload and treats the result as its message set. - payload, err := encode(set.setToSend, ps.parent.conf.MetricRegistry) + payload, err := encode(set.recordsToSend.msgSet, ps.parent.conf.MetricRegistry) if err != nil { Logger.Println(err) // if this happens, it's basically our fault. panic(err) @@ -98,11 +141,11 @@ func (ps *produceSet) buildRequest() *ProduceRequest { Codec: ps.parent.conf.Producer.Compression, Key: nil, Value: payload, - Set: set.setToSend, // Provide the underlying message set for accurate metrics + Set: set.recordsToSend.msgSet, // Provide the underlying message set for accurate metrics } if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) { compMsg.Version = 1 - compMsg.Timestamp = set.setToSend.Messages[0].Msg.Timestamp + compMsg.Timestamp = set.recordsToSend.msgSet.Messages[0].Msg.Timestamp } req.AddMessage(topic, partition, compMsg) } @@ -135,14 +178,19 @@ func (ps *produceSet) dropPartition(topic string, partition int32) []*ProducerMe } func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool { + version := 1 + if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) { + version = 2 + } + switch { // Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety. - case ps.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)): + case ps.bufferBytes+msg.byteSize(version) >= int(MaxRequestSize-(10*1024)): return true // Would we overflow the size-limit of a compressed message-batch for this partition? case ps.parent.conf.Producer.Compression != CompressionNone && ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil && - ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize() >= ps.parent.conf.Producer.MaxMessageBytes: + ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes: return true // Would we overflow simply in number of messages? case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages: diff --git a/produce_set_test.go b/produce_set_test.go index d016a10b7..0f96e8818 100644 --- a/produce_set_test.go +++ b/produce_set_test.go @@ -137,7 +137,7 @@ func TestProduceSetRequestBuilding(t *testing.T) { t.Error("Timeout not set properly") } - if len(req.msgSets) != 2 { + if len(req.records) != 2 { t.Error("Wrong number of topics in request") } } @@ -166,7 +166,7 @@ func TestProduceSetCompressedRequestBuilding(t *testing.T) { t.Error("Wrong request version") } - for _, msgBlock := range req.msgSets["t1"][0].Messages { + for _, msgBlock := range req.records["t1"][0].msgSet.Messages { msg := msgBlock.Msg err := msg.decodeSet() if err != nil { @@ -183,3 +183,40 @@ func TestProduceSetCompressedRequestBuilding(t *testing.T) { } } } + +func TestProduceSetV3RequestBuilding(t *testing.T) { + parent, ps := makeProduceSet() + parent.conf.Producer.RequiredAcks = WaitForAll + parent.conf.Producer.Timeout = 10 * time.Second + parent.conf.Version = V0_11_0_0 + + now := time.Now() + msg := &ProducerMessage{ + Topic: "t1", + Partition: 0, + Key: StringEncoder(TestMessage), + Value: StringEncoder(TestMessage), + Timestamp: now, + } + for i := 0; i < 10; i++ { + safeAddMessage(t, ps, msg) + msg.Timestamp = msg.Timestamp.Add(time.Second) + } + + req := ps.buildRequest() + + if req.Version != 3 { + t.Error("Wrong request version") + } + + batch := req.records["t1"][0].recordBatch + if batch.FirstTimestamp != now { + t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp) + } + for i := 0; i < 10; i++ { + rec := batch.Records[i] + if rec.TimestampDelta != time.Duration(i)*time.Second { + t.Errorf("Wrong timestamp delta: %v", rec.TimestampDelta) + } + } +} diff --git a/record.go b/record.go index f6ab5b674..cded308cf 100644 --- a/record.go +++ b/record.go @@ -1,9 +1,13 @@ package sarama -import "time" +import ( + "encoding/binary" + "time" +) const ( - controlMask = 0x20 + controlMask = 0x20 + maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1 ) type RecordHeader struct { diff --git a/record_batch.go b/record_batch.go index d95f16092..3c148be58 100644 --- a/record_batch.go +++ b/record_batch.go @@ -191,6 +191,7 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) { return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", b.Codec)} } + b.recordsLen = len(recBuffer) err = decode(recBuffer, recordsArray(b.Records)) if err == ErrInsufficientData { b.PartialTrailingRecord = true diff --git a/record_test.go b/record_test.go index 38ae8cfa6..68824edee 100644 --- a/record_test.go +++ b/record_test.go @@ -81,6 +81,7 @@ var recordBatchTestCases = []struct { Value: []byte{11, 12}, }}, }}, + recordsLen: 21, }, encoded: []byte{ 0, 0, 0, 0, 0, 0, 0, 0, // First Offset @@ -127,6 +128,7 @@ var recordBatchTestCases = []struct { Value: []byte{11, 12}, }}, }}, + recordsLen: 21, }, encoded: []byte{ 0, 0, 0, 0, 0, 0, 0, 0, // First Offset @@ -179,6 +181,7 @@ var recordBatchTestCases = []struct { Value: []byte{11, 12}, }}, }}, + recordsLen: 21, }, encoded: []byte{ 0, 0, 0, 0, 0, 0, 0, 0, // First Offset @@ -213,6 +216,7 @@ var recordBatchTestCases = []struct { Value: []byte{11, 12}, }}, }}, + recordsLen: 21, }, encoded: []byte{ 0, 0, 0, 0, 0, 0, 0, 0, // First Offset