Skip to content

Commit

Permalink
Fix parsing multiple metrics on the first line of tailed file (influx…
Browse files Browse the repository at this point in the history
…data#6289)

Fix unit test

Signed-off-by: Dani Louca <dlouca@splunk.com>
  • Loading branch information
danielnelson authored and dloucasfx committed Apr 14, 2020
1 parent 6ac3361 commit 37c8536
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 36 deletions.
76 changes: 42 additions & 34 deletions plugins/inputs/tail/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
"sync"

"github.com/influxdata/tail"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/csv"
)

const (
Expand Down Expand Up @@ -141,56 +141,64 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {

// create a goroutine for each "tailer"
t.wg.Add(1)
go t.receiver(parser, tailer)
go func() {
defer t.wg.Done()
t.receiver(parser, tailer)
}()
t.tailers[tailer.Filename] = tailer
}
}
return nil
}

// this is launched as a goroutine to continuously watch a tailed logfile
// parseLine converts one line of tailed text into metrics using parser.
//
// The csv parser is special-cased: it parses headers in Parse and skips them
// in ParseLine, so as a temporary solution Parse is called only for the first
// line read from a file and ParseLine for every later line. Every other
// parser always goes through Parse.
func parseLine(parser parsers.Parser, line string, firstLine bool) ([]telegraf.Metric, error) {
	// Non-csv parsers, and the csv parser on the first line, use Parse.
	if _, isCSV := parser.(*csv.Parser); !isCSV || firstLine {
		return parser.Parse([]byte(line))
	}

	metric, err := parser.ParseLine(line)
	switch {
	case err != nil:
		return nil, err
	case metric == nil:
		// Nothing was produced for this line: return an empty, non-nil slice.
		return []telegraf.Metric{}, nil
	}
	return []telegraf.Metric{metric}, nil
}

// receiver is launched as a goroutine to continuously watch a tailed logfile
// for changes, parse any incoming messages, and add them to the accumulator.
func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) {
defer t.wg.Done()

var firstLine = true
var metrics []telegraf.Metric
var m telegraf.Metric
var err error
var line *tail.Line
for line = range tailer.Lines {
for line := range tailer.Lines {
if line.Err != nil {
t.acc.AddError(fmt.Errorf("E! Error tailing file %s, Error: %s\n",
tailer.Filename, err))
t.acc.AddError(fmt.Errorf("error tailing file %s, Error: %s", tailer.Filename, line.Err))
continue
}
// Fix up files with Windows line endings.
text := strings.TrimRight(line.Text, "\r")

if firstLine {
metrics, err = parser.Parse([]byte(text))
if err == nil {
if len(metrics) == 0 {
firstLine = false
continue
} else {
m = metrics[0]
}
}
firstLine = false
} else {
m, err = parser.ParseLine(text)
metrics, err := parseLine(parser, text, firstLine)
if err != nil {
t.acc.AddError(fmt.Errorf("malformed log line in %s: [%s], Error: %s",
tailer.Filename, line.Text, err))
continue
}
firstLine = false

if err == nil {
if m != nil {
tags := m.Tags()
tags["path"] = tailer.Filename
t.acc.AddFields(m.Name(), m.Fields(), tags, m.Time())
}
} else {
t.acc.AddError(fmt.Errorf("E! Malformed log line in %s: [%s], Error: %s\n",
tailer.Filename, line.Text, err))
for _, metric := range metrics {
metric.AddTag("path", tailer.Filename)
t.acc.AddMetric(metric)
}
}

Expand Down
121 changes: 119 additions & 2 deletions plugins/inputs/tail/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (
"os"
"runtime"
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/testutil"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -108,7 +111,7 @@ func TestTailBadLine(t *testing.T) {
require.NoError(t, err)

acc.WaitError(1)
assert.Contains(t, acc.Errors[0].Error(), "E! Malformed log line")
assert.Contains(t, acc.Errors[0].Error(), "malformed log line")
}

func TestTailDosLineendings(t *testing.T) {
Expand Down Expand Up @@ -139,3 +142,117 @@ func TestTailDosLineendings(t *testing.T) {
"usage_idle": float64(200),
})
}

// TestCSVHeadersParsedOnce checks that the csv parser only parses the header
// line once per file: a file holding one header row followed by two data rows
// is expected to produce exactly two "cpu" metrics.
func TestCSVHeadersParsedOnce(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "")
require.NoError(t, err)
// Close and delete the temporary file when the test returns.
defer func() {
tmpfile.Close()
os.Remove(tmpfile.Name())
}()

// The raw string opens with a newline, so the file starts with an empty
// line, then the header row, then two identical data rows.
_, err = tmpfile.WriteString(`
measurement,time_idle
cpu,42
cpu,42
`)
require.NoError(t, err)

// Tail the file from its beginning with a csv parser configured for one
// header row. TimeFunc always returns time.Unix(0, 0), the same timestamp
// used by the expected metrics below.
plugin := NewTail()
plugin.FromBeginning = true
plugin.Files = []string{tmpfile.Name()}
plugin.SetParserFunc(func() (parsers.Parser, error) {
return &csv.Parser{
MeasurementColumn: "measurement",
HeaderRowCount: 1,
TimeFunc: func() time.Time { return time.Unix(0, 0) },
}, nil
})
defer plugin.Stop()

acc := testutil.Accumulator{}
err = plugin.Start(&acc)
require.NoError(t, err)
err = plugin.Gather(&acc)
require.NoError(t, err)
// NOTE(review): presumably blocks until two metrics have been accumulated;
// confirm against testutil.Accumulator.
acc.Wait(2)
// Stop is called here before comparing; the deferred Stop above runs again
// when the test returns.
plugin.Stop()

// One metric per data row, each tagged with the path of the tailed file.
expected := []telegraf.Metric{
testutil.MustMetric("cpu",
map[string]string{
"path": tmpfile.Name(),
},
map[string]interface{}{
"time_idle": 42,
"measurement": "cpu",
},
time.Unix(0, 0)),
testutil.MustMetric("cpu",
map[string]string{
"path": tmpfile.Name(),
},
map[string]interface{}{
"time_idle": 42,
"measurement": "cpu",
},
time.Unix(0, 0)),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
}

// TestMultipleMetricsOnFirstLine ensures that the first line can produce
// multiple metrics (#6138): a single line holding a JSON array of two objects
// is expected to yield two "cpu" metrics.
func TestMultipleMetricsOnFirstLine(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "")
require.NoError(t, err)
// Close and delete the temporary file when the test returns.
defer func() {
tmpfile.Close()
os.Remove(tmpfile.Name())
}()

// The raw string opens with a newline, so the file starts with an empty
// line followed by one line containing a two-element JSON array.
_, err = tmpfile.WriteString(`
[{"time_idle": 42}, {"time_idle": 42}]
`)
require.NoError(t, err)

// Tail the file from its beginning with a json parser whose MetricName is
// "cpu".
plugin := NewTail()
plugin.FromBeginning = true
plugin.Files = []string{tmpfile.Name()}
plugin.SetParserFunc(func() (parsers.Parser, error) {
return json.New(
&json.Config{
MetricName: "cpu",
})
})
defer plugin.Stop()

acc := testutil.Accumulator{}
err = plugin.Start(&acc)
require.NoError(t, err)
err = plugin.Gather(&acc)
require.NoError(t, err)
// NOTE(review): presumably blocks until two metrics have been accumulated;
// confirm against testutil.Accumulator.
acc.Wait(2)
// Stop is called here before comparing; the deferred Stop above runs again
// when the test returns.
plugin.Stop()

// One metric per array element, each tagged with the path of the tailed
// file. The timestamps given here are placeholders because the comparison
// below is made with testutil.IgnoreTime().
expected := []telegraf.Metric{
testutil.MustMetric("cpu",
map[string]string{
"path": tmpfile.Name(),
},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0)),
testutil.MustMetric("cpu",
map[string]string{
"path": tmpfile.Name(),
},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0)),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(),
testutil.IgnoreTime())
}

0 comments on commit 37c8536

Please sign in to comment.