Skip to content

Commit

Permalink
Add limit to number of undelivered lines to read ahead in tail (influ…
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and Mathieu Lecarme committed Apr 17, 2020
1 parent 6508808 commit 2b60f02
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 46 deletions.
18 changes: 12 additions & 6 deletions plugins/inputs/tail/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ see http://man7.org/linux/man-pages/man1/tail.1.html for more details.
The plugin expects messages in one of the
[Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md).

### Configuration:
### Configuration

```toml
# Stream a log file, like the tail -f command
[[inputs.tail]]
## files to tail.
## File names or a pattern to tail.
## These accept standard unix glob matching rules, but with the addition of
## ** as a "super asterisk". ie:
## "/var/log/**.log" -> recursively find all .log files in /var/log
Expand All @@ -34,22 +33,29 @@ The plugin expects messages in one of the
## See https://github.com/gobwas/glob for more examples
##
files = ["/var/mymetrics.out"]

## Read file from beginning.
from_beginning = false
# from_beginning = false

## Whether file is a named pipe
pipe = false
# pipe = false

## Method used to watch for file updates. Can be either "inotify" or "poll".
# watch_method = "inotify"

## Maximum lines of the file to process that have not yet been written by the
## output. For best throughput set based on the number of metrics on each
## line and the size of the output's metric_batch_size.
# max_undelivered_lines = 1000

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
```

### Metrics:
### Metrics

Metrics are produced according to the `data_format` option. Additionally a
tag labeled `path` is added to the metric containing the filename being tailed.
98 changes: 68 additions & 30 deletions plugins/inputs/tail/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
package tail

import (
"context"
"errors"
"strings"
"sync"

Expand All @@ -15,29 +17,34 @@ import (
)

const (
defaultWatchMethod = "inotify"
defaultWatchMethod = "inotify"
defaultMaxUndeliveredLines = 1000
)

// Package-level store of per-file read offsets, kept outside any single
// Tail instance so that a recreated plugin (see NewTail, which copies it,
// and Stop, which persists back into it) can resume where a previous
// instance left off. All access is guarded by offsetsMutex.
var (
offsets = make(map[string]int64)
offsetsMutex = new(sync.Mutex)
)

type Tail struct {
Files []string
FromBeginning bool
Pipe bool
WatchMethod string
// empty is a zero-size token used purely for signaling on channels.
type empty struct{}
// semaphore is a counting semaphore over empty tokens: sending acquires a
// slot (see receiver), receiving releases one (see the goroutine in Start).
type semaphore chan empty

Log telegraf.Logger
type Tail struct {
Files []string `toml:"files"`
FromBeginning bool `toml:"from_beginning"`
Pipe bool `toml:"pipe"`
WatchMethod string `toml:"watch_method"`
MaxUndeliveredLines int `toml:"max_undelivered_lines"`

Log telegraf.Logger `toml:"-"`
tailers map[string]*tail.Tail
offsets map[string]int64
parserFunc parsers.ParserFunc
wg sync.WaitGroup
acc telegraf.Accumulator

sync.Mutex
ctx context.Context
cancel context.CancelFunc
acc telegraf.TrackingAccumulator
sem semaphore
}

func NewTail() *Tail {
Expand All @@ -49,13 +56,14 @@ func NewTail() *Tail {
offsetsMutex.Unlock()

return &Tail{
FromBeginning: false,
offsets: offsetsCopy,
FromBeginning: false,
MaxUndeliveredLines: 1000,
offsets: offsetsCopy,
}
}

const sampleConfig = `
## files to tail.
## File names or a pattern to tail.
## These accept standard unix glob matching rules, but with the addition of
## ** as a "super asterisk". ie:
## "/var/log/**.log" -> recursively find all .log files in /var/log
Expand All @@ -65,14 +73,21 @@ const sampleConfig = `
## See https://github.com/gobwas/glob for more examples
##
files = ["/var/mymetrics.out"]
## Read file from beginning.
from_beginning = false
# from_beginning = false
## Whether file is a named pipe
pipe = false
# pipe = false
## Method used to watch for file updates. Can be either "inotify" or "poll".
# watch_method = "inotify"
## Maximum lines of the file to process that have not yet been written by the
## output. For best throughput set based on the number of metrics on each
## line and the size of the output's metric_batch_size.
# max_undelivered_lines = 1000
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
Expand All @@ -88,18 +103,36 @@ func (t *Tail) Description() string {
return "Stream a log file, like the tail -f command"
}

func (t *Tail) Gather(acc telegraf.Accumulator) error {
t.Lock()
defer t.Unlock()
// Init validates the configuration and allocates the delivery semaphore
// that bounds how many undelivered lines may be in flight at once.
func (t *Tail) Init() error {
	// Reject zero AND negative values: the error message promises a
	// positive value, and make() below would panic on a negative
	// channel capacity. The original `== 0` check let negatives through.
	if t.MaxUndeliveredLines < 1 {
		return errors.New("max_undelivered_lines must be positive")
	}
	t.sem = make(semaphore, t.MaxUndeliveredLines)
	return nil
}

// Gather starts tailers for any files that newly match the configured
// globs. fromBeginning is forced to true so files first seen during a
// Gather cycle are read from their start. Lines from already-running
// tailers are delivered asynchronously through the tracking accumulator
// set up in Start, not through acc here.
func (t *Tail) Gather(acc telegraf.Accumulator) error {
return t.tailNewFiles(true)
}

func (t *Tail) Start(acc telegraf.Accumulator) error {
t.Lock()
defer t.Unlock()
t.acc = acc.WithTracking(t.MaxUndeliveredLines)

t.ctx, t.cancel = context.WithCancel(context.Background())

t.wg.Add(1)
go func() {
defer t.wg.Done()
for {
select {
case <-t.ctx.Done():
return
case <-t.acc.Delivered():
<-t.sem
}
}
}()

t.acc = acc
t.tailers = make(map[string]*tail.Tail)

err := t.tailNewFiles(t.FromBeginning)
Expand Down Expand Up @@ -175,6 +208,12 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
go func() {
defer t.wg.Done()
t.receiver(parser, tailer)

t.Log.Debugf("Tail removed for %q", tailer.Filename)

if err := tailer.Err(); err != nil {
t.Log.Errorf("Tailing %q: %s", tailer.Filename, err.Error())
}
}()
t.tailers[tailer.Filename] = tailer
}
Expand Down Expand Up @@ -229,21 +268,19 @@ func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) {

for _, metric := range metrics {
metric.AddTag("path", tailer.Filename)
t.acc.AddMetric(metric)
}
}

t.Log.Debugf("Tail removed for %q", tailer.Filename)

if err := tailer.Err(); err != nil {
t.Log.Errorf("Tailing %q: %s", tailer.Filename, err.Error())
// Block until plugin is stopping or room is available to add metrics.
select {
case <-t.ctx.Done():
return
case t.sem <- empty{}:
t.acc.AddTrackingMetricGroup(metrics)
}
}
}

func (t *Tail) Stop() {
t.Lock()
defer t.Unlock()

for _, tailer := range t.tailers {
if !t.Pipe && !t.FromBeginning {
// store offset for resume
Expand All @@ -260,6 +297,7 @@ func (t *Tail) Stop() {
}
}

t.cancel()
t.wg.Wait()

// persist offsets
Expand Down
38 changes: 28 additions & 10 deletions plugins/inputs/tail/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func TestTailFromBeginning(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
defer tmpfile.Close()
_, err = tmpfile.WriteString("cpu,mytag=foo usage_idle=100\n")
require.NoError(t, err)

Expand All @@ -34,11 +35,13 @@ func TestTailFromBeginning(t *testing.T) {
tt.FromBeginning = true
tt.Files = []string{tmpfile.Name()}
tt.SetParserFunc(parsers.NewInfluxParser)
defer tt.Stop()
defer tmpfile.Close()

err = tt.Init()
require.NoError(t, err)

acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc))
defer tt.Stop()
require.NoError(t, acc.GatherError(tt.Gather))

acc.Wait(1)
Expand All @@ -60,18 +63,21 @@ func TestTailFromEnd(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
defer tmpfile.Close()
_, err = tmpfile.WriteString("cpu,mytag=foo usage_idle=100\n")
require.NoError(t, err)

tt := NewTail()
tt.Log = testutil.Logger{}
tt.Files = []string{tmpfile.Name()}
tt.SetParserFunc(parsers.NewInfluxParser)
defer tt.Stop()
defer tmpfile.Close()

err = tt.Init()
require.NoError(t, err)

acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc))
defer tt.Stop()
for _, tailer := range tt.tailers {
for n, err := tailer.Tell(); err == nil && n == 0; n, err = tailer.Tell() {
// wait for tailer to jump to end
Expand Down Expand Up @@ -99,17 +105,20 @@ func TestTailBadLine(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
defer tmpfile.Close()

tt := NewTail()
tt.Log = testutil.Logger{}
tt.FromBeginning = true
tt.Files = []string{tmpfile.Name()}
tt.SetParserFunc(parsers.NewInfluxParser)
defer tt.Stop()
defer tmpfile.Close()

err = tt.Init()
require.NoError(t, err)

acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc))
defer tt.Stop()

buf := &bytes.Buffer{}
log.SetOutput(buf)
Expand All @@ -127,6 +136,7 @@ func TestTailDosLineendings(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
defer tmpfile.Close()
_, err = tmpfile.WriteString("cpu usage_idle=100\r\ncpu2 usage_idle=200\r\n")
require.NoError(t, err)

Expand All @@ -135,11 +145,13 @@ func TestTailDosLineendings(t *testing.T) {
tt.FromBeginning = true
tt.Files = []string{tmpfile.Name()}
tt.SetParserFunc(parsers.NewInfluxParser)
defer tt.Stop()
defer tmpfile.Close()

err = tt.Init()
require.NoError(t, err)

acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc))
defer tt.Stop()
require.NoError(t, acc.GatherError(tt.Gather))

acc.Wait(2)
Expand Down Expand Up @@ -180,11 +192,14 @@ cpu,42
TimeFunc: func() time.Time { return time.Unix(0, 0) },
}, nil
})
defer plugin.Stop()

err = plugin.Init()
require.NoError(t, err)

acc := testutil.Accumulator{}
err = plugin.Start(&acc)
require.NoError(t, err)
defer plugin.Stop()
err = plugin.Gather(&acc)
require.NoError(t, err)
acc.Wait(2)
Expand Down Expand Up @@ -237,11 +252,14 @@ func TestMultipleMetricsOnFirstLine(t *testing.T) {
MetricName: "cpu",
})
})
defer plugin.Stop()

err = plugin.Init()
require.NoError(t, err)

acc := testutil.Accumulator{}
err = plugin.Start(&acc)
require.NoError(t, err)
defer plugin.Stop()
err = plugin.Gather(&acc)
require.NoError(t, err)
acc.Wait(2)
Expand Down

0 comments on commit 2b60f02

Please sign in to comment.