[7.13](backport #26260) Handle data returned with io.EOF in LineReader (#26300)

* Handle data returned with io.EOF in LineReader (#26260)

The libbeat LineReader implementation did not handle the case where the underlying io.Reader
returns a non-zero number of bytes together with io.EOF in the same Read call. It discarded the data in this case.

As per the io.Reader contract:

    a Reader returning a non-zero number of bytes at the end of the input stream may return either err == EOF or err == nil.

This occurs often with the gzip.Reader. It returns a large chunk of data at the end of the file and io.EOF at the same time.

(cherry picked from commit 6896e77)

* Update CHANGELOG.next.asciidoc

Co-authored-by: Andrew Kroh <andrew.kroh@elastic.co>
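
The failure mode described in the commit message can be reproduced outside libbeat. The following is a minimal sketch, not the libbeat code: `chunkThenEOF`, `readNaive`, and `readCareful` are hypothetical names introduced here for illustration. It contrasts a loop that checks `err` before looking at `n` with one that consumes the returned bytes first.

```go
package main

import (
	"fmt"
	"io"
)

// chunkThenEOF is a hypothetical reader used only for this sketch. It returns
// io.EOF in the same Read call that delivers its last bytes, which the
// io.Reader contract permits.
type chunkThenEOF struct{ data []byte }

func (c *chunkThenEOF) Read(p []byte) (int, error) {
	if len(c.data) == 0 {
		return 0, io.EOF
	}
	n := copy(p, c.data)
	c.data = c.data[n:]
	if len(c.data) == 0 {
		return n, io.EOF
	}
	return n, nil
}

// readNaive checks err before using n, so bytes that arrive together with
// io.EOF are dropped.
func readNaive(r io.Reader, bufSize int) (string, error) {
	var out []byte
	buf := make([]byte, bufSize)
	for {
		n, err := r.Read(buf)
		if err == io.EOF {
			return string(out), nil
		}
		if err != nil {
			return string(out), err
		}
		out = append(out, buf[:n]...)
	}
}

// readCareful appends the n returned bytes first and only then inspects err.
func readCareful(r io.Reader, bufSize int) (string, error) {
	var out []byte
	buf := make([]byte, bufSize)
	for {
		n, err := r.Read(buf)
		out = append(out, buf[:n]...)
		if err == io.EOF {
			return string(out), nil
		}
		if err != nil {
			return string(out), err
		}
	}
}

func main() {
	const input = "Hello world!\n"
	naive, _ := readNaive(&chunkThenEOF{data: []byte(input)}, 64)
	careful, _ := readCareful(&chunkThenEOF{data: []byte(input)}, 64)
	fmt.Printf("naive:   %q\n", naive)   // naive:   ""
	fmt.Printf("careful: %q\n", careful) // careful: "Hello world!\n"
}
```

With a 64-byte buffer the whole input arrives in a single Read alongside io.EOF, so the naive loop returns nothing. With a smaller buffer only the final chunk is lost, which matches the "end of the file" symptom described above.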
mergify[bot] and andrewkroh authored Jun 15, 2021
1 parent 7fc0443 commit f28af6f
Showing 3 changed files with 46 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
@@ -167,6 +167,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Fix event.type for zeek/ssl and duplicate event.category for zeek/connection {pull}20696[20696]
 - Add json body check for sqs message. {pull}21727[21727]
 - Drop aws.vpcflow.pkt_srcaddr and aws.vpcflow.pkt_dstaddr when equal to "-". {pull}22721[22721] {issue}22716[22716]
+- Fix bug in aws-s3 input where the end of gzipped log files might have been discarded. {pull}26260[26260]
+- o365: Avoid mapping exception for `Parameters` and `ExtendedProperties` fields of string type. {pull}26164[26164]

 *Heartbeat*

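
The changelog entry ties the bug to gzipped log files. One way to check how `gzip.Reader` behaves on a given Go version is to record whether its final Read carries data together with io.EOF. This is a rough sketch using only the standard library; `gzipBytes` and `gunzipObserved` are hypothetical helper names, and the observed flag may depend on input size, buffer size, and Go version, so no particular value is claimed here.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// gzipBytes compresses data in memory.
func gzipBytes(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(data); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// gunzipObserved decompresses the input and reports whether the Read call
// that returned io.EOF also returned data (n > 0). It appends the returned
// bytes before checking the error, so nothing is lost either way.
func gunzipObserved(compressed []byte, bufSize int) ([]byte, bool, error) {
	zr, err := gzip.NewReader(bytes.NewReader(compressed))
	if err != nil {
		return nil, false, err
	}
	defer zr.Close()

	var out []byte
	buf := make([]byte, bufSize)
	for {
		n, rerr := zr.Read(buf)
		out = append(out, buf[:n]...)
		if rerr == io.EOF {
			return out, n > 0, nil
		}
		if rerr != nil {
			return out, false, rerr
		}
	}
}

func main() {
	input := bytes.Repeat([]byte("log line\n"), 1000)
	compressed, err := gzipBytes(input)
	if err != nil {
		panic(err)
	}
	out, dataWithEOF, err := gunzipObserved(compressed, 16*1024)
	if err != nil {
		panic(err)
	}
	fmt.Println("round trip ok:", bytes.Equal(out, input))
	fmt.Println("final Read returned data with io.EOF:", dataWithEOF)
}
```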
5 changes: 5 additions & 0 deletions libbeat/reader/readfile/line.go
@@ -138,6 +138,11 @@ func (r *LineReader) advance() error {
 		// Try to read more bytes into buffer
 		n, err := r.reader.Read(buf)
 
+		if err == io.EOF && n > 0 {
+			// Continue processing the returned bytes. The next call will yield EOF with 0 bytes.
+			err = nil
+		}
+
 		// Appends buffer also in case of err
 		r.inBuffer.Append(buf[:n])
 		if err != nil {
44 changes: 39 additions & 5 deletions libbeat/reader/readfile/line_test.go
@@ -219,19 +219,25 @@ func testReadLineLengths(t *testing.T, lineLengths []int) {
 		lines = append(lines, inputLine)
 	}
 
-	testReadLines(t, lines)
+	testReadLines(t, lines, false)
 }
 
-func testReadLines(t *testing.T, inputLines [][]byte) {
+func testReadLines(t *testing.T, inputLines [][]byte, eofOnLastRead bool) {
 	var inputStream []byte
 	for _, line := range inputLines {
 		inputStream = append(inputStream, line...)
 	}
 
 	// initialize reader
 	buffer := bytes.NewBuffer(inputStream)
-	codec, _ := encoding.Plain(buffer)
-	reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, buffer.Len(), LineFeed, unlimited})
+
+	var r io.Reader = buffer
+	if eofOnLastRead {
+		r = &eofWithNonZeroNumberOfBytesReader{buf: buffer}
+	}
+
+	codec, _ := encoding.Plain(r)
+	reader, err := NewLineReader(ioutil.NopCloser(r), Config{codec, buffer.Len(), LineFeed, unlimited})
 	if err != nil {
 		t.Fatalf("Error initializing reader: %v", err)
 	}
@@ -255,7 +261,7 @@ func testReadLines(t *testing.T, inputLines [][]byte) {
 }
 
 func testReadLine(t *testing.T, line []byte) {
-	testReadLines(t, [][]byte{line})
+	testReadLines(t, [][]byte{line}, false)
 }
 
 func randomInt(r *rand.Rand, min, max int) int {
@@ -425,3 +431,31 @@ func TestBufferSize(t *testing.T) {
 		require.Equal(t, string(b[:n]), lines[i])
 	}
 }
+
+// eofWithNonZeroNumberOfBytesReader is an io.Reader implementation that at the
+// end of the stream returns a non-zero number of bytes with io.EOF. This is
+// allowed under the io.Reader interface contract and must be handled by the
+// line reader.
+type eofWithNonZeroNumberOfBytesReader struct {
+	buf *bytes.Buffer
+}
+
+func (r *eofWithNonZeroNumberOfBytesReader) Read(d []byte) (int, error) {
+	n, err := r.buf.Read(d)
+	if err != nil {
+		return n, err
+	}
+
+	// As per the io.Reader contract:
+	//   "a Reader returning a non-zero number of bytes at the end of the input
+	//   stream may return either err == EOF or err == nil."
+	if r.buf.Len() == 0 {
+		return n, io.EOF
+	}
+	return n, nil
+}
+
+// Verify handling of the io.Reader returning n > 0 with io.EOF.
+func TestReadWithNonZeroNumberOfBytesAndEOF(t *testing.T) {
+	testReadLines(t, [][]byte{[]byte("Hello world!\n")}, true)
+}
