Skip to content

Commit

Permalink
Fix inconsistency with input error counting (influxdata#7077)
Browse files Browse the repository at this point in the history
  • Loading branch information
ssoroka authored and idohalevi committed Sep 23, 2020
1 parent 4da1a63 commit fc98fbf
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 85 deletions.
10 changes: 2 additions & 8 deletions agent/accumulator.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
package agent

import (
"log"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/selfstat"
)

var (
NErrors = selfstat.Register("agent", "gather_errors", map[string]string{})
)

type MetricMaker interface {
LogName() string
MakeMetric(metric telegraf.Metric) telegraf.Metric
Log() telegraf.Logger
}

type accumulator struct {
Expand Down Expand Up @@ -110,8 +105,7 @@ func (ac *accumulator) AddError(err error) {
if err == nil {
return
}
NErrors.Incr(1)
log.Printf("E! [%s] Error in plugin: %v", ac.maker.LogName(), err)
ac.maker.Log().Errorf("Error in plugin: %v", err)
}

func (ac *accumulator) SetPrecision(precision time.Duration) {
Expand Down
6 changes: 5 additions & 1 deletion agent/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/models"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -59,7 +60,6 @@ func TestAccAddError(t *testing.T) {
a.AddError(fmt.Errorf("baz"))

errs := bytes.Split(errBuf.Bytes(), []byte{'\n'})
assert.EqualValues(t, int64(3), NErrors.Get())
require.Len(t, errs, 4) // 4 because of trailing newline
assert.Contains(t, string(errs[0]), "TestPlugin")
assert.Contains(t, string(errs[0]), "foo")
Expand Down Expand Up @@ -154,3 +154,7 @@ func (tm *TestMetricMaker) LogName() string {
func (tm *TestMetricMaker) MakeMetric(metric telegraf.Metric) telegraf.Metric {
return metric
}

func (tm *TestMetricMaker) Log() telegraf.Logger {
return models.NewLogger("TestPlugin", "test", "")
}
6 changes: 5 additions & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error {
}
}

hasErrors := false
for _, input := range a.Config.Inputs {
select {
case <-ctx.Done():
Expand All @@ -215,15 +216,18 @@ func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error {
nulAcc.SetPrecision(a.Precision())
if err := input.Input.Gather(nulAcc); err != nil {
acc.AddError(err)
hasErrors = true
}

time.Sleep(500 * time.Millisecond)
if err := input.Input.Gather(acc); err != nil {
acc.AddError(err)
hasErrors = true
}
default:
if err := input.Input.Gather(acc); err != nil {
acc.AddError(err)
hasErrors = true
}
}
}
Expand All @@ -235,7 +239,7 @@ func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error {
a.stopServiceInputs()
}

if NErrors.Get() > 0 {
if hasErrors {
return fmt.Errorf("One or more input plugins had an error")
}
return nil
Expand Down
25 changes: 20 additions & 5 deletions internal/models/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,39 @@ import (
"reflect"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
)

// Logger defines a logging structure for plugins.
type Logger struct {
Errs selfstat.Stat
Name string // Name is the plugin name, will be printed in the `[]`.
OnErrs []func()
Name string // Name is the plugin name, will be printed in the `[]`.
}

// NewLogger creates a new logger instance
func NewLogger(pluginType, name, alias string) *Logger {
return &Logger{
Name: logName(pluginType, name, alias),
}
}

// OnErr defines a callback that triggers only when errors are about to be written to the log
func (l *Logger) OnErr(f func()) {
l.OnErrs = append(l.OnErrs, f)
}

// Errorf logs an error message, patterned after log.Printf.
func (l *Logger) Errorf(format string, args ...interface{}) {
l.Errs.Incr(1)
for _, f := range l.OnErrs {
f()
}
log.Printf("E! ["+l.Name+"] "+format, args...)
}

// Error logs an error message, patterned after log.Print.
func (l *Logger) Error(args ...interface{}) {
l.Errs.Incr(1)
for _, f := range l.OnErrs {
f()
}
log.Print(append([]interface{}{"E! [" + l.Name + "] "}, args...)...)
}

Expand Down
60 changes: 7 additions & 53 deletions internal/models/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,63 +8,17 @@ import (
)

func TestErrorCounting(t *testing.T) {
iLog := Logger{Name: "inputs.test", Errs: selfstat.Register(
reg := selfstat.Register(
"gather",
"errors",
map[string]string{"input": "test"},
)}
)
iLog := Logger{Name: "inputs.test"}
iLog.OnErr(func() {
reg.Incr(1)
})
iLog.Error("something went wrong")
iLog.Errorf("something went wrong")

aLog := Logger{Name: "aggregators.test", Errs: selfstat.Register(
"aggregate",
"errors",
map[string]string{"aggregator": "test"},
)}
aLog.Name = "aggregators.test"
aLog.Error("another thing happened")

oLog := Logger{Name: "outputs.test", Errs: selfstat.Register(
"write",
"errors",
map[string]string{"output": "test"},
)}
oLog.Error("another thing happened")

pLog := Logger{Name: "processors.test", Errs: selfstat.Register(
"process",
"errors",
map[string]string{"processor": "test"},
)}
pLog.Error("another thing happened")

require.Equal(t, int64(2), iLog.Errs.Get())
require.Equal(t, int64(1), aLog.Errs.Get())
require.Equal(t, int64(1), oLog.Errs.Get())
require.Equal(t, int64(1), pLog.Errs.Get())
}

func TestLogging(t *testing.T) {
log := Logger{Name: "inputs.test", Errs: selfstat.Register(
"gather",
"errors",
map[string]string{"input": "test"},
)}

log.Errs.Set(0)

log.Debugf("something happened")
log.Debug("something happened")

log.Warnf("something happened")
log.Warn("something happened")
require.Equal(t, int64(0), log.Errs.Get())

log.Infof("something happened")
log.Info("something happened")
require.Equal(t, int64(0), log.Errs.Get())

log.Errorf("something happened")
log.Error("something happened")
require.Equal(t, int64(2), log.Errs.Get())
require.Equal(t, int64(2), reg.Get())
}
13 changes: 9 additions & 4 deletions internal/models/running_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ func NewRunningAggregator(aggregator telegraf.Aggregator, config *AggregatorConf
tags["alias"] = config.Alias
}

logger := &Logger{
Name: logName("aggregators", config.Name, config.Alias),
Errs: selfstat.Register("aggregate", "errors", tags),
}
aggErrorsRegister := selfstat.Register("aggregate", "errors", tags)
logger := NewLogger("aggregators", config.Name, config.Alias)
logger.OnErr(func() {
aggErrorsRegister.Incr(1)
})

setLogIfExist(aggregator, logger)

Expand Down Expand Up @@ -176,3 +177,7 @@ func (r *RunningAggregator) push(acc telegraf.Accumulator) {
elapsed := time.Since(start)
r.PushTime.Incr(elapsed.Nanoseconds())
}

func (r *RunningAggregator) Log() telegraf.Logger {
return r.log
}
19 changes: 14 additions & 5 deletions internal/models/running_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import (
"github.com/influxdata/telegraf/selfstat"
)

var GlobalMetricsGathered = selfstat.Register("agent", "metrics_gathered", map[string]string{})
var (
GlobalMetricsGathered = selfstat.Register("agent", "metrics_gathered", map[string]string{})
GlobalGatherErrors = selfstat.Register("agent", "gather_errors", map[string]string{})
)

type RunningInput struct {
Input telegraf.Input
Expand All @@ -26,10 +29,12 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput {
tags["alias"] = config.Alias
}

logger := &Logger{
Name: logName("inputs", config.Name, config.Alias),
Errs: selfstat.Register("gather", "errors", tags),
}
inputErrorsRegister := selfstat.Register("gather", "errors", tags)
logger := NewLogger("inputs", config.Name, config.Alias)
logger.OnErr(func() {
inputErrorsRegister.Incr(1)
GlobalGatherErrors.Incr(1)
})
setLogIfExist(input, logger)

return &RunningInput{
Expand Down Expand Up @@ -116,3 +121,7 @@ func (r *RunningInput) Gather(acc telegraf.Accumulator) error {
func (r *RunningInput) SetDefaultTags(tags map[string]string) {
r.defaultTags = tags
}

func (r *RunningInput) Log() telegraf.Logger {
return r.log
}
31 changes: 31 additions & 0 deletions internal/models/running_input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"testing"
"time"

"github.com/influxdata/telegraf/selfstat"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
Expand Down Expand Up @@ -256,6 +258,35 @@ func TestMakeMetricNameSuffix(t *testing.T) {
require.Equal(t, expected, m)
}

func TestMetricErrorCounters(t *testing.T) {
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestMetricErrorCounters",
})

getGatherErrors := func() int64 {
for _, r := range selfstat.Metrics() {
tag, hasTag := r.GetTag("input")
if r.Name() == "internal_gather" && hasTag && tag == "TestMetricErrorCounters" {
errCount, ok := r.GetField("errors")
if !ok {
t.Fatal("Expected error field")
}
return errCount.(int64)
}
}
return 0
}

before := getGatherErrors()

ri.Log().Error("Oh no")

after := getGatherErrors()

require.Greater(t, after, before)
require.GreaterOrEqual(t, int64(1), GlobalGatherErrors.Get())
}

type testInput struct{}

func (t *testInput) Description() string { return "" }
Expand Down
13 changes: 9 additions & 4 deletions internal/models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,11 @@ func NewRunningOutput(
tags["alias"] = config.Alias
}

logger := &Logger{
Name: logName("outputs", config.Name, config.Alias),
Errs: selfstat.Register("write", "errors", tags),
}
writeErrorsRegister := selfstat.Register("write", "errors", tags)
logger := NewLogger("outputs", config.Name, config.Alias)
logger.OnErr(func() {
writeErrorsRegister.Incr(1)
})
setLogIfExist(output, logger)

if config.MetricBufferLimit > 0 {
Expand Down Expand Up @@ -240,3 +241,7 @@ func (r *RunningOutput) LogBufferStatus() {
nBuffer := r.buffer.Len()
r.log.Debugf("Buffer fullness: %d / %d metrics", nBuffer, r.MetricBufferLimit)
}

func (r *RunningOutput) Log() telegraf.Logger {
return r.log
}
13 changes: 9 additions & 4 deletions internal/models/running_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ func NewRunningProcessor(processor telegraf.Processor, config *ProcessorConfig)
tags["alias"] = config.Alias
}

logger := &Logger{
Name: logName("processors", config.Name, config.Alias),
Errs: selfstat.Register("process", "errors", tags),
}
processErrorsRegister := selfstat.Register("process", "errors", tags)
logger := NewLogger("processors", config.Name, config.Alias)
logger.OnErr(func() {
processErrorsRegister.Incr(1)
})
setLogIfExist(processor, logger)

return &RunningProcessor{
Expand Down Expand Up @@ -97,3 +98,7 @@ func (rp *RunningProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {

return ret
}

func (r *RunningProcessor) Log() telegraf.Logger {
return r.log
}
4 changes: 4 additions & 0 deletions plugins/inputs/cloud_pubsub_push/pubsub_push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ func (tm *testMetricMaker) MakeMetric(metric telegraf.Metric) telegraf.Metric {
return metric
}

func (tm *testMetricMaker) Log() telegraf.Logger {
return models.NewLogger("test", "test", "")
}

type testOutput struct {
// if true, mock a write failure
failWrite bool
Expand Down

0 comments on commit fc98fbf

Please sign in to comment.