[libbeat] Fix encoding and file offset issues in the disk queue (#26484) (#26528)

(cherry picked from commit a3b447a)

Co-authored-by: Fae Charlton <fae.charlton@elastic.co>
mergify[bot] and faec authored Jun 28, 2021
1 parent 26d9901 commit 8e10b96
Showing 7 changed files with 132 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -174,6 +174,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix `community_id` processor so that ports greater than 65535 aren't valid. {pull}25409[25409]
- Fix ILM alias creation when write alias exists and initial index does not exist {pull}26143[26143]
- In the script processor, the `decode_xml` and `decode_xml_wineventlog` processors are now available as `DecodeXML` and `DecodeXMLWineventlog` respectively.
+- Fix encoding errors when using the disk queue on nested data with multi-byte characters {pull}26484[26484]

*Auditbeat*

6 changes: 3 additions & 3 deletions libbeat/publisher/queue/diskqueue/core_loop_test.go
@@ -235,7 +235,7 @@ func TestHandleWriterLoopResponse(t *testing.T) {

// Write to one segment (no segments should be moved to reading list)
dq.handleWriterLoopResponse(writerLoopResponse{
-			segments: []writerLoopResponseSegment{
+			segments: []writerLoopSegmentResponse{
{bytesWritten: 100},
},
})
@@ -250,7 +250,7 @@ func TestHandleWriterLoopResponse(t *testing.T) {

// Write to two segments (the first one should be moved to reading list)
dq.handleWriterLoopResponse(writerLoopResponse{
-			segments: []writerLoopResponseSegment{
+			segments: []writerLoopSegmentResponse{
{bytesWritten: 100},
{bytesWritten: 100},
},
@@ -270,7 +270,7 @@ func TestHandleWriterLoopResponse(t *testing.T) {

// Write to three segments (the first two should be moved to reading list)
dq.handleWriterLoopResponse(writerLoopResponse{
-			segments: []writerLoopResponseSegment{
+			segments: []writerLoopSegmentResponse{
{bytesWritten: 100},
{bytesWritten: 100},
{bytesWritten: 500},
26 changes: 13 additions & 13 deletions libbeat/publisher/queue/diskqueue/reader_loop.go
@@ -20,6 +20,7 @@ package diskqueue
import (
"encoding/binary"
"fmt"
"io"
"os"
)

@@ -100,13 +101,12 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon

// Open the file and seek to the starting position.
handle, err := request.segment.getReader(rl.settings)
+	rl.decoder.useJSON = request.segment.shouldUseJSON()
if err != nil {
return readerLoopResponse{err: err}
}
defer handle.Close()
-	// getReader positions us at the start of the data region, so we use
-	// a relative seek to advance to the request position.
-	_, err = handle.Seek(int64(request.startPosition), os.SEEK_CUR)
+	_, err = handle.Seek(int64(request.startPosition), io.SeekStart)
if err != nil {
return readerLoopResponse{err: err}
}
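
The offset fix above replaces the deprecated `os.SEEK_CUR` constant with `io.SeekStart`, which also changes the seek semantics: the handle now jumps to `request.startPosition` measured from the beginning of the file, rather than moving relative to wherever `getReader` left it after the header. A minimal standalone sketch of the two whence modes (the file name and offsets are hypothetical):

```go
package main

import (
	"fmt"
	"io"
	"os"
)

func main() {
	f, err := os.Open("segment.dat") // hypothetical segment file
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// Relative seek: advance 128 bytes from the current position
	// (the behavior the old os.SEEK_CUR constant selected).
	if _, err := f.Seek(128, io.SeekCurrent); err != nil {
		panic(err)
	}

	// Absolute seek: jump to byte 128 from the start of the file,
	// regardless of the current position (io.SeekStart, as in the fix).
	pos, err := f.Seek(128, io.SeekStart)
	if err != nil {
		panic(err)
	}
	fmt.Println("now at offset", pos) // always 128
}
```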
@@ -179,58 +179,58 @@ func (rl *readerLoop) nextFrame(
// Ensure we are allowed to read the frame header.
if maxLength < frameHeaderSize {
return nil, fmt.Errorf(
"Can't read next frame: remaining length %d is too low", maxLength)
"can't read next frame: remaining length %d is too low", maxLength)
}
// Wrap the handle to retry non-fatal errors and always return the full
// requested data length if possible.
reader := autoRetryReader{handle}
var frameLength uint32
err := binary.Read(reader, binary.LittleEndian, &frameLength)
if err != nil {
return nil, fmt.Errorf("Couldn't read data frame header: %w", err)
return nil, fmt.Errorf("couldn't read data frame header: %w", err)
}

// If the frame extends past the area we were told to read, return an error.
// This should never happen unless the segment file is corrupted.
if maxLength < uint64(frameLength) {
return nil, fmt.Errorf(
"Can't read next frame: frame size is %d but remaining data is only %d",
"can't read next frame: frame size is %d but remaining data is only %d",
frameLength, maxLength)
}
if frameLength <= frameMetadataSize {
// Valid enqueued data must have positive length
return nil, fmt.Errorf(
"Data frame with no data (length %d)", frameLength)
"data frame with no data (length %d)", frameLength)
}

// Read the actual frame data
dataLength := frameLength - frameMetadataSize
bytes := rl.decoder.Buffer(int(dataLength))
_, err = reader.Read(bytes)
if err != nil {
return nil, fmt.Errorf("Couldn't read data frame content: %w", err)
return nil, fmt.Errorf("couldn't read data frame content: %w", err)
}

// Read the footer (checksum + duplicate length)
var checksum uint32
err = binary.Read(reader, binary.LittleEndian, &checksum)
if err != nil {
return nil, fmt.Errorf("Couldn't read data frame checksum: %w", err)
return nil, fmt.Errorf("couldn't read data frame checksum: %w", err)
}
expected := computeChecksum(bytes)
if checksum != expected {
return nil, fmt.Errorf(
"Data frame checksum mismatch (%x != %x)", checksum, expected)
"data frame checksum mismatch (%x != %x)", checksum, expected)
}

var duplicateLength uint32
err = binary.Read(reader, binary.LittleEndian, &duplicateLength)
if err != nil {
return nil, fmt.Errorf("Couldn't read data frame footer: %w", err)
return nil, fmt.Errorf("couldn't read data frame footer: %w", err)
}
if duplicateLength != frameLength {
return nil, fmt.Errorf(
"Inconsistent data frame length (%d vs %d)",
"inconsistent data frame length (%d vs %d)",
frameLength, duplicateLength)
}

@@ -242,7 +242,7 @@ func (rl *readerLoop) nextFrame(
// TODO: Rather than pass this error back to the read request, which
// discards the rest of the segment, we should just log the error and
// advance to the next frame, which is likely still valid.
return nil, fmt.Errorf("Couldn't decode data frame: %w", err)
return nil, fmt.Errorf("couldn't decode data frame: %w", err)
}

frame := &readFrame{
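
For reference, `nextFrame` implies the on-disk frame layout: a little-endian `uint32` total frame length, the encoded event bytes, then a footer holding a `uint32` checksum over the data and a duplicate copy of the length. Below is a hedged sketch of a writer for that layout; the 12-byte metadata size is derived from the reads above (4-byte header plus 8-byte footer), but the checksum callback is a toy stand-in for the queue's real `computeChecksum`:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeFrame serializes one frame in the layout nextFrame expects:
// length header, payload, checksum, duplicate length.
func writeFrame(w io.Writer, payload []byte, checksum func([]byte) uint32) error {
	const frameMetadataSize = 12 // 4-byte length + 4-byte checksum + 4-byte duplicate length
	frameLength := uint32(len(payload)) + frameMetadataSize
	if err := binary.Write(w, binary.LittleEndian, frameLength); err != nil {
		return err
	}
	if _, err := w.Write(payload); err != nil {
		return err
	}
	if err := binary.Write(w, binary.LittleEndian, checksum(payload)); err != nil {
		return err
	}
	// The duplicated length is what lets readSegmentHeaderWithFrameCount
	// validate each frame as it skips forward with frameLength-8.
	return binary.Write(w, binary.LittleEndian, frameLength)
}

func main() {
	var buf bytes.Buffer
	sum := func(b []byte) uint32 { return uint32(len(b)) } // toy stand-in checksum
	if err := writeFrame(&buf, []byte("encoded event"), sum); err != nil {
		panic(err)
	}
	fmt.Println(buf.Len(), "bytes") // 13 bytes of payload + 12 of metadata = 25
}
```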
23 changes: 17 additions & 6 deletions libbeat/publisher/queue/diskqueue/segments.go
@@ -91,7 +91,10 @@ type queueSegment struct {
// If this segment was loaded from a previous session, schemaVersion
// points to the file schema version that was read from its header.
// This is only used by queueSegment.headerSize(), which is used in
-	// maybeReadPending to calculate the position of the first data frame.
+	// maybeReadPending to calculate the position of the first data frame,
+	// and by queueSegment.shouldUseJSON(), which is used in the reader
+	// loop to detect old segments that used JSON encoding instead of
+	// the current CBOR.
schemaVersion *uint32

// The number of bytes occupied by this segment on-disk, as of the most
@@ -198,6 +201,14 @@ func (segment *queueSegment) headerSize() uint64 {
return segmentHeaderSize
}

+// The initial release of the disk queue used JSON to encode events
+// on disk. Since then, we have switched to CBOR to address issues
+// with encoding multi-byte characters, and for lower encoding
+// overhead.
+func (segment *queueSegment) shouldUseJSON() bool {
+	return segment.schemaVersion != nil && *segment.schemaVersion == 0
+}
+
// Should only be called from the reader loop. If successful, returns an open
// file handle positioned at the beginning of the segment's data region.
func (segment *queueSegment) getReader(
@@ -207,14 +218,14 @@ func (segment *queueSegment) getReader(
file, err := os.Open(path)
if err != nil {
return nil, fmt.Errorf(
"Couldn't open segment %d: %w", segment.id, err)
"couldn't open segment %d: %w", segment.id, err)
}
// We don't need the header contents here, we just want to advance past the
// header region, so discard the return value.
_, err = readSegmentHeader(file)
if err != nil {
file.Close()
return nil, fmt.Errorf("Couldn't read segment header: %w", err)
return nil, fmt.Errorf("couldn't read segment header: %w", err)
}

return file, nil
@@ -231,7 +242,7 @@ func (segment *queueSegment) getWriter(
}
err = writeSegmentHeader(file, 0)
if err != nil {
return nil, fmt.Errorf("Couldn't write segment header: %w", err)
return nil, fmt.Errorf("couldn't write segment header: %w", err)
}

return file, nil
@@ -306,7 +317,7 @@ func readSegmentHeaderWithFrameCount(path string) (*segmentHeader, error) {
// the current frame to make sure the trailing length matches before
// advancing to the next frame (otherwise we might accept an impossible
// length).
-		_, err = file.Seek(int64(frameLength-8), os.SEEK_CUR)
+		_, err = file.Seek(int64(frameLength-8), io.SeekCurrent)
if err != nil {
break
}
@@ -341,7 +352,7 @@ func readSegmentHeader(in io.Reader) (*segmentHeader, error) {
return nil, err
}
if header.version > currentSegmentVersion {
return nil, fmt.Errorf("Unrecognized schema version %d", header.version)
return nil, fmt.Errorf("unrecognized schema version %d", header.version)
}
if header.version >= 1 {
err = binary.Read(in, binary.LittleEndian, &header.frameCount)
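
The segments.go hunks carry the compatibility story: `shouldUseJSON` flags schema version 0 segments (written by the disk queue beta, which serialized events as JSON), while version 1 and later segments add a frame count to the header and store CBOR. A sketch of the header parse, with field order and types taken from `readSegmentHeader` above (the struct and function names here are illustrative):

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// headerSketch mirrors the fields readSegmentHeader decodes.
type headerSketch struct {
	version    uint32
	frameCount uint32 // only stored on disk for version >= 1
}

func readHeaderSketch(in io.Reader) (*headerSketch, error) {
	h := &headerSketch{}
	if err := binary.Read(in, binary.LittleEndian, &h.version); err != nil {
		return nil, err
	}
	// Version 0 (the beta, JSON-encoded format) has no frame count;
	// later versions append it as a little-endian uint32.
	if h.version >= 1 {
		if err := binary.Read(in, binary.LittleEndian, &h.frameCount); err != nil {
			return nil, err
		}
	}
	return h, nil
}

func main() {
	// A version-1 header with a frame count of 3.
	raw := []byte{1, 0, 0, 0, 3, 0, 0, 0}
	h, err := readHeaderSketch(bytes.NewReader(raw))
	if err != nil {
		panic(err)
	}
	fmt.Println(h.version, h.frameCount) // 1 3
}
```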
20 changes: 16 additions & 4 deletions libbeat/publisher/queue/diskqueue/serialize.go
@@ -28,6 +28,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/outputs/codec"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/go-structform/cborl"
"github.com/elastic/go-structform/gotype"
"github.com/elastic/go-structform/json"
)
@@ -40,7 +41,13 @@ type eventEncoder struct {
type eventDecoder struct {
buf []byte

-	parser *json.Parser
+	jsonParser  *json.Parser
+	cborlParser *cborl.Parser
+
+	// Current serialization all uses CBOR. Set this flag when decoding
+	// from old (schema 0) segment files generated by the disk queue beta.
+	useJSON bool

unfolder *gotype.Unfolder
}

@@ -60,7 +67,7 @@ func newEventEncoder() *eventEncoder {
func (e *eventEncoder) reset() {
e.folder = nil

-	visitor := json.NewVisitor(&e.buf)
+	visitor := cborl.NewVisitor(&e.buf)
// This can't return an error: NewIterator is deterministic based on its
// input, and doesn't return an error when called with valid options. In
// this case the options are hard-coded to fixed values, so they are
@@ -109,7 +116,8 @@ func (d *eventDecoder) reset() {
unfolder, _ := gotype.NewUnfolder(nil)

d.unfolder = unfolder
-	d.parser = json.NewParser(unfolder)
+	d.jsonParser = json.NewParser(unfolder)
+	d.cborlParser = cborl.NewParser(unfolder)
}

// Buffer prepares the read buffer to hold the next event of n bytes.
@@ -131,7 +139,11 @@ func (d *eventDecoder) Decode() (publisher.Event, error) {
d.unfolder.SetTarget(&to)
defer d.unfolder.Reset()

-	err = d.parser.Parse(d.buf)
+	if d.useJSON {
+		err = d.jsonParser.Parse(d.buf)
+	} else {
+		err = d.cborlParser.Parse(d.buf)
+	}

if err != nil {
d.reset() // reset parser just in case
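
Taken together, serialize.go now folds events to CBOR through `cborl.NewVisitor` and parses them back with a `cborl.Parser`, falling back to the JSON parser only when `useJSON` marks a beta-era segment. A standalone round-trip sketch using the same go-structform calls; wiring the CBOR visitor into `gotype.NewIterator` follows how this file wires its visitor, and the map payload is illustrative:

```go
package main

import (
	"fmt"

	"github.com/elastic/go-structform/cborl"
	"github.com/elastic/go-structform/gotype"
)

func main() {
	// Encode: fold a Go value into CBOR bytes via a visitor,
	// as eventEncoder.reset sets up.
	var buf []byte
	folder, err := gotype.NewIterator(cborl.NewVisitor(&buf))
	if err != nil {
		panic(err)
	}
	if err := folder.Fold(map[string]interface{}{"name": "桃太郎"}); err != nil {
		panic(err)
	}

	// Decode: unfold the bytes back into a Go value, the same
	// unfolder/parser pairing eventDecoder uses.
	var out interface{}
	unfolder, err := gotype.NewUnfolder(nil)
	if err != nil {
		panic(err)
	}
	if err := unfolder.SetTarget(&out); err != nil {
		panic(err)
	}
	if err := cborl.NewParser(unfolder).Parse(buf); err != nil {
		panic(err)
	}
	fmt.Println(out) // map[name:桃太郎]
}
```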
69 changes: 69 additions & 0 deletions libbeat/publisher/queue/diskqueue/serialize_test.go
@@ -0,0 +1,69 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package diskqueue

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/publisher"
)

// A test to make sure serialization works correctly on multi-byte characters.
func TestSerialize(t *testing.T) {
testCases := []struct {
name string
value string
}{
{name: "Ascii only", value: "{\"name\": \"Momotaro\"}"},
{name: "Multi-byte", value: "{\"name\": \"桃太郎\"}"},
}

for _, test := range testCases {
encoder := newEventEncoder()
event := publisher.Event{
Content: beat.Event{
Fields: common.MapStr{
"test_field": test.value,
},
},
}
serialized, err := encoder.encode(&event)
if err != nil {
t.Fatalf("[%v] Couldn't encode event: %v", test.name, err)
}

// Use decoder to decode the serialized bytes.
decoder := newEventDecoder()
buf := decoder.Buffer(len(serialized))
copy(buf, serialized)
decoded, err := decoder.Decode()
if err != nil {
t.Fatalf("[%v] Couldn't decode serialized data: %v", test.name, err)
}

decodedValue, err := decoded.Content.Fields.GetValue("test_field")
if err != nil {
t.Fatalf("[%v] Couldn't get field 'test_field': %v", test.name, err)
}
assert.Equal(t, test.value, decodedValue)
}
}