From 4b51f755b4b5bb26a7adbb69f29b3e297741d22c Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 28 Jun 2021 11:05:55 -0400 Subject: [PATCH] [libbeat] Fix encoding and file offset issues in the disk queue (#26484) (cherry picked from commit a3b447ac4e0930cf0525f257a2e63ef1de4bc7b6) --- CHANGELOG.next.asciidoc | 1 + .../queue/diskqueue/core_loop_test.go | 6 +- .../publisher/queue/diskqueue/reader_loop.go | 26 +++---- libbeat/publisher/queue/diskqueue/segments.go | 23 +++++-- .../publisher/queue/diskqueue/serialize.go | 20 ++++-- .../queue/diskqueue/serialize_test.go | 69 +++++++++++++++++++ .../publisher/queue/diskqueue/writer_loop.go | 23 ++++--- 7 files changed, 132 insertions(+), 36 deletions(-) create mode 100644 libbeat/publisher/queue/diskqueue/serialize_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index dc2041a7ad9..97406e01dee 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -174,6 +174,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix `community_id` processor so that ports greater than 65535 aren't valid. {pull}25409[25409] - Fix ILM alias creation when write alias exists and initial index does not exist {pull}26143[26143] - In the script processor, the `decode_xml` and `decode_xml_wineventlog` processors are now available as `DecodeXML` and `DecodeXMLWineventlog` respectively. +- Fix encoding errors when using the disk queue on nested data with multi-byte characters {pull}26484[26484] *Auditbeat* diff --git a/libbeat/publisher/queue/diskqueue/core_loop_test.go b/libbeat/publisher/queue/diskqueue/core_loop_test.go index 3bf53e9b8a5..7c0d426ea61 100644 --- a/libbeat/publisher/queue/diskqueue/core_loop_test.go +++ b/libbeat/publisher/queue/diskqueue/core_loop_test.go @@ -235,7 +235,7 @@ func TestHandleWriterLoopResponse(t *testing.T) { // Write to one segment (no segments should be moved to reading list) dq.handleWriterLoopResponse(writerLoopResponse{ - segments: []writerLoopResponseSegment{ + segments: []writerLoopSegmentResponse{ {bytesWritten: 100}, }, }) @@ -250,7 +250,7 @@ func TestHandleWriterLoopResponse(t *testing.T) { // Write to two segments (the first one should be moved to reading list) dq.handleWriterLoopResponse(writerLoopResponse{ - segments: []writerLoopResponseSegment{ + segments: []writerLoopSegmentResponse{ {bytesWritten: 100}, {bytesWritten: 100}, }, @@ -270,7 +270,7 @@ func TestHandleWriterLoopResponse(t *testing.T) { // Write to three segments (the first two should be moved to reading list) dq.handleWriterLoopResponse(writerLoopResponse{ - segments: []writerLoopResponseSegment{ + segments: []writerLoopSegmentResponse{ {bytesWritten: 100}, {bytesWritten: 100}, {bytesWritten: 500}, diff --git a/libbeat/publisher/queue/diskqueue/reader_loop.go b/libbeat/publisher/queue/diskqueue/reader_loop.go index a31cc5b8a77..d6bb49494e0 100644 --- a/libbeat/publisher/queue/diskqueue/reader_loop.go +++ b/libbeat/publisher/queue/diskqueue/reader_loop.go @@ -20,6 +20,7 @@ package diskqueue import ( "encoding/binary" "fmt" + "io" "os" ) @@ -100,13 +101,12 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon // Open the file and seek to the starting position. 
 	handle, err := request.segment.getReader(rl.settings)
 	if err != nil {
 		return readerLoopResponse{err: err}
 	}
 	defer handle.Close()
+	rl.decoder.useJSON = request.segment.shouldUseJSON()
-	// getReader positions us at the start of the data region, so we use
-	// a relative seek to advance to the request position.
-	_, err = handle.Seek(int64(request.startPosition), os.SEEK_CUR)
+	_, err = handle.Seek(int64(request.startPosition), io.SeekStart)
 	if err != nil {
 		return readerLoopResponse{err: err}
 	}
@@ -179,7 +179,7 @@ func (rl *readerLoop) nextFrame(
 	// Ensure we are allowed to read the frame header.
 	if maxLength < frameHeaderSize {
 		return nil, fmt.Errorf(
-			"Can't read next frame: remaining length %d is too low", maxLength)
+			"can't read next frame: remaining length %d is too low", maxLength)
 	}
 	// Wrap the handle to retry non-fatal errors and always return the full
 	// requested data length if possible.
@@ -187,20 +187,20 @@ func (rl *readerLoop) nextFrame(
 	var frameLength uint32
 	err := binary.Read(reader, binary.LittleEndian, &frameLength)
 	if err != nil {
-		return nil, fmt.Errorf("Couldn't read data frame header: %w", err)
+		return nil, fmt.Errorf("couldn't read data frame header: %w", err)
 	}
 
 	// If the frame extends past the area we were told to read, return an error.
 	// This should never happen unless the segment file is corrupted.
 	if maxLength < uint64(frameLength) {
 		return nil, fmt.Errorf(
-			"Can't read next frame: frame size is %d but remaining data is only %d",
+			"can't read next frame: frame size is %d but remaining data is only %d",
 			frameLength, maxLength)
 	}
 	if frameLength <= frameMetadataSize {
 		// Valid enqueued data must have positive length
 		return nil, fmt.Errorf(
-			"Data frame with no data (length %d)", frameLength)
+			"data frame with no data (length %d)", frameLength)
 	}
 
 	// Read the actual frame data
@@ -208,29 +208,29 @@ func (rl *readerLoop) nextFrame(
 	dataLength := frameLength - frameMetadataSize
 	bytes := rl.decoder.Buffer(int(dataLength))
 	_, err = reader.Read(bytes)
 	if err != nil {
-		return nil, fmt.Errorf("Couldn't read data frame content: %w", err)
+		return nil, fmt.Errorf("couldn't read data frame content: %w", err)
 	}
 
 	// Read the footer (checksum + duplicate length)
 	var checksum uint32
 	err = binary.Read(reader, binary.LittleEndian, &checksum)
 	if err != nil {
-		return nil, fmt.Errorf("Couldn't read data frame checksum: %w", err)
+		return nil, fmt.Errorf("couldn't read data frame checksum: %w", err)
 	}
 	expected := computeChecksum(bytes)
 	if checksum != expected {
 		return nil, fmt.Errorf(
-			"Data frame checksum mismatch (%x != %x)", checksum, expected)
+			"data frame checksum mismatch (%x != %x)", checksum, expected)
 	}
 
 	var duplicateLength uint32
 	err = binary.Read(reader, binary.LittleEndian, &duplicateLength)
 	if err != nil {
-		return nil, fmt.Errorf("Couldn't read data frame footer: %w", err)
+		return nil, fmt.Errorf("couldn't read data frame footer: %w", err)
 	}
 	if duplicateLength != frameLength {
 		return nil, fmt.Errorf(
-			"Inconsistent data frame length (%d vs %d)",
+			"inconsistent data frame length (%d vs %d)",
 			frameLength, duplicateLength)
 	}
 
@@ -242,7 +242,7 @@ func (rl *readerLoop) nextFrame(
 	// TODO: Rather than pass this error back to the read request, which
 	// discards the rest of the segment, we should just log the error and
 	// advance to the next frame, which is likely still valid.
- return nil, fmt.Errorf("Couldn't decode data frame: %w", err) + return nil, fmt.Errorf("couldn't decode data frame: %w", err) } frame := &readFrame{ diff --git a/libbeat/publisher/queue/diskqueue/segments.go b/libbeat/publisher/queue/diskqueue/segments.go index f666a1941c8..450139b2d04 100644 --- a/libbeat/publisher/queue/diskqueue/segments.go +++ b/libbeat/publisher/queue/diskqueue/segments.go @@ -91,7 +91,10 @@ type queueSegment struct { // If this segment was loaded from a previous session, schemaVersion // points to the file schema version that was read from its header. // This is only used by queueSegment.headerSize(), which is used in - // maybeReadPending to calculate the position of the first data frame. + // maybeReadPending to calculate the position of the first data frame, + // and by queueSegment.shouldUseJSON(), which is used in the reader + // loop to detect old segments that used JSON encoding instead of + // the current CBOR. schemaVersion *uint32 // The number of bytes occupied by this segment on-disk, as of the most @@ -198,6 +201,14 @@ func (segment *queueSegment) headerSize() uint64 { return segmentHeaderSize } +// The initial release of the disk queue used JSON to encode events +// on disk. Since then, we have switched to CBOR to address issues +// with encoding multi-byte characters, and for lower encoding +// overhead. +func (segment *queueSegment) shouldUseJSON() bool { + return segment.schemaVersion != nil && *segment.schemaVersion == 0 +} + // Should only be called from the reader loop. If successful, returns an open // file handle positioned at the beginning of the segment's data region. func (segment *queueSegment) getReader( @@ -207,14 +218,14 @@ func (segment *queueSegment) getReader( file, err := os.Open(path) if err != nil { return nil, fmt.Errorf( - "Couldn't open segment %d: %w", segment.id, err) + "couldn't open segment %d: %w", segment.id, err) } // We don't need the header contents here, we just want to advance past the // header region, so discard the return value. _, err = readSegmentHeader(file) if err != nil { file.Close() - return nil, fmt.Errorf("Couldn't read segment header: %w", err) + return nil, fmt.Errorf("couldn't read segment header: %w", err) } return file, nil @@ -231,7 +242,7 @@ func (segment *queueSegment) getWriter( } err = writeSegmentHeader(file, 0) if err != nil { - return nil, fmt.Errorf("Couldn't write segment header: %w", err) + return nil, fmt.Errorf("couldn't write segment header: %w", err) } return file, nil @@ -306,7 +317,7 @@ func readSegmentHeaderWithFrameCount(path string) (*segmentHeader, error) { // the current frame to make sure the trailing length matches before // advancing to the next frame (otherwise we might accept an impossible // length). 
- _, err = file.Seek(int64(frameLength-8), os.SEEK_CUR) + _, err = file.Seek(int64(frameLength-8), io.SeekCurrent) if err != nil { break } @@ -341,7 +352,7 @@ func readSegmentHeader(in io.Reader) (*segmentHeader, error) { return nil, err } if header.version > currentSegmentVersion { - return nil, fmt.Errorf("Unrecognized schema version %d", header.version) + return nil, fmt.Errorf("unrecognized schema version %d", header.version) } if header.version >= 1 { err = binary.Read(in, binary.LittleEndian, &header.frameCount) diff --git a/libbeat/publisher/queue/diskqueue/serialize.go b/libbeat/publisher/queue/diskqueue/serialize.go index 456f180665b..fc0c24de673 100644 --- a/libbeat/publisher/queue/diskqueue/serialize.go +++ b/libbeat/publisher/queue/diskqueue/serialize.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/outputs/codec" "github.com/elastic/beats/v7/libbeat/publisher" + "github.com/elastic/go-structform/cborl" "github.com/elastic/go-structform/gotype" "github.com/elastic/go-structform/json" ) @@ -40,7 +41,13 @@ type eventEncoder struct { type eventDecoder struct { buf []byte - parser *json.Parser + jsonParser *json.Parser + cborlParser *cborl.Parser + + // Current serialization all uses CBOR. Set this flag when decoding + // from old (schema 0) segment files generated by the disk queue beta. + useJSON bool + unfolder *gotype.Unfolder } @@ -60,7 +67,7 @@ func newEventEncoder() *eventEncoder { func (e *eventEncoder) reset() { e.folder = nil - visitor := json.NewVisitor(&e.buf) + visitor := cborl.NewVisitor(&e.buf) // This can't return an error: NewIterator is deterministic based on its // input, and doesn't return an error when called with valid options. In // this case the options are hard-coded to fixed values, so they are @@ -109,7 +116,8 @@ func (d *eventDecoder) reset() { unfolder, _ := gotype.NewUnfolder(nil) d.unfolder = unfolder - d.parser = json.NewParser(unfolder) + d.jsonParser = json.NewParser(unfolder) + d.cborlParser = cborl.NewParser(unfolder) } // Buffer prepares the read buffer to hold the next event of n bytes. @@ -131,7 +139,11 @@ func (d *eventDecoder) Decode() (publisher.Event, error) { d.unfolder.SetTarget(&to) defer d.unfolder.Reset() - err = d.parser.Parse(d.buf) + if d.useJSON { + err = d.jsonParser.Parse(d.buf) + } else { + err = d.cborlParser.Parse(d.buf) + } if err != nil { d.reset() // reset parser just in case diff --git a/libbeat/publisher/queue/diskqueue/serialize_test.go b/libbeat/publisher/queue/diskqueue/serialize_test.go new file mode 100644 index 00000000000..888a42eca88 --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/serialize_test.go @@ -0,0 +1,69 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package diskqueue
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/elastic/beats/v7/libbeat/beat"
+	"github.com/elastic/beats/v7/libbeat/common"
+	"github.com/elastic/beats/v7/libbeat/publisher"
+)
+
+// A test to make sure serialization works correctly on multi-byte characters.
+func TestSerialize(t *testing.T) {
+	testCases := []struct {
+		name  string
+		value string
+	}{
+		{name: "Ascii only", value: "{\"name\": \"Momotaro\"}"},
+		{name: "Multi-byte", value: "{\"name\": \"桃太郎\"}"},
+	}
+
+	for _, test := range testCases {
+		encoder := newEventEncoder()
+		event := publisher.Event{
+			Content: beat.Event{
+				Fields: common.MapStr{
+					"test_field": test.value,
+				},
+			},
+		}
+		serialized, err := encoder.encode(&event)
+		if err != nil {
+			t.Fatalf("[%v] Couldn't encode event: %v", test.name, err)
+		}
+
+		// Use decoder to decode the serialized bytes.
+		decoder := newEventDecoder()
+		buf := decoder.Buffer(len(serialized))
+		copy(buf, serialized)
+		decoded, err := decoder.Decode()
+		if err != nil {
+			t.Fatalf("[%v] Couldn't decode serialized data: %v", test.name, err)
+		}
+
+		decodedValue, err := decoded.Content.Fields.GetValue("test_field")
+		if err != nil {
+			t.Fatalf("[%v] Couldn't get field 'test_field': %v", test.name, err)
+		}
+		assert.Equal(t, test.value, decodedValue)
+	}
+}
diff --git a/libbeat/publisher/queue/diskqueue/writer_loop.go b/libbeat/publisher/queue/diskqueue/writer_loop.go
index ce7bed6b2b0..31a03f9d27e 100644
--- a/libbeat/publisher/queue/diskqueue/writer_loop.go
+++ b/libbeat/publisher/queue/diskqueue/writer_loop.go
@@ -49,9 +49,9 @@ type writerLoopRequest struct {
 	frames []segmentedFrame
 }
 
-// A writerLoopResponseSegment specifies the number of frames and bytes
+// A writerLoopSegmentResponse specifies the number of frames and bytes
 // written to a single segment as a result of a writerLoopRequest.
-type writerLoopResponseSegment struct {
+type writerLoopSegmentResponse struct {
 	framesWritten uint32
 	bytesWritten  uint64
 }
@@ -61,7 +61,7 @@ type writerLoopResponseSegment struct {
 // segment that appeared in the request, in the same order. If there is
 // more than one entry, then all but the last segment have been closed.
 type writerLoopResponse struct {
-	segments []writerLoopResponseSegment
+	segments []writerLoopSegmentResponse
 }
 
 type writerLoop struct {
@@ -143,7 +143,7 @@ func (wl *writerLoop) processRequest(
 
 	// curSegmentResponse tracks the number of frames and bytes written
 	// to the current segment.
-	var curSegment writerLoopResponseSegment
+	var curSegmentResponse writerLoopSegmentResponse
 	// response accumulates the per-segment totals for the whole request.
 	var response writerLoopResponse
 outerLoop:
@@ -157,14 +157,14 @@ outerLoop:
 				// Update the header with the frame count (including the ones we
 				// just wrote), try to sync to disk, then close the file.
 				writeSegmentHeader(wl.outputFile,
-					wl.currentSegment.frameCount+curSegment.framesWritten)
+					wl.currentSegment.frameCount+curSegmentResponse.framesWritten)
 				wl.outputFile.Sync()
 				wl.outputFile.Close()
 				wl.outputFile = nil
 				// We are done with this segment, so add the totals to the response
 				// and reset the current counters.
-				response.segments = append(response.segments, curSegment)
-				curSegment = writerLoopResponseSegment{}
+				response.segments = append(response.segments, curSegmentResponse)
+				curSegmentResponse = writerLoopSegmentResponse{}
 			}
 			wl.currentSegment = frameRequest.segment
 			file, err := wl.currentSegment.getWriterWithRetry(
@@ -173,6 +173,9 @@ outerLoop:
 				// This can only happen if the queue is being closed; abort.
 				break
 			}
+			// We're creating a new segment file, so set the initial bytes
+			// written to the header size.
+			curSegmentResponse.bytesWritten = wl.currentSegment.headerSize()
 			wl.outputFile = file
 		}
 		// Make sure our writer points to the current file handle.
@@ -208,8 +211,8 @@ outerLoop:
 		// abort while a frame is partially written, we only report up to the
 		// last complete frame. (This almost never matters, but it allows for
 		// more controlled recovery after a bad shutdown.)
-		curSegment.framesWritten++
-		curSegment.bytesWritten += uint64(frameSize)
+		curSegmentResponse.framesWritten++
+		curSegmentResponse.bytesWritten += uint64(frameSize)
 
 		// Update the ACKs that will be sent at the end of the request.
 		totalACKCount++
@@ -238,7 +241,7 @@ outerLoop:
 	}
 
 	// Add the final segment to the response and return it.
-	response.segments = append(response.segments, curSegment)
+	response.segments = append(response.segments, curSegmentResponse)
 	return response
 }
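
Note on the frame format: the layout that nextFrame validates above is a 4-byte
little-endian total frame length, the encoded event bytes, a 4-byte checksum over
those bytes, and a duplicate copy of the length that the queue cross-checks when
scanning frames. The standalone sketch below round-trips one frame under that
layout. It only illustrates what the hunks above imply: the 12-byte metadata size
is inferred from `dataLength := frameLength - frameMetadataSize` plus the 8-byte
footer skip in segments.go, and CRC32-IEEE is a stand-in for the queue's own
computeChecksum, whose definition is not part of this patch.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"hash/crc32"
	"io"
)

// frameMetadataSize mirrors the 4-byte length header plus the 8-byte
// footer (checksum + duplicate length) implied by the patch.
const frameMetadataSize = 12

// writeFrame emits one frame: total length, payload, checksum, and the
// length repeated in the footer.
func writeFrame(w *bytes.Buffer, data []byte) {
	frameLength := uint32(len(data)) + frameMetadataSize
	binary.Write(w, binary.LittleEndian, frameLength)
	w.Write(data)
	// Stand-in checksum; the real queue uses its own computeChecksum.
	binary.Write(w, binary.LittleEndian, crc32.ChecksumIEEE(data))
	binary.Write(w, binary.LittleEndian, frameLength)
}

// readFrame parses and validates a frame in the same order as nextFrame:
// header length, payload, checksum, then the duplicate trailing length.
func readFrame(r io.Reader) ([]byte, error) {
	var frameLength uint32
	if err := binary.Read(r, binary.LittleEndian, &frameLength); err != nil {
		return nil, fmt.Errorf("couldn't read data frame header: %w", err)
	}
	if frameLength <= frameMetadataSize {
		return nil, fmt.Errorf("data frame with no data (length %d)", frameLength)
	}
	data := make([]byte, frameLength-frameMetadataSize)
	if _, err := io.ReadFull(r, data); err != nil {
		return nil, fmt.Errorf("couldn't read data frame content: %w", err)
	}
	var checksum, duplicate uint32
	if err := binary.Read(r, binary.LittleEndian, &checksum); err != nil {
		return nil, fmt.Errorf("couldn't read data frame checksum: %w", err)
	}
	if err := binary.Read(r, binary.LittleEndian, &duplicate); err != nil {
		return nil, fmt.Errorf("couldn't read data frame footer: %w", err)
	}
	if expected := crc32.ChecksumIEEE(data); checksum != expected {
		return nil, fmt.Errorf("data frame checksum mismatch (%x != %x)", checksum, expected)
	}
	if duplicate != frameLength {
		return nil, fmt.Errorf("inconsistent data frame length (%d vs %d)", frameLength, duplicate)
	}
	return data, nil
}

func main() {
	var buf bytes.Buffer
	writeFrame(&buf, []byte(`{"name": "桃太郎"}`))
	data, err := readFrame(&buf)
	fmt.Println(string(data), err)
}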
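
Note on the encoding switch: segments with schema version 0 (the beta on-disk
format) store JSON, which is what mishandled multi-byte characters in nested
data, while newer segments store CBOR. eventDecoder now keeps one parser for
each format and selects per segment via shouldUseJSON(). Below is a condensed
sketch of that selection using the same go-structform calls that appear in the
patch; decoding into a plain map is a simplification of the decoder's real
internal target type, so treat it as illustrative rather than a copy of the
queue's code.

package main

import (
	"fmt"

	"github.com/elastic/go-structform/cborl"
	"github.com/elastic/go-structform/gotype"
	"github.com/elastic/go-structform/json"
)

// decodeFrame unfolds one serialized event into a Go map, choosing the
// parser by the segment's schema version the way the reader loop does
// via segment.shouldUseJSON().
func decodeFrame(buf []byte, useJSON bool) (map[string]interface{}, error) {
	var to map[string]interface{}
	unfolder, err := gotype.NewUnfolder(nil)
	if err != nil {
		return nil, err
	}
	if err := unfolder.SetTarget(&to); err != nil {
		return nil, err
	}
	defer unfolder.Reset()

	if useJSON {
		// Old (schema version 0) segments were written as JSON.
		err = json.NewParser(unfolder).Parse(buf)
	} else {
		// Everything written by current code is CBOR.
		err = cborl.NewParser(unfolder).Parse(buf)
	}
	return to, err
}

func main() {
	// A CBOR frame would normally come from eventEncoder; a JSON one from a
	// beta-era segment file. JSON is shown here because it is human-readable.
	event, err := decodeFrame([]byte(`{"name": "桃太郎"}`), true)
	fmt.Println(event, err)
}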