[libbeat] Is there a goroutine leak in libbeat? #19193
Comments
Pinging @elastic/integrations-services (Team:Services)

Thanks for investigating. This indeed looks like a goroutine leak. The internal goroutine quits on error (e.g. EOF), but if the input is closed early we leak the goroutine.

Edit: after checking the code, I think the LineReader would propagate an error, forcing this goroutine to shut down when the input quits. I wonder if we can reproduce the issue with only one file.
@urso Thanks for the reminder. Today, to reproduce the issue, I ran a test with only one file. I edited the reader source as follows:

```go
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package readfile

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"

	"github.com/elastic/beats/v7/libbeat/reader"
)

var (
	errTimeout = errors.New("timeout")
	tid        uint64 // debug only: source of unique reader IDs
)

// TimeoutReader will signal some configurable timeout error if no
// new line can be returned in time.
type TimeoutReader struct {
	id      uint64
	reader  reader.Reader
	timeout time.Duration
	signal  error
	running bool
	ch      chan lineMessage
}

type lineMessage struct {
	line reader.Message
	err  error
}

// NewTimeoutReader returns a new timeout reader from an input line reader.
func NewTimeoutReader(reader reader.Reader, signal error, t time.Duration) *TimeoutReader {
	if signal == nil {
		signal = errTimeout
	}

	return &TimeoutReader{
		// Use the value returned by AddUint64 so the ID is read atomically.
		id:      atomic.AddUint64(&tid, 1),
		reader:  reader,
		signal:  signal,
		timeout: t,
		ch:      make(chan lineMessage, 1),
	}
}

// exit counts the Next.func1 goroutines that are currently alive (debug only).
var exit int64

func init() {
	// Print the number of live reader goroutines every 5 seconds.
	go func() {
		for {
			fmt.Printf("************the number of Next.Func1=%d************\n", atomic.LoadInt64(&exit))
			time.Sleep(time.Second * 5)
		}
	}()
}

// Next returns the next line. If no line was returned before timeout, the
// configured timeout error is returned.
// For handling timeouts a goroutine is started for reading lines from
// configured line reader. Only when underlying reader returns an error, the
// goroutine will be finished.
func (r *TimeoutReader) Next() (reader.Message, error) {
	if !r.running {
		r.running = true
		go func() {
			atomic.AddInt64(&exit, 1)
			fmt.Printf("*******<TimeoutReader: %d>'s routine start*********\n", r.id)
			for {
				message, err := r.reader.Next()
				r.ch <- lineMessage{message, err}
				if err != nil {
					break
				}
			}
			fmt.Printf("*******<TimeoutReader: %d>'s routine end*********\n", r.id)
			atomic.AddInt64(&exit, -1)
		}()
	}

	timer := time.NewTimer(r.timeout)
	select {
	case msg := <-r.ch:
		if msg.err != nil {
			r.running = false
		}
		timer.Stop()
		return msg.line, msg.err
	case <-timer.C:
		return reader.Message{}, r.signal
	}
}
```

And I found something strange.

```yaml
paths: ./data
logging:
  level: info
  to_stderr: true
filebeat.inputs:
- type: container
  enabled: true
  paths: ${INPUTS_CONTAINER_PATH:/tmp/containers/f004a2d52d22fac5d68dfec0dd48a68f2db4f649a41b94e34a7e4b900a29ec5d/f004a2d52d22fac5d68dfec0dd48a68f2db4f649a41b94e34a7e4b900a29ec5d-json.log}
  stream: ${INPUTS_CONTAINER_STREAM:all}
  encoding: ${INPUTS_CONTAINER_ENCODING:utf-8}
  ignore_older: ${INPUTS_CONTAINER_IGNORE_OLDER:2h}
  max_bytes: ${INPUTS_CONTAINER_MAX_BYTES:51200}
  multiline.pattern: '${TERMINUS_PATTERN:^\d{4}-\d{2}-\d{2}[^\d]+\d{2}:\d{2}:\d{2}[^\s]+\s+([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn(?:ing)?|WARN(?:ING)?|[Ee]rr(?:or)?|ERR(?:OR)?|[Cc]rit(?:ical)?|CRIT(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|[Ee]merg(?:ency)?|EMERG(?:ENCY)?)[\s-]+\[(.*?)][\s-]+}'
  multiline.negate: false
  multiline.match: after
  multiline.max_lines: ${INPUTS_CONTAINER_MULTILINE_MAX_LINES:500}
  multiline.timeout: ${INPUTS_CONTAINER_MULTILINE_TIMEOUT:1s}
  close_inactive: 3m
  close_removed: true
  close_timeout: 10s
  clean_removed: true
queue:
  mem:
    events: ${QUEUE_MEM_EVENTS:1024}
    flush.min_events: ${QUEUE_MEM_FLUSH_MIN_EVENTS:512}
    flush.timeout: ${QUEUE_MEM_FLUSH_TIMEOUT:10s}
processors:
output.file:
  path: /tmp/filebeat
```
Hi all, I think I may have found the cause of the goroutine leak. To verify it, I moved the channel send into a separate goroutine:

```go
func (r *TimeoutReader) Next() (reader.Message, error) {
	if !r.running {
		r.running = true
		go func() {
			for {
				message, err := r.reader.Next()
				// Send from a separate goroutine so the read loop never blocks here.
				go func() {
					r.ch <- lineMessage{message, err}
				}()
				if err != nil {
					break
				}
			}
		}()
	}

	timer := time.NewTimer(r.timeout)
	select {
	case msg := <-r.ch:
		if msg.err != nil {
			r.running = false
		}
		timer.Stop()
		return msg.line, msg.err
	case <-timer.C:
		return reader.Message{}, r.signal
	}
}
```
Thanks for investigating and all the details you shared. I think the issue is very clear now. Thinking about this and other potential issues I'd say the reader interface should also require |
## What does this PR do?

This PR changes the `reader.Reader` interface to require that readers implement a `Close` function.

## Why is it important?

This lets Filebeat clean up the goroutine in `TimeoutReader`.

## Related issues

Closes #19193
This PR changes the `reader.Reader` interface to require readers implement a `Close` function. This lets Filebeat clean-up the goroutine in `TimeoutReader`. Closes elastic#19193 (cherry picked from commit 315a17e)
Version: 7.6.2+
Hi all!
I'm running a stress test of Filebeat with Docker Compose, and I found that the number of `github.com/elastic/beats/libbeat/reader/readfile.(*TimeoutReader).Next.func1` goroutines grows continuously, which looks like a goroutine leak. The stress test conditions are described as follows:
These are the memory usage, the CPU utilization, and the goroutine pprof:
This is the goroutine comparison; the duration is about 30 minutes.
Then, to verify my hypothesis, I added a variable to monitor the number of goroutines and ran a test on my Mac. I changed `timeout.go`; the code is like this:
When I run Filebeat, the variable `exitNum` does not decrease when the `close_timeout` mechanism is triggered. The output log is like this:
And this is my `filebeat.yml`: