diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 4b1e5f1150c..eb88a802d4d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -167,6 +167,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix event.type for zeek/ssl and duplicate event.category for zeek/connection {pull}20696[20696] - Add json body check for sqs message. {pull}21727[21727] - Drop aws.vpcflow.pkt_srcaddr and aws.vpcflow.pkt_dstaddr when equal to "-". {pull}22721[22721] {issue}22716[22716] +- Fix bug in aws-s3 input where the end of gzipped log files might have been discarded. {pull}26260[26260] +- o365: Avoid mapping exception for `Parameters` and `ExtendedProperties` fields of string type. {pull}26164[26164] *Heartbeat* diff --git a/libbeat/reader/readfile/line.go b/libbeat/reader/readfile/line.go index 3d10eb7664d..c36b524dde2 100644 --- a/libbeat/reader/readfile/line.go +++ b/libbeat/reader/readfile/line.go @@ -138,6 +138,11 @@ func (r *LineReader) advance() error { // Try to read more bytes into buffer n, err := r.reader.Read(buf) + if err == io.EOF && n > 0 { + // Continue processing the returned bytes. The next call will yield EOF with 0 bytes. + err = nil + } + // Appends buffer also in case of err r.inBuffer.Append(buf[:n]) if err != nil { diff --git a/libbeat/reader/readfile/line_test.go b/libbeat/reader/readfile/line_test.go index df28f3345b9..4f20e4de161 100644 --- a/libbeat/reader/readfile/line_test.go +++ b/libbeat/reader/readfile/line_test.go @@ -219,10 +219,10 @@ func testReadLineLengths(t *testing.T, lineLengths []int) { lines = append(lines, inputLine) } - testReadLines(t, lines) + testReadLines(t, lines, false) } -func testReadLines(t *testing.T, inputLines [][]byte) { +func testReadLines(t *testing.T, inputLines [][]byte, eofOnLastRead bool) { var inputStream []byte for _, line := range inputLines { inputStream = append(inputStream, line...) @@ -230,8 +230,14 @@ func testReadLines(t *testing.T, inputLines [][]byte) { // initialize reader buffer := bytes.NewBuffer(inputStream) - codec, _ := encoding.Plain(buffer) - reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, buffer.Len(), LineFeed, unlimited}) + + var r io.Reader = buffer + if eofOnLastRead { + r = &eofWithNonZeroNumberOfBytesReader{buf: buffer} + } + + codec, _ := encoding.Plain(r) + reader, err := NewLineReader(ioutil.NopCloser(r), Config{codec, buffer.Len(), LineFeed, unlimited}) if err != nil { t.Fatalf("Error initializing reader: %v", err) } @@ -255,7 +261,7 @@ func testReadLines(t *testing.T, inputLines [][]byte) { } func testReadLine(t *testing.T, line []byte) { - testReadLines(t, [][]byte{line}) + testReadLines(t, [][]byte{line}, false) } func randomInt(r *rand.Rand, min, max int) int { @@ -425,3 +431,31 @@ func TestBufferSize(t *testing.T) { require.Equal(t, string(b[:n]), lines[i]) } } + +// eofWithNonZeroNumberOfBytesReader is an io.Reader implementation that at the +// end of the stream returns a non-zero number of bytes with io.EOF. This is +// allowed under the io.Reader interface contract and must be handled by the +// line reader. +type eofWithNonZeroNumberOfBytesReader struct { + buf *bytes.Buffer +} + +func (r *eofWithNonZeroNumberOfBytesReader) Read(d []byte) (int, error) { + n, err := r.buf.Read(d) + if err != nil { + return n, err + } + + // As per the io.Reader contract: + // "a Reader returning a non-zero number of bytes at the end of the input + // stream may return either err == EOF or err == nil." + if r.buf.Len() == 0 { + return n, io.EOF + } + return n, nil +} + +// Verify handling of the io.Reader returning n > 0 with io.EOF. +func TestReadWithNonZeroNumberOfBytesAndEOF(t *testing.T) { + testReadLines(t, [][]byte{[]byte("Hello world!\n")}, true) +}