Skip to content

Commit

Permalink
feat(outputs): Add framework to retry on startup errors (#14884)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored Mar 26, 2024
1 parent 4344972 commit aa030b5
Show file tree
Hide file tree
Showing 6 changed files with 405 additions and 30 deletions.
25 changes: 14 additions & 11 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,10 +793,17 @@ func (a *Agent) startOutputs(
src := make(chan telegraf.Metric, 100)
unit := &outputUnit{src: src}
for _, output := range outputs {
err := a.connectOutput(ctx, output)
if err != nil {
for _, output := range unit.outputs {
if err := a.connectOutput(ctx, output); err != nil {
var fatalErr *internal.FatalError
if errors.As(err, &fatalErr) {
// If the model tells us to remove the plugin we do so without error
log.Printf("I! [agent] Failed to connect to [%s], error was %q; shutting down plugin...", output.LogName(), err)
output.Close()
continue
}

for _, unitOutput := range unit.outputs {
unitOutput.Close()
}
return nil, nil, fmt.Errorf("connecting output %s: %w", output.LogName(), err)
}
Expand All @@ -810,18 +817,14 @@ func (a *Agent) startOutputs(
// connectOutputs connects to all outputs.
func (a *Agent) connectOutput(ctx context.Context, output *models.RunningOutput) error {
log.Printf("D! [agent] Attempting connection to [%s]", output.LogName())
err := output.Output.Connect()
if err != nil {
log.Printf("E! [agent] Failed to connect to [%s], retrying in 15s, "+
"error was %q", output.LogName(), err)
if err := output.Connect(); err != nil {
log.Printf("E! [agent] Failed to connect to [%s], retrying in 15s, error was %q", output.LogName(), err)

err := internal.SleepContext(ctx, 15*time.Second)
if err != nil {
if err := internal.SleepContext(ctx, 15*time.Second); err != nil {
return err
}

err = output.Output.Connect()
if err != nil {
if err = output.Connect(); err != nil {
return fmt.Errorf("error connecting to output %q: %w", output.LogName(), err)
}
}
Expand Down
3 changes: 2 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1486,6 +1486,7 @@ func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig,
c.getFieldString(tbl, "name_override", &oc.NameOverride)
c.getFieldString(tbl, "name_suffix", &oc.NameSuffix)
c.getFieldString(tbl, "name_prefix", &oc.NamePrefix)
c.getFieldString(tbl, "startup_error_behavior", &oc.StartupErrorBehavior)

if c.hasErrs() {
return nil, c.firstErr()
Expand All @@ -1510,7 +1511,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
"name_override", "name_prefix", "name_suffix", "namedrop", "namedrop_separator", "namepass", "namepass_separator",
"order",
"pass", "period", "precision",
"tagdrop", "tagexclude", "taginclude", "tagpass", "tags":
"tagdrop", "tagexclude", "taginclude", "tagpass", "tags", "startup_error_behavior":

// Secret-store options to ignore
case "id":
Expand Down
39 changes: 39 additions & 0 deletions internal/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package internal

import "errors"

var ErrNotConnected = errors.New("not connected")

// StartupError indicates an error that occurred during startup of a plugin
// e.g. due to connectivity issues or resources being not yet available.
// In case the 'Retry' flag is set, the startup of the plugin might be retried
// depending on the configured startup-error-behavior. The 'RemovePlugin'
// flag denotes if the agent should remove the plugin from further processing.
type StartupError struct {
Err error
Retry bool
Partial bool
}

func (e *StartupError) Error() string {
return e.Err.Error()
}

func (e *StartupError) Unwrap() error {
return e.Err
}

// FatalError indicates a not-recoverable error in the plugin. The corresponding
// plugin should be remove by the agent stopping any further processing for that
// plugin instance.
type FatalError struct {
Err error
}

func (e *FatalError) Error() string {
return e.Err.Error()
}

func (e *FatalError) Unwrap() error {
return e.Err
}
107 changes: 91 additions & 16 deletions models/running_output.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package models

import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/selfstat"
)

Expand All @@ -19,10 +22,11 @@ const (

// OutputConfig containing name and filter
type OutputConfig struct {
Name string
Alias string
ID string
Filter Filter
Name string
Alias string
ID string
StartupErrorBehavior string
Filter Filter

FlushInterval time.Duration
FlushJitter time.Duration
Expand All @@ -47,12 +51,16 @@ type RunningOutput struct {

MetricsFiltered selfstat.Stat
WriteTime selfstat.Stat
StartupErrors selfstat.Stat

BatchReady chan time.Time

buffer *Buffer
log telegraf.Logger

started bool
retries uint64

aggMutex sync.Mutex
}

Expand Down Expand Up @@ -104,6 +112,11 @@ func NewRunningOutput(
"write_time_ns",
tags,
),
StartupErrors: selfstat.Register(
"write",
"startup_errors",
tags,
),
log: logger,
}

Expand All @@ -119,7 +132,20 @@ func (r *RunningOutput) metricFiltered(metric telegraf.Metric) {
metric.Drop()
}

func (r *RunningOutput) ID() string {
if p, ok := r.Output.(telegraf.PluginWithID); ok {
return p.ID()
}
return r.Config.ID
}

func (r *RunningOutput) Init() error {
switch r.Config.StartupErrorBehavior {
case "", "error", "retry", "ignore":
default:
return fmt.Errorf("invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior)
}

if p, ok := r.Output.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
Expand All @@ -129,11 +155,41 @@ func (r *RunningOutput) Init() error {
return nil
}

func (r *RunningOutput) ID() string {
if p, ok := r.Output.(telegraf.PluginWithID); ok {
return p.ID()
func (r *RunningOutput) Connect() error {
// Try to connect and exit early on success
err := r.Output.Connect()
if err == nil {
r.started = true
return nil
}
r.StartupErrors.Incr(1)

// Check if the plugin reports a retry-able error, otherwise we exit.
var serr *internal.StartupError
if !errors.As(err, &serr) || !serr.Retry {
return err
}

// Handle the retry-able error depending on the configured behavior
switch r.Config.StartupErrorBehavior {
case "", "error": // fall-trough to return the actual error
case "retry":
r.log.Infof("Connect failed: %v; retrying...", err)
return nil
case "ignore":
return &internal.FatalError{Err: serr}
default:
r.log.Errorf("Invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior)
}

return err
}

// Close closes the output
func (r *RunningOutput) Close() {
if err := r.Output.Close(); err != nil {
r.log.Errorf("Error closing output: %v", err)
}
return r.Config.ID
}

// AddMetric adds a metric to the output.
Expand Down Expand Up @@ -188,6 +244,22 @@ func (r *RunningOutput) AddMetric(metric telegraf.Metric) {
// Write writes all metrics to the output, stopping when all have been sent on
// or error.
func (r *RunningOutput) Write() error {
// Try to connect if we are not yet started up
if !r.started {
r.retries++
if err := r.Output.Connect(); err != nil {
var serr *internal.StartupError
if !errors.As(err, &serr) || !serr.Retry || !serr.Partial {
r.StartupErrors.Incr(1)
return internal.ErrNotConnected
}
r.log.Debugf("Partially connected after %d attempts", r.retries)
} else {
r.started = true
r.log.Debugf("Successfully connected after %d attempts", r.retries)
}
}

if output, ok := r.Output.(telegraf.AggregatingOutput); ok {
r.aggMutex.Lock()
metrics := output.Push()
Expand Down Expand Up @@ -220,6 +292,17 @@ func (r *RunningOutput) Write() error {

// WriteBatch writes a single batch of metrics to the output.
func (r *RunningOutput) WriteBatch() error {
// Try to connect if we are not yet started up
if !r.started {
r.retries++
if err := r.Output.Connect(); err != nil {
r.StartupErrors.Incr(1)
return internal.ErrNotConnected
}
r.started = true
r.log.Debugf("Successfully connected after %d attempts", r.retries)
}

batch := r.buffer.Batch(r.MetricBatchSize)
if len(batch) == 0 {
return nil
Expand All @@ -235,14 +318,6 @@ func (r *RunningOutput) WriteBatch() error {
return nil
}

// Close closes the output
func (r *RunningOutput) Close() {
err := r.Output.Close()
if err != nil {
r.log.Errorf("Error closing output: %v", err)
}
}

func (r *RunningOutput) writeMetrics(metrics []telegraf.Metric) error {
dropped := atomic.LoadInt64(&r.droppedMetrics)
if dropped > 0 {
Expand Down
Loading

0 comments on commit aa030b5

Please sign in to comment.