Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[libbeat] Fix encoding and file offset issues in the disk queue #26484

Merged
merged 8 commits into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix ILM alias creation when write alias exists and initial index does not exist {pull}26143[26143]
- Omit full index template from errors that occur while loading the template. {pull}25743[25743]
- In the script processor, the `decode_xml` and `decode_xml_wineventlog` processors are now available as `DecodeXML` and `DecodeXMLWineventlog` respectively.
- Fix encoding errors when using the disk queue on nested data with multi-byte characters {pull}26484[26484]

*Auditbeat*

Expand Down
6 changes: 3 additions & 3 deletions libbeat/publisher/queue/diskqueue/core_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func TestHandleWriterLoopResponse(t *testing.T) {

// Write to one segment (no segments should be moved to reading list)
dq.handleWriterLoopResponse(writerLoopResponse{
segments: []writerLoopResponseSegment{
segments: []writerLoopSegmentResponse{
{bytesWritten: 100},
},
})
Expand All @@ -250,7 +250,7 @@ func TestHandleWriterLoopResponse(t *testing.T) {

// Write to two segments (the first one should be moved to reading list)
dq.handleWriterLoopResponse(writerLoopResponse{
segments: []writerLoopResponseSegment{
segments: []writerLoopSegmentResponse{
{bytesWritten: 100},
{bytesWritten: 100},
},
Expand All @@ -270,7 +270,7 @@ func TestHandleWriterLoopResponse(t *testing.T) {

// Write to three segments (the first two should be moved to reading list)
dq.handleWriterLoopResponse(writerLoopResponse{
segments: []writerLoopResponseSegment{
segments: []writerLoopSegmentResponse{
{bytesWritten: 100},
{bytesWritten: 100},
{bytesWritten: 500},
Expand Down
26 changes: 13 additions & 13 deletions libbeat/publisher/queue/diskqueue/reader_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package diskqueue
import (
"encoding/binary"
"fmt"
"io"
"os"
)

Expand Down Expand Up @@ -100,13 +101,12 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon

// Open the file and seek to the starting position.
handle, err := request.segment.getReader(rl.settings)
rl.decoder.useJSON = request.segment.shouldUseJSON()
if err != nil {
return readerLoopResponse{err: err}
}
defer handle.Close()
// getReader positions us at the start of the data region, so we use
// a relative seek to advance to the request position.
_, err = handle.Seek(int64(request.startPosition), os.SEEK_CUR)
_, err = handle.Seek(int64(request.startPosition), io.SeekStart)
if err != nil {
return readerLoopResponse{err: err}
}
Expand Down Expand Up @@ -179,58 +179,58 @@ func (rl *readerLoop) nextFrame(
// Ensure we are allowed to read the frame header.
if maxLength < frameHeaderSize {
return nil, fmt.Errorf(
"Can't read next frame: remaining length %d is too low", maxLength)
"can't read next frame: remaining length %d is too low", maxLength)
}
// Wrap the handle to retry non-fatal errors and always return the full
// requested data length if possible.
reader := autoRetryReader{handle}
var frameLength uint32
err := binary.Read(reader, binary.LittleEndian, &frameLength)
if err != nil {
return nil, fmt.Errorf("Couldn't read data frame header: %w", err)
return nil, fmt.Errorf("couldn't read data frame header: %w", err)
}

// If the frame extends past the area we were told to read, return an error.
// This should never happen unless the segment file is corrupted.
if maxLength < uint64(frameLength) {
return nil, fmt.Errorf(
"Can't read next frame: frame size is %d but remaining data is only %d",
"can't read next frame: frame size is %d but remaining data is only %d",
frameLength, maxLength)
}
if frameLength <= frameMetadataSize {
// Valid enqueued data must have positive length
return nil, fmt.Errorf(
"Data frame with no data (length %d)", frameLength)
"data frame with no data (length %d)", frameLength)
}

// Read the actual frame data
dataLength := frameLength - frameMetadataSize
bytes := rl.decoder.Buffer(int(dataLength))
_, err = reader.Read(bytes)
if err != nil {
return nil, fmt.Errorf("Couldn't read data frame content: %w", err)
return nil, fmt.Errorf("couldn't read data frame content: %w", err)
}

// Read the footer (checksum + duplicate length)
var checksum uint32
err = binary.Read(reader, binary.LittleEndian, &checksum)
if err != nil {
return nil, fmt.Errorf("Couldn't read data frame checksum: %w", err)
return nil, fmt.Errorf("couldn't read data frame checksum: %w", err)
}
expected := computeChecksum(bytes)
if checksum != expected {
return nil, fmt.Errorf(
"Data frame checksum mismatch (%x != %x)", checksum, expected)
"data frame checksum mismatch (%x != %x)", checksum, expected)
}

var duplicateLength uint32
err = binary.Read(reader, binary.LittleEndian, &duplicateLength)
if err != nil {
return nil, fmt.Errorf("Couldn't read data frame footer: %w", err)
return nil, fmt.Errorf("couldn't read data frame footer: %w", err)
}
if duplicateLength != frameLength {
return nil, fmt.Errorf(
"Inconsistent data frame length (%d vs %d)",
"inconsistent data frame length (%d vs %d)",
frameLength, duplicateLength)
}

Expand All @@ -242,7 +242,7 @@ func (rl *readerLoop) nextFrame(
// TODO: Rather than pass this error back to the read request, which
// discards the rest of the segment, we should just log the error and
// advance to the next frame, which is likely still valid.
return nil, fmt.Errorf("Couldn't decode data frame: %w", err)
return nil, fmt.Errorf("couldn't decode data frame: %w", err)
}

frame := &readFrame{
Expand Down
23 changes: 17 additions & 6 deletions libbeat/publisher/queue/diskqueue/segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ type queueSegment struct {
// If this segment was loaded from a previous session, schemaVersion
// points to the file schema version that was read from its header.
// This is only used by queueSegment.headerSize(), which is used in
// maybeReadPending to calculate the position of the first data frame.
// maybeReadPending to calculate the position of the first data frame,
// and by queueSegment.shouldUseJSON(), which is used in the reader
// loop to detect old segments that used JSON encoding instead of
// the current CBOR.
schemaVersion *uint32

// The number of bytes occupied by this segment on-disk, as of the most
Expand Down Expand Up @@ -198,6 +201,14 @@ func (segment *queueSegment) headerSize() uint64 {
return segmentHeaderSize
}

// The initial release of the disk queue used JSON to encode events
// on disk. Since then, we have switched to CBOR to address issues
// with encoding multi-byte characters, and for lower encoding
// overhead.
func (segment *queueSegment) shouldUseJSON() bool {
return segment.schemaVersion != nil && *segment.schemaVersion == 0
}

// Should only be called from the reader loop. If successful, returns an open
// file handle positioned at the beginning of the segment's data region.
func (segment *queueSegment) getReader(
Expand All @@ -207,14 +218,14 @@ func (segment *queueSegment) getReader(
file, err := os.Open(path)
if err != nil {
return nil, fmt.Errorf(
"Couldn't open segment %d: %w", segment.id, err)
"couldn't open segment %d: %w", segment.id, err)
}
// We don't need the header contents here, we just want to advance past the
// header region, so discard the return value.
_, err = readSegmentHeader(file)
if err != nil {
file.Close()
return nil, fmt.Errorf("Couldn't read segment header: %w", err)
return nil, fmt.Errorf("couldn't read segment header: %w", err)
}

return file, nil
Expand All @@ -231,7 +242,7 @@ func (segment *queueSegment) getWriter(
}
err = writeSegmentHeader(file, 0)
if err != nil {
return nil, fmt.Errorf("Couldn't write segment header: %w", err)
return nil, fmt.Errorf("couldn't write segment header: %w", err)
}

return file, nil
Expand Down Expand Up @@ -306,7 +317,7 @@ func readSegmentHeaderWithFrameCount(path string) (*segmentHeader, error) {
// the current frame to make sure the trailing length matches before
// advancing to the next frame (otherwise we might accept an impossible
// length).
_, err = file.Seek(int64(frameLength-8), os.SEEK_CUR)
_, err = file.Seek(int64(frameLength-8), io.SeekCurrent)
if err != nil {
break
}
Expand Down Expand Up @@ -341,7 +352,7 @@ func readSegmentHeader(in io.Reader) (*segmentHeader, error) {
return nil, err
}
if header.version > currentSegmentVersion {
return nil, fmt.Errorf("Unrecognized schema version %d", header.version)
return nil, fmt.Errorf("unrecognized schema version %d", header.version)
}
if header.version >= 1 {
err = binary.Read(in, binary.LittleEndian, &header.frameCount)
Expand Down
20 changes: 16 additions & 4 deletions libbeat/publisher/queue/diskqueue/serialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/outputs/codec"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/go-structform/cborl"
"github.com/elastic/go-structform/gotype"
"github.com/elastic/go-structform/json"
)
Expand All @@ -40,7 +41,13 @@ type eventEncoder struct {
type eventDecoder struct {
buf []byte

parser *json.Parser
jsonParser *json.Parser
cborlParser *cborl.Parser

// Current serialization all uses CBOR. Set this flag when decoding
// from old (schema 0) segment files generated by the disk queue beta.
useJSON bool

unfolder *gotype.Unfolder
}

Expand All @@ -60,7 +67,7 @@ func newEventEncoder() *eventEncoder {
func (e *eventEncoder) reset() {
e.folder = nil

visitor := json.NewVisitor(&e.buf)
visitor := cborl.NewVisitor(&e.buf)
// This can't return an error: NewIterator is deterministic based on its
// input, and doesn't return an error when called with valid options. In
// this case the options are hard-coded to fixed values, so they are
Expand Down Expand Up @@ -109,7 +116,8 @@ func (d *eventDecoder) reset() {
unfolder, _ := gotype.NewUnfolder(nil)

d.unfolder = unfolder
d.parser = json.NewParser(unfolder)
d.jsonParser = json.NewParser(unfolder)
d.cborlParser = cborl.NewParser(unfolder)
}

// Buffer prepares the read buffer to hold the next event of n bytes.
Expand All @@ -131,7 +139,11 @@ func (d *eventDecoder) Decode() (publisher.Event, error) {
d.unfolder.SetTarget(&to)
defer d.unfolder.Reset()

err = d.parser.Parse(d.buf)
if d.useJSON {
err = d.jsonParser.Parse(d.buf)
} else {
err = d.cborlParser.Parse(d.buf)
}

if err != nil {
d.reset() // reset parser just in case
Expand Down
63 changes: 63 additions & 0 deletions libbeat/publisher/queue/diskqueue/serialize_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package diskqueue

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/publisher"
)

// A test to make sure serialization works correctly on multi-byte characters.
func TestSerializeMultiByte(t *testing.T) {
asciiOnly := "{\"name\": \"Momotaro\"}"
multiBytes := "{\"name\": \"桃太郎\"}"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's create a table driven test and separate the 2 cases into separate tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


encoder := newEventEncoder()
event := publisher.Event{
Content: beat.Event{
Fields: common.MapStr{
"ascii_only": asciiOnly,
"multi_bytes": multiBytes,
},
},
}
serialized, err := encoder.encode(&event)
if err != nil {
t.Fatalf("Couldn't encode event: %v", err)
}

// Use decoder to decode the serialized bytes.
decoder := newEventDecoder()
buf := decoder.Buffer(len(serialized))
copy(buf, serialized)
decoded, err := decoder.Decode()
if err != nil {
t.Fatalf("Couldn't decode serialized data: %v", err)
}

decodedAsciiOnly, _ := decoded.Content.Fields.GetValue("ascii_only")
assert.Equal(t, asciiOnly, decodedAsciiOnly)

decodedMultiBytes, _ := decoded.Content.Fields.GetValue("multi_bytes")
assert.Equal(t, multiBytes, decodedMultiBytes)
}
Loading