diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index f1daee33282..0f0565bfb30 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -48,6 +48,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only. - Remove `common.MapStrPointer` parameter from `cfgfile.Runnerfactory` interface. {pull}19135[19135] - Replace `ACKCount`, `ACKEvents`, and `ACKLastEvent` callbacks with `ACKHandler` and interface in `beat.ClientConfig`. {pull}19632[19632] - Remove global ACK handler support via `SetACKHandler` from publisher pipeline. {pull}19632[19632] +- Make implementing `Close` required for `reader.Reader` interfaces. {pull}20455[20455] ==== Bugfixes diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index d955c84e16f..2b514568c2f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -616,6 +616,10 @@ field. You can revert this change by configuring tags for the module and omittin - Add event.ingested for Suricata module {pull}20220[20220] - Add support for custom header and headersecret for filebeat http_endpoint input {pull}20435[20435] - Convert httpjson to v2 input {pull}20226[20226] +- Add event.ingested to all Filebeat modules. {pull}20386[20386] +- Return error when log harvester tries to open a named pipe. {issue}18682[18682] {pull}20450[20450] +- Avoid goroutine leaks in Filebeat readers. {issue}19193[19193] {pull}20455[20455] + *Heartbeat* diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 9dc93202951..94f5caa76f4 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -291,7 +291,10 @@ func (h *Harvester) Run() error { } h.stop() - h.log.Close() + err := h.reader.Close() + if err != nil { + logp.Err("Failed to stop harvester for file %s: %v", h.state.Source, err) + } }(h.state.Source) logp.Info("Harvester started for file: %s", h.state.Source) diff --git a/filebeat/input/log/log.go b/filebeat/input/log/log.go index 1a89c5bc8d1..60728143764 100644 --- a/filebeat/input/log/log.go +++ b/filebeat/input/log/log.go @@ -208,7 +208,8 @@ func (f *Log) wait() { } // Close closes the done channel but no th the file handler -func (f *Log) Close() { +func (f *Log) Close() error { close(f.done) // Note: File reader is not closed here because that leads to race conditions + return nil } diff --git a/libbeat/reader/debug/debug.go b/libbeat/reader/debug/debug.go index 5e0b1e7c93a..a3bb7cc93fd 100644 --- a/libbeat/reader/debug/debug.go +++ b/libbeat/reader/debug/debug.go @@ -46,7 +46,7 @@ type CheckFunc func(offset int64, buf []byte) bool // Is is useful is you want to detect if you have received garbage from a network volume. type Reader struct { log *logp.Logger - reader io.Reader + reader io.ReadCloser buffer bytes.Buffer minBufferSize int maxFailures int @@ -59,7 +59,7 @@ type Reader struct { // NewReader returns a debug reader. func NewReader( log *logp.Logger, - reader io.Reader, + reader io.ReadCloser, minBufferSize int, maxFailures int, predicate CheckFunc, @@ -115,6 +115,10 @@ func (r *Reader) Read(p []byte) (int, error) { return n, err } +func (r *Reader) Close() error { + return r.reader.Close() +} + func makeNullCheck(log *logp.Logger, minSize int) CheckFunc { // create a slice with null bytes to match on the buffer. pattern := make([]byte, minSize, minSize) @@ -159,7 +163,7 @@ func summarizeBufferInfo(idx int, buf []byte) (int, []byte) { // AppendReaders look into the current enabled log selector and will add any debug reader that match // the selectors. -func AppendReaders(reader io.Reader) (io.Reader, error) { +func AppendReaders(reader io.ReadCloser) (io.ReadCloser, error) { var err error if logp.HasSelector("detect_null_bytes") || logp.HasSelector("*") { diff --git a/libbeat/reader/debug/debug_test.go b/libbeat/reader/debug/debug_test.go index ac6b6878c9b..c913cc2f0e6 100644 --- a/libbeat/reader/debug/debug_test.go +++ b/libbeat/reader/debug/debug_test.go @@ -20,6 +20,7 @@ package debug import ( "bytes" "io" + "io/ioutil" "testing" "github.com/stretchr/testify/assert" @@ -78,8 +79,9 @@ func testCheckContent(t *testing.T) { s.WriteString("hello world") s.WriteByte(0x00) s.WriteString("hello world") + r := ioutil.NopCloser(&s) - reader, _ := NewReader(logp.L(), &s, 5, 3, check) + reader, _ := NewReader(logp.L(), r, 5, 3, check) _, err := reader.Read(make([]byte, 20)) if !assert.NoError(t, err) { @@ -91,7 +93,7 @@ func testCheckContent(t *testing.T) { func testConsumeAll(t *testing.T) { c, _ := common.RandomBytes(2000) - reader := bytes.NewReader(c) + reader := ioutil.NopCloser(bytes.NewReader(c)) var buf bytes.Buffer consumed := 0 debug, _ := NewReader(logp.L(), reader, 8, 20, makeNullCheck(logp.L(), 1)) @@ -106,8 +108,8 @@ func testConsumeAll(t *testing.T) { } func testEmptyBuffer(t *testing.T) { - var buf bytes.Buffer - debug, _ := NewReader(logp.L(), &buf, 8, 20, makeNullCheck(logp.L(), 1)) + buf := ioutil.NopCloser(&bytes.Buffer{}) + debug, _ := NewReader(logp.L(), buf, 8, 20, makeNullCheck(logp.L(), 1)) data := make([]byte, 33) n, err := debug.Read(data) assert.Equal(t, io.EOF, err) @@ -134,8 +136,9 @@ func testSilent(t *testing.T) { b.Write([]byte{'a', 'b', 'c', 'd', 0x00, 'e'}) b.Write([]byte{'a', 'b', 'c', 'd', 0x00, 'e'}) b.Write([]byte{'a', 'b', 'c', 'd', 0x00, 'e'}) + r := ioutil.NopCloser(&b) - debug, _ := NewReader(logp.L(), &b, 3, 2, check) + debug, _ := NewReader(logp.L(), r, 3, 2, check) consumed := 0 for consumed < b.Len() { n, _ := debug.Read(make([]byte, 3)) diff --git a/libbeat/reader/multiline/counter.go b/libbeat/reader/multiline/counter.go index bd410bc4ef7..602ac265885 100644 --- a/libbeat/reader/multiline/counter.go +++ b/libbeat/reader/multiline/counter.go @@ -18,6 +18,8 @@ package multiline import ( + "io" + "github.com/elastic/beats/v7/libbeat/reader" ) @@ -131,3 +133,12 @@ func (cr *counterReader) resetState() { func (cr *counterReader) setState(next func(cr *counterReader) (reader.Message, error)) { cr.state = next } + +func (cr *counterReader) Close() error { + cr.setState((*counterReader).readClosed) + return cr.reader.Close() +} + +func (cr *counterReader) readClosed() (reader.Message, error) { + return reader.Message{}, io.EOF +} diff --git a/libbeat/reader/multiline/multiline_test.go b/libbeat/reader/multiline/multiline_test.go index 2297fbc98b5..41959bb31ba 100644 --- a/libbeat/reader/multiline/multiline_test.go +++ b/libbeat/reader/multiline/multiline_test.go @@ -22,6 +22,7 @@ package multiline import ( "bytes" "errors" + "io/ioutil" "os" "strings" "testing" @@ -333,7 +334,7 @@ func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg Config) reade } var r reader.Reader - r, err = readfile.NewEncodeReader(in, readfile.Config{ + r, err = readfile.NewEncodeReader(ioutil.NopCloser(in), readfile.Config{ Codec: enc, BufferSize: 4096, Terminator: readfile.LineFeed, diff --git a/libbeat/reader/multiline/pattern.go b/libbeat/reader/multiline/pattern.go index 116af80135a..93cd390da4d 100644 --- a/libbeat/reader/multiline/pattern.go +++ b/libbeat/reader/multiline/pattern.go @@ -20,6 +20,7 @@ package multiline import ( "errors" "fmt" + "io" "time" "github.com/elastic/beats/v7/libbeat/common/match" @@ -254,6 +255,15 @@ func (pr *patternReader) setState(next func(pr *patternReader) (reader.Message, pr.state = next } +func (pr *patternReader) Close() error { + pr.setState((*patternReader).readClosed) + return pr.reader.Close() +} + +func (pr *patternReader) readClosed() (reader.Message, error) { + return reader.Message{}, io.EOF +} + // matchers func afterMatcher(pat match.Matcher) (matcher, error) { return genPatternMatcher(pat, func(last, current []byte) []byte { diff --git a/libbeat/reader/reader.go b/libbeat/reader/reader.go index 5aadcd611eb..81ae4ad8241 100644 --- a/libbeat/reader/reader.go +++ b/libbeat/reader/reader.go @@ -19,6 +19,7 @@ package reader import ( "errors" + "io" ) // Reader is the interface that wraps the basic Next method for @@ -26,6 +27,7 @@ import ( // Next returns the message being read or and error. EOF is returned // if reader will not return any new message on subsequent calls. type Reader interface { + io.Closer Next() (Message, error) } diff --git a/libbeat/reader/readfile/encode.go b/libbeat/reader/readfile/encode.go index b5b526ad361..84442d9d0d5 100644 --- a/libbeat/reader/readfile/encode.go +++ b/libbeat/reader/readfile/encode.go @@ -43,7 +43,7 @@ type Config struct { // New creates a new Encode reader from input reader by applying // the given codec. -func NewEncodeReader(r io.Reader, config Config) (EncoderReader, error) { +func NewEncodeReader(r io.ReadCloser, config Config) (EncoderReader, error) { eReader, err := NewLineReader(r, config) return EncoderReader{eReader}, err } @@ -59,3 +59,7 @@ func (r EncoderReader) Next() (reader.Message, error) { Bytes: sz, }, err } + +func (r EncoderReader) Close() error { + return r.reader.Close() +} diff --git a/libbeat/reader/readfile/encode_test.go b/libbeat/reader/readfile/encode_test.go index 9d6205c229f..6c65c48846e 100644 --- a/libbeat/reader/readfile/encode_test.go +++ b/libbeat/reader/readfile/encode_test.go @@ -19,6 +19,7 @@ package readfile import ( "bytes" + "io/ioutil" "testing" "github.com/stretchr/testify/assert" @@ -46,7 +47,7 @@ func TestEncodeLines(t *testing.T) { for name, testCase := range testCases { t.Run(name, func(t *testing.T) { - r := bytes.NewReader(testCase.Input) + r := ioutil.NopCloser(bytes.NewReader(testCase.Input)) codec, err := encFactory(r) assert.Nil(t, err, "failed to initialize encoding: %v", err) diff --git a/libbeat/reader/readfile/limit.go b/libbeat/reader/readfile/limit.go index d547ff09dfe..fb907ba06a3 100644 --- a/libbeat/reader/readfile/limit.go +++ b/libbeat/reader/readfile/limit.go @@ -49,3 +49,7 @@ func (r *LimitReader) Next() (reader.Message, error) { } return message, err } + +func (r *LimitReader) Close() error { + return r.reader.Close() +} diff --git a/libbeat/reader/readfile/limit_test.go b/libbeat/reader/readfile/limit_test.go index e310e151cf8..08639495952 100644 --- a/libbeat/reader/readfile/limit_test.go +++ b/libbeat/reader/readfile/limit_test.go @@ -37,6 +37,8 @@ func (m *mockReader) Next() (reader.Message, error) { }, nil } +func (m *mockReader) Close() error { return nil } + var limitTests = []struct { line string maxBytes int diff --git a/libbeat/reader/readfile/line.go b/libbeat/reader/readfile/line.go index e9ba491f483..3d10eb7664d 100644 --- a/libbeat/reader/readfile/line.go +++ b/libbeat/reader/readfile/line.go @@ -34,7 +34,7 @@ const unlimited = 0 // using the configured codec. The reader keeps track of bytes consumed // from raw input stream for every decoded line. type LineReader struct { - reader io.Reader + reader io.ReadCloser bufferSize int maxBytes int // max bytes per line limit to avoid OOM with malformatted files nl []byte @@ -48,7 +48,7 @@ type LineReader struct { } // New creates a new reader object -func NewLineReader(input io.Reader, config Config) (*LineReader, error) { +func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) { encoder := config.Codec.NewEncoder() // Create newline char based on encoding @@ -271,3 +271,7 @@ func (r *LineReader) decode(end int) (int, error) { r.byteCount += start return start, err } + +func (r *LineReader) Close() error { + return r.reader.Close() +} diff --git a/libbeat/reader/readfile/line_test.go b/libbeat/reader/readfile/line_test.go index 10a1ff958b5..d91544162c5 100644 --- a/libbeat/reader/readfile/line_test.go +++ b/libbeat/reader/readfile/line_test.go @@ -23,6 +23,7 @@ import ( "bytes" "encoding/hex" "io" + "io/ioutil" "math/rand" "strings" "testing" @@ -97,7 +98,7 @@ func TestReaderEncodings(t *testing.T) { } // create line reader - reader, err := NewLineReader(buffer, Config{codec, 1024, LineFeed, unlimited}) + reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, 1024, LineFeed, unlimited}) if err != nil { t.Fatal("failed to initialize reader:", err) } @@ -157,7 +158,7 @@ func TestLineTerminators(t *testing.T) { buffer.Write([]byte("this is my second line")) buffer.Write(nl) - reader, err := NewLineReader(buffer, Config{codec, 1024, terminator, unlimited}) + reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, 1024, terminator, unlimited}) if err != nil { t.Errorf("failed to initialize reader: %v", err) continue @@ -229,7 +230,7 @@ func testReadLines(t *testing.T, inputLines [][]byte) { // initialize reader buffer := bytes.NewBuffer(inputStream) codec, _ := encoding.Plain(buffer) - reader, err := NewLineReader(buffer, Config{codec, buffer.Len(), LineFeed, unlimited}) + reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, buffer.Len(), LineFeed, unlimited}) if err != nil { t.Fatalf("Error initializing reader: %v", err) } @@ -349,7 +350,7 @@ func TestMaxBytesLimit(t *testing.T) { } // Create line reader - reader, err := NewLineReader(strings.NewReader(input), Config{codec, bufferSize, LineFeed, lineMaxLimit}) + reader, err := NewLineReader(ioutil.NopCloser(strings.NewReader(input)), Config{codec, bufferSize, LineFeed, lineMaxLimit}) if err != nil { t.Fatal("failed to initialize reader:", err) } diff --git a/libbeat/reader/readfile/strip_newline.go b/libbeat/reader/readfile/strip_newline.go index 97cc005da92..6c5e5c513d2 100644 --- a/libbeat/reader/readfile/strip_newline.go +++ b/libbeat/reader/readfile/strip_newline.go @@ -81,3 +81,7 @@ func (p *StripNewline) autoLineEndingChars(l []byte) int { } return 1 } + +func (p *StripNewline) Close() error { + return p.reader.Close() +} diff --git a/libbeat/reader/readfile/timeout.go b/libbeat/reader/readfile/timeout.go index fd3d1c8ba7c..8e6f7f06023 100644 --- a/libbeat/reader/readfile/timeout.go +++ b/libbeat/reader/readfile/timeout.go @@ -19,6 +19,7 @@ package readfile import ( "errors" + "io" "time" "github.com/elastic/beats/v7/libbeat/reader" @@ -36,6 +37,7 @@ type TimeoutReader struct { signal error running bool ch chan lineMessage + done chan struct{} } type lineMessage struct { @@ -54,6 +56,7 @@ func NewTimeoutReader(reader reader.Reader, signal error, t time.Duration) *Time signal: signal, timeout: t, ch: make(chan lineMessage, 1), + done: make(chan struct{}), } } @@ -68,9 +71,13 @@ func (r *TimeoutReader) Next() (reader.Message, error) { go func() { for { message, err := r.reader.Next() - r.ch <- lineMessage{message, err} - if err != nil { - break + select { + case <-r.done: + return + case r.ch <- lineMessage{message, err}: + if err != nil { + return + } } } }() @@ -85,5 +92,13 @@ func (r *TimeoutReader) Next() (reader.Message, error) { return msg.line, msg.err case <-timer.C: return reader.Message{}, r.signal + case <-r.done: + return reader.Message{}, io.EOF } } + +func (r *TimeoutReader) Close() error { + close(r.done) + + return r.reader.Close() +} diff --git a/libbeat/reader/readjson/docker_json.go b/libbeat/reader/readjson/docker_json.go index c38e0a1a51c..59dded97ec3 100644 --- a/libbeat/reader/readjson/docker_json.go +++ b/libbeat/reader/readjson/docker_json.go @@ -244,3 +244,7 @@ func stripNewLineWin(msg *reader.Message) { return r == '\n' || r == '\r' }) } + +func (p *DockerJSONReader) Close() error { + return p.reader.Close() +} diff --git a/libbeat/reader/readjson/docker_json_test.go b/libbeat/reader/readjson/docker_json_test.go index 23cc862d964..2c9e2e71104 100644 --- a/libbeat/reader/readjson/docker_json_test.go +++ b/libbeat/reader/readjson/docker_json_test.go @@ -365,3 +365,5 @@ func (m *mockReader) Next() (reader.Message, error) { Bytes: len(message), }, nil } + +func (m *mockReader) Close() error { return nil } diff --git a/libbeat/reader/readjson/json.go b/libbeat/reader/readjson/json.go index aca7c535148..b2c0e5e028f 100644 --- a/libbeat/reader/readjson/json.go +++ b/libbeat/reader/readjson/json.go @@ -111,6 +111,10 @@ func (r *JSONReader) Next() (reader.Message, error) { return message, nil } +func (r *JSONReader) Close() error { + return r.reader.Close() +} + func createJSONError(message string) common.MapStr { return common.MapStr{"message": message, "type": "json"} }