From 8e43a8dcd7c694075eeeb20bc66446bc6b74c5a6 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Thu, 22 Feb 2024 17:50:47 +0100 Subject: [PATCH 01/13] Implement retry logic for startup errors in output plugins --- agent/agent.go | 25 +++++++----- config/config.go | 3 +- models/running_output.go | 87 ++++++++++++++++++++++++++++++++-------- 3 files changed, 87 insertions(+), 28 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index f4cf8a2c39a1b..b3b0a9eecb1b9 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -793,10 +793,17 @@ func (a *Agent) startOutputs( src := make(chan telegraf.Metric, 100) unit := &outputUnit{src: src} for _, output := range outputs { - err := a.connectOutput(ctx, output) - if err != nil { - for _, output := range unit.outputs { + if err := a.connectOutput(ctx, output); err != nil { + var serr *telegraf.StartupError + if errors.As(err, &serr) && serr.RemovePlugin { + // If the model tells us to remove the plugin we do so without error + log.Printf("I! [agent] Failed to connect to [%s], error was %q; shutting down plugin...", output.LogName(), err) output.Close() + continue + } + + for _, unitOutput := range unit.outputs { + unitOutput.Close() } return nil, nil, fmt.Errorf("connecting output %s: %w", output.LogName(), err) } @@ -810,18 +817,14 @@ func (a *Agent) startOutputs( // connectOutputs connects to all outputs. func (a *Agent) connectOutput(ctx context.Context, output *models.RunningOutput) error { log.Printf("D! [agent] Attempting connection to [%s]", output.LogName()) - err := output.Output.Connect() - if err != nil { - log.Printf("E! [agent] Failed to connect to [%s], retrying in 15s, "+ - "error was %q", output.LogName(), err) + if err := output.Connect(); err != nil { + log.Printf("E! [agent] Failed to connect to [%s], retrying in 15s, error was %q", output.LogName(), err) - err := internal.SleepContext(ctx, 15*time.Second) - if err != nil { + if err := internal.SleepContext(ctx, 15*time.Second); err != nil { return err } - err = output.Output.Connect() - if err != nil { + if err = output.Connect(); err != nil { return fmt.Errorf("error connecting to output %q: %w", output.LogName(), err) } } diff --git a/config/config.go b/config/config.go index 394d22d966a7b..f4639bd4acc80 100644 --- a/config/config.go +++ b/config/config.go @@ -1481,6 +1481,7 @@ func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, c.getFieldString(tbl, "name_override", &oc.NameOverride) c.getFieldString(tbl, "name_suffix", &oc.NameSuffix) c.getFieldString(tbl, "name_prefix", &oc.NamePrefix) + c.getFieldString(tbl, "startup_error_behavior", &oc.StartupErrorBehavior) if c.hasErrs() { return nil, c.firstErr() @@ -1505,7 +1506,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error { "name_override", "name_prefix", "name_suffix", "namedrop", "namedrop_separator", "namepass", "namepass_separator", "order", "pass", "period", "precision", - "tagdrop", "tagexclude", "taginclude", "tagpass", "tags": + "tagdrop", "tagexclude", "taginclude", "tagpass", "tags", "startup_error_behavior": // Secret-store options to ignore case "id": diff --git a/models/running_output.go b/models/running_output.go index 36bd53abe11ec..bfca7d283d3d3 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -1,6 +1,7 @@ package models import ( + "errors" "sync" "sync/atomic" "time" @@ -19,10 +20,11 @@ const ( // OutputConfig containing name and filter type OutputConfig struct { - Name string - Alias string - ID string - Filter Filter + Name string + Alias string + ID string + StartupErrorBehavior string + Filter Filter FlushInterval time.Duration FlushJitter time.Duration @@ -53,6 +55,9 @@ type RunningOutput struct { buffer *Buffer log telegraf.Logger + started bool + retries uint64 + aggMutex sync.Mutex } @@ -119,6 +124,13 @@ func (r *RunningOutput) metricFiltered(metric telegraf.Metric) { metric.Drop() } +func (r *RunningOutput) ID() string { + if p, ok := r.Output.(telegraf.PluginWithID); ok { + return p.ID() + } + return r.Config.ID +} + func (r *RunningOutput) Init() error { if p, ok := r.Output.(telegraf.Initializer); ok { err := p.Init() @@ -129,11 +141,42 @@ func (r *RunningOutput) Init() error { return nil } -func (r *RunningOutput) ID() string { - if p, ok := r.Output.(telegraf.PluginWithID); ok { - return p.ID() +func (r *RunningOutput) Connect() error { + // Try to connect and exit early on success + err := r.Output.Connect() + if err == nil { + r.started = true + return nil + } + + // Check if the plugin reports a retry-able error, otherwise we exit. + var serr *telegraf.StartupError + if !errors.As(err, &serr) || !serr.Retry { + return err + } + serr.RemovePlugin = false + + // Handle the retry-able error depending on the configured behavior + switch r.Config.StartupErrorBehavior { + case "", "error": // fall-trough to return the actual error + case "retry": + r.log.Infof("Connect failed: %w; retrying...", err) + return nil + case "ignore": + serr.RemovePlugin = true + return serr + default: + r.log.Errorf("Invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior) + } + + return err +} + +// Close closes the output +func (r *RunningOutput) Close() { + if err := r.Output.Close(); err != nil { + r.log.Errorf("Error closing output: %v", err) } - return r.Config.ID } // AddMetric adds a metric to the output. @@ -188,6 +231,16 @@ func (r *RunningOutput) AddMetric(metric telegraf.Metric) { // Write writes all metrics to the output, stopping when all have been sent on // or error. func (r *RunningOutput) Write() error { + // Try to connect if we are not yet started up + if !r.started { + r.retries++ + if err := r.Output.Connect(); err != nil { + return telegraf.ErrNotConnected + } + r.started = true + r.log.Debug("Successfully connected after %d attempts", r.retries) + } + if output, ok := r.Output.(telegraf.AggregatingOutput); ok { r.aggMutex.Lock() metrics := output.Push() @@ -220,6 +273,16 @@ func (r *RunningOutput) Write() error { // WriteBatch writes a single batch of metrics to the output. func (r *RunningOutput) WriteBatch() error { + // Try to connect if we are not yet started up + if !r.started { + r.retries++ + if err := r.Output.Connect(); err != nil { + return telegraf.ErrNotConnected + } + r.started = true + r.log.Debug("Successfully connected after %d attempts", r.retries) + } + batch := r.buffer.Batch(r.MetricBatchSize) if len(batch) == 0 { return nil @@ -235,14 +298,6 @@ func (r *RunningOutput) WriteBatch() error { return nil } -// Close closes the output -func (r *RunningOutput) Close() { - err := r.Output.Close() - if err != nil { - r.log.Errorf("Error closing output: %v", err) - } -} - func (r *RunningOutput) writeMetrics(metrics []telegraf.Metric) error { dropped := atomic.LoadInt64(&r.droppedMetrics) if dropped > 0 { From 915d692ff73715f212ba1377db943664e20f863f Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Thu, 22 Feb 2024 17:51:04 +0100 Subject: [PATCH 02/13] Use retry framework in outputs.kafka --- plugins/outputs/kafka/kafka.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 184ed3b804ec9..0968bbad3d4e4 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -157,13 +157,16 @@ func (k *Kafka) Init() error { func (k *Kafka) Connect() error { producer, err := k.producerFunc(k.Brokers, k.saramaConfig) if err != nil { - return err + return &telegraf.StartupError{Err: err, Retry: true} } k.producer = producer return nil } func (k *Kafka) Close() error { + if k.producer == nil { + return nil + } return k.producer.Close() } From 31b9a8d73135f8265ec419aba1b71070f6e97037 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Thu, 22 Feb 2024 18:10:05 +0100 Subject: [PATCH 03/13] Allow the plugin to decide that it wants to be removed --- models/running_output.go | 1 - 1 file changed, 1 deletion(-) diff --git a/models/running_output.go b/models/running_output.go index bfca7d283d3d3..5743e8f1447a7 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -154,7 +154,6 @@ func (r *RunningOutput) Connect() error { if !errors.As(err, &serr) || !serr.Retry { return err } - serr.RemovePlugin = false // Handle the retry-able error depending on the configured behavior switch r.Config.StartupErrorBehavior { From 35b53678fe084216e88bed0d2c89db1a9d729f27 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Thu, 22 Feb 2024 18:12:31 +0100 Subject: [PATCH 04/13] Add telegraf errors --- errors.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 errors.go diff --git a/errors.go b/errors.go new file mode 100644 index 0000000000000..adfef70b4e195 --- /dev/null +++ b/errors.go @@ -0,0 +1,24 @@ +package telegraf + +import "errors" + +var ErrNotConnected = errors.New("not connected") + +// StartupError indicates an error that occurred during startup of a plugin +// e.g. due to connectivity issues or resources being not yet available. +// In case the 'Retry' flag is set, the startup of the plugin might be retried +// depending on the configured startup-error-behavior. The 'RemovePlugin' +// flag denotes if the agent should remove the plugin from further processing. +type StartupError struct { + Err error + Retry bool + RemovePlugin bool +} + +func (e *StartupError) Error() string { + return e.Err.Error() +} + +func (e *StartupError) Unwrap() error { + return e.Err +} From b6c91388afc2073d96c5c15ab782b2f89e76bbec Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Thu, 22 Feb 2024 18:25:28 +0100 Subject: [PATCH 05/13] Add more stats for the outputs --- models/running_output.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/models/running_output.go b/models/running_output.go index 5743e8f1447a7..d3e032f95d6e9 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -49,6 +49,8 @@ type RunningOutput struct { MetricsFiltered selfstat.Stat WriteTime selfstat.Stat + BufferFullness selfstat.Stat + StartupErrors selfstat.Stat BatchReady chan time.Time @@ -109,6 +111,16 @@ func NewRunningOutput( "write_time_ns", tags, ), + BufferFullness: selfstat.Register( + "write", + "buffered_metrics", + tags, + ), + StartupErrors: selfstat.Register( + "write", + "startup_errors", + tags, + ), log: logger, } @@ -148,6 +160,7 @@ func (r *RunningOutput) Connect() error { r.started = true return nil } + r.StartupErrors.Incr(1) // Check if the plugin reports a retry-able error, otherwise we exit. var serr *telegraf.StartupError @@ -234,6 +247,7 @@ func (r *RunningOutput) Write() error { if !r.started { r.retries++ if err := r.Output.Connect(); err != nil { + r.StartupErrors.Incr(1) return telegraf.ErrNotConnected } r.started = true @@ -276,6 +290,7 @@ func (r *RunningOutput) WriteBatch() error { if !r.started { r.retries++ if err := r.Output.Connect(); err != nil { + r.StartupErrors.Incr(1) return telegraf.ErrNotConnected } r.started = true @@ -318,6 +333,7 @@ func (r *RunningOutput) writeMetrics(metrics []telegraf.Metric) error { func (r *RunningOutput) LogBufferStatus() { nBuffer := r.buffer.Len() r.log.Debugf("Buffer fullness: %d / %d metrics", nBuffer, r.MetricBufferLimit) + r.BufferFullness.Set(int64(nBuffer)) } func (r *RunningOutput) Log() telegraf.Logger { From f85b578f6efcd4b3708fb67b25c8d6b272bd727f Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Thu, 22 Feb 2024 18:38:38 +0100 Subject: [PATCH 06/13] Improve stats and fix linter issues --- models/running_output.go | 8 ++++---- models/running_output_test.go | 2 ++ 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/models/running_output.go b/models/running_output.go index d3e032f95d6e9..a1b938b079c58 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -113,7 +113,7 @@ func NewRunningOutput( ), BufferFullness: selfstat.Register( "write", - "buffered_metrics", + "buffer_usage", tags, ), StartupErrors: selfstat.Register( @@ -172,7 +172,7 @@ func (r *RunningOutput) Connect() error { switch r.Config.StartupErrorBehavior { case "", "error": // fall-trough to return the actual error case "retry": - r.log.Infof("Connect failed: %w; retrying...", err) + r.log.Infof("Connect failed: %v; retrying...", err) return nil case "ignore": serr.RemovePlugin = true @@ -251,7 +251,7 @@ func (r *RunningOutput) Write() error { return telegraf.ErrNotConnected } r.started = true - r.log.Debug("Successfully connected after %d attempts", r.retries) + r.log.Debugf("Successfully connected after %d attempts", r.retries) } if output, ok := r.Output.(telegraf.AggregatingOutput); ok { @@ -294,7 +294,7 @@ func (r *RunningOutput) WriteBatch() error { return telegraf.ErrNotConnected } r.started = true - r.log.Debug("Successfully connected after %d attempts", r.retries) + r.log.Debugf("Successfully connected after %d attempts", r.retries) } batch := r.buffer.Batch(r.MetricBatchSize) diff --git a/models/running_output_test.go b/models/running_output_test.go index 013ecbe6031f5..c104a5e641e4d 100644 --- a/models/running_output_test.go +++ b/models/running_output_test.go @@ -481,12 +481,14 @@ func TestInternalMetrics(t *testing.T) { map[string]interface{}{ "buffer_limit": 10, "buffer_size": 0, + "buffer_usage": 0, "errors": 0, "metrics_added": 0, "metrics_dropped": 0, "metrics_filtered": 0, "metrics_written": 0, "write_time_ns": 0, + "startup_errors": 0, }, time.Unix(0, 0), ), From 2d6d9955e43e9e0c22f27efc72afc838a2c83361 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Mon, 4 Mar 2024 15:35:23 +0100 Subject: [PATCH 07/13] Remove redundant internal metric --- models/running_output.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/models/running_output.go b/models/running_output.go index a1b938b079c58..2339a57e01eed 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -49,7 +49,6 @@ type RunningOutput struct { MetricsFiltered selfstat.Stat WriteTime selfstat.Stat - BufferFullness selfstat.Stat StartupErrors selfstat.Stat BatchReady chan time.Time @@ -111,11 +110,6 @@ func NewRunningOutput( "write_time_ns", tags, ), - BufferFullness: selfstat.Register( - "write", - "buffer_usage", - tags, - ), StartupErrors: selfstat.Register( "write", "startup_errors", @@ -333,7 +327,6 @@ func (r *RunningOutput) writeMetrics(metrics []telegraf.Metric) error { func (r *RunningOutput) LogBufferStatus() { nBuffer := r.buffer.Len() r.log.Debugf("Buffer fullness: %d / %d metrics", nBuffer, r.MetricBufferLimit) - r.BufferFullness.Set(int64(nBuffer)) } func (r *RunningOutput) Log() telegraf.Logger { From 13af260255045d6d21a7c5168acd6c5129a7f1c0 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Mon, 4 Mar 2024 15:53:14 +0100 Subject: [PATCH 08/13] Use explicit fatal-error type --- agent/agent.go | 4 ++-- errors.go | 20 +++++++++++++++++--- models/running_output.go | 3 +-- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index b3b0a9eecb1b9..79ff287f543de 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -794,8 +794,8 @@ func (a *Agent) startOutputs( unit := &outputUnit{src: src} for _, output := range outputs { if err := a.connectOutput(ctx, output); err != nil { - var serr *telegraf.StartupError - if errors.As(err, &serr) && serr.RemovePlugin { + var fatalErr *telegraf.FatalError + if errors.As(err, &fatalErr) { // If the model tells us to remove the plugin we do so without error log.Printf("I! [agent] Failed to connect to [%s], error was %q; shutting down plugin...", output.LogName(), err) output.Close() diff --git a/errors.go b/errors.go index adfef70b4e195..2eb2695331213 100644 --- a/errors.go +++ b/errors.go @@ -10,9 +10,8 @@ var ErrNotConnected = errors.New("not connected") // depending on the configured startup-error-behavior. The 'RemovePlugin' // flag denotes if the agent should remove the plugin from further processing. type StartupError struct { - Err error - Retry bool - RemovePlugin bool + Err error + Retry bool } func (e *StartupError) Error() string { @@ -22,3 +21,18 @@ func (e *StartupError) Error() string { func (e *StartupError) Unwrap() error { return e.Err } + +// FatalError indicates a not-recoverable error in the plugin. The corresponding +// plugin should be remove by the agent stopping any further processing for that +// plugin instance. +type FatalError struct { + Err error +} + +func (e *FatalError) Error() string { + return e.Err.Error() +} + +func (e *FatalError) Unwrap() error { + return e.Err +} diff --git a/models/running_output.go b/models/running_output.go index 2339a57e01eed..817a7cf0b9e90 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -169,8 +169,7 @@ func (r *RunningOutput) Connect() error { r.log.Infof("Connect failed: %v; retrying...", err) return nil case "ignore": - serr.RemovePlugin = true - return serr + return &telegraf.FatalError{Err: serr} default: r.log.Errorf("Invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior) } From e2e804d9ad3e41dc204c1391a3d7c79201af3405 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Mon, 4 Mar 2024 15:55:18 +0100 Subject: [PATCH 09/13] Move errors to internal to prevent external libraries from triggering plugin removal --- agent/agent.go | 2 +- errors.go => internal/errors.go | 2 +- models/running_output.go | 9 +++++---- plugins/outputs/kafka/kafka.go | 3 ++- 4 files changed, 9 insertions(+), 7 deletions(-) rename errors.go => internal/errors.go (98%) diff --git a/agent/agent.go b/agent/agent.go index 79ff287f543de..a60b813d44931 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -794,7 +794,7 @@ func (a *Agent) startOutputs( unit := &outputUnit{src: src} for _, output := range outputs { if err := a.connectOutput(ctx, output); err != nil { - var fatalErr *telegraf.FatalError + var fatalErr *internal.FatalError if errors.As(err, &fatalErr) { // If the model tells us to remove the plugin we do so without error log.Printf("I! [agent] Failed to connect to [%s], error was %q; shutting down plugin...", output.LogName(), err) diff --git a/errors.go b/internal/errors.go similarity index 98% rename from errors.go rename to internal/errors.go index 2eb2695331213..064236f4a9e5a 100644 --- a/errors.go +++ b/internal/errors.go @@ -1,4 +1,4 @@ -package telegraf +package internal import "errors" diff --git a/models/running_output.go b/models/running_output.go index 817a7cf0b9e90..950b377e5d17e 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -7,6 +7,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/selfstat" ) @@ -157,7 +158,7 @@ func (r *RunningOutput) Connect() error { r.StartupErrors.Incr(1) // Check if the plugin reports a retry-able error, otherwise we exit. - var serr *telegraf.StartupError + var serr *internal.StartupError if !errors.As(err, &serr) || !serr.Retry { return err } @@ -169,7 +170,7 @@ func (r *RunningOutput) Connect() error { r.log.Infof("Connect failed: %v; retrying...", err) return nil case "ignore": - return &telegraf.FatalError{Err: serr} + return &internal.FatalError{Err: serr} default: r.log.Errorf("Invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior) } @@ -241,7 +242,7 @@ func (r *RunningOutput) Write() error { r.retries++ if err := r.Output.Connect(); err != nil { r.StartupErrors.Incr(1) - return telegraf.ErrNotConnected + return internal.ErrNotConnected } r.started = true r.log.Debugf("Successfully connected after %d attempts", r.retries) @@ -284,7 +285,7 @@ func (r *RunningOutput) WriteBatch() error { r.retries++ if err := r.Output.Connect(); err != nil { r.StartupErrors.Incr(1) - return telegraf.ErrNotConnected + return internal.ErrNotConnected } r.started = true r.log.Debugf("Successfully connected after %d attempts", r.retries) diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 0968bbad3d4e4..13440f7cace0e 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -12,6 +12,7 @@ import ( "github.com/gofrs/uuid/v5" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/common/kafka" "github.com/influxdata/telegraf/plugins/common/proxy" "github.com/influxdata/telegraf/plugins/outputs" @@ -157,7 +158,7 @@ func (k *Kafka) Init() error { func (k *Kafka) Connect() error { producer, err := k.producerFunc(k.Brokers, k.saramaConfig) if err != nil { - return &telegraf.StartupError{Err: err, Retry: true} + return &internal.StartupError{Err: err, Retry: true} } k.producer = producer return nil From 20f0a973fc7b7c22e2fcb95a968a07136ffab16d Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Mon, 4 Mar 2024 16:30:53 +0100 Subject: [PATCH 10/13] Check startup-error-behavior for valid values --- models/running_output.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/models/running_output.go b/models/running_output.go index 950b377e5d17e..c09e0a3fc660e 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -2,6 +2,7 @@ package models import ( "errors" + "fmt" "sync" "sync/atomic" "time" @@ -139,6 +140,12 @@ func (r *RunningOutput) ID() string { } func (r *RunningOutput) Init() error { + switch r.Config.StartupErrorBehavior { + case "", "error", "retry", "ignore": + default: + return fmt.Errorf("invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior) + } + if p, ok := r.Output.(telegraf.Initializer); ok { err := p.Init() if err != nil { From 44be6662d654aeeb79a1ff03728089651c9b4586 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Mon, 4 Mar 2024 16:31:10 +0100 Subject: [PATCH 11/13] Implement unit-tests --- models/running_output_test.go | 212 +++++++++++++++++++++++++++++++++- 1 file changed, 210 insertions(+), 2 deletions(-) diff --git a/models/running_output_test.go b/models/running_output_test.go index c104a5e641e4d..6520296847493 100644 --- a/models/running_output_test.go +++ b/models/running_output_test.go @@ -2,6 +2,7 @@ package models import ( "errors" + "fmt" "sync" "testing" "time" @@ -9,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/selfstat" "github.com/influxdata/telegraf/testutil" ) @@ -481,7 +483,6 @@ func TestInternalMetrics(t *testing.T) { map[string]interface{}{ "buffer_limit": 10, "buffer_size": 0, - "buffer_usage": 0, "errors": 0, "metrics_added": 0, "metrics_dropped": 0, @@ -505,6 +506,200 @@ func TestInternalMetrics(t *testing.T) { testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) } +func TestStartupBehaviorInvalid(t *testing.T) { + ro := NewRunningOutput( + &mockOutput{}, + &OutputConfig{ + Filter: Filter{}, + Name: "test_name", + Alias: "test_alias", + StartupErrorBehavior: "foo", + }, + 5, 10, + ) + require.ErrorContains(t, ro.Init(), "invalid 'startup_error_behavior'") +} + +func TestRetryableStartupBehaviorDefault(t *testing.T) { + serr := &internal.StartupError{ + Err: errors.New("retryable err"), + Retry: true, + } + ro := NewRunningOutput( + &mockOutput{ + startupErrorCount: 1, + startupError: serr, + }, + &OutputConfig{ + Filter: Filter{}, + Name: "test_name", + Alias: "test_alias", + }, + 5, 10, + ) + require.NoError(t, ro.Init()) + + // If Connect() fails, the agent will stop + require.ErrorIs(t, ro.Connect(), serr) + require.False(t, ro.started) +} + +func TestRetryableStartupBehaviorError(t *testing.T) { + serr := &internal.StartupError{ + Err: errors.New("retryable err"), + Retry: true, + } + ro := NewRunningOutput( + &mockOutput{ + startupErrorCount: 1, + startupError: serr, + }, + &OutputConfig{ + Filter: Filter{}, + Name: "test_name", + Alias: "test_alias", + StartupErrorBehavior: "error", + }, + 5, 10, + ) + require.NoError(t, ro.Init()) + + // If Connect() fails, the agent will stop + require.ErrorIs(t, ro.Connect(), serr) + require.False(t, ro.started) +} + +func TestRetryableStartupBehaviorRetry(t *testing.T) { + serr := &internal.StartupError{ + Err: errors.New("retryable err"), + Retry: true, + } + mo := &mockOutput{ + startupErrorCount: 2, + startupError: serr, + } + ro := NewRunningOutput( + mo, + &OutputConfig{ + Filter: Filter{}, + Name: "test_name", + Alias: "test_alias", + StartupErrorBehavior: "retry", + }, + 5, 10, + ) + require.NoError(t, ro.Init()) + + // For retry, Connect() should succeed even though there is an error but + // should return an error on Write() until we successfully connect. + require.NoError(t, ro.Connect(), serr) + require.False(t, ro.started) + + ro.AddMetric(testutil.TestMetric(1)) + require.ErrorIs(t, ro.Write(), internal.ErrNotConnected) + require.False(t, ro.started) + + ro.AddMetric(testutil.TestMetric(2)) + require.NoError(t, ro.Write()) + require.True(t, ro.started) + require.Equal(t, 1, mo.writes) + + ro.AddMetric(testutil.TestMetric(3)) + require.NoError(t, ro.Write()) + require.True(t, ro.started) + require.Equal(t, 2, mo.writes) +} + +func TestRetryableStartupBehaviorIgnore(t *testing.T) { + serr := &internal.StartupError{ + Err: errors.New("retryable err"), + Retry: true, + } + mo := &mockOutput{ + startupErrorCount: 2, + startupError: serr, + } + ro := NewRunningOutput( + mo, + &OutputConfig{ + Filter: Filter{}, + Name: "test_name", + Alias: "test_alias", + StartupErrorBehavior: "ignore", + }, + 5, 10, + ) + require.NoError(t, ro.Init()) + + // For ignore, Connect() should return a fatal error if connection fails. + // This will force the agent to remove the plugin. + var fatalErr *internal.FatalError + require.ErrorAs(t, ro.Connect(), &fatalErr) + require.ErrorIs(t, fatalErr, serr) + require.False(t, ro.started) +} + +func TestNonRetryableStartupBehaviorDefault(t *testing.T) { + serr := &internal.StartupError{ + Err: errors.New("non-retryable err"), + Retry: false, + } + + for _, behavior := range []string{"", "error", "retry", "ignore"} { + t.Run(behavior, func(t *testing.T) { + mo := &mockOutput{ + startupErrorCount: 2, + startupError: serr, + } + ro := NewRunningOutput( + mo, + &OutputConfig{ + Filter: Filter{}, + Name: "test_name", + Alias: "test_alias", + StartupErrorBehavior: behavior, + }, + 5, 10, + ) + require.NoError(t, ro.Init()) + + // Non-retryable error should pass through and in turn the agent + // will stop and exit. + require.ErrorIs(t, ro.Connect(), serr) + require.False(t, ro.started) + }) + } +} + +func TestUntypedtartupBehaviorIgnore(t *testing.T) { + serr := errors.New("untyped err") + + for _, behavior := range []string{"", "error", "retry", "ignore"} { + t.Run(behavior, func(t *testing.T) { + mo := &mockOutput{ + startupErrorCount: 2, + startupError: serr, + } + ro := NewRunningOutput( + mo, + &OutputConfig{ + Filter: Filter{}, + Name: "test_name", + Alias: "test_alias", + StartupErrorBehavior: behavior, + }, + 5, 10, + ) + require.NoError(t, ro.Init()) + + // Untyped error should pass through and in turn the agent will + // stop and exit. + require.ErrorIs(t, ro.Connect(), serr) + require.False(t, ro.started) + }) + } +} + type mockOutput struct { sync.Mutex @@ -512,10 +707,20 @@ type mockOutput struct { // if true, mock write failure failWrite bool + + startupError error + startupErrorCount int + writes int } func (m *mockOutput) Connect() error { - return nil + if m.startupErrorCount == 0 { + return nil + } + if m.startupErrorCount > 0 { + m.startupErrorCount-- + } + return m.startupError } func (m *mockOutput) Close() error { @@ -531,6 +736,9 @@ func (m *mockOutput) SampleConfig() string { } func (m *mockOutput) Write(metrics []telegraf.Metric) error { + fmt.Println("writing") + m.writes++ + m.Lock() defer m.Unlock() if m.failWrite { From d50556745a49faf4c21d0a148466c1bdc00cc5da Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Mon, 11 Mar 2024 16:23:23 +0100 Subject: [PATCH 12/13] Implement partial startup --- internal/errors.go | 5 +++-- models/running_output.go | 13 +++++++++---- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/internal/errors.go b/internal/errors.go index 064236f4a9e5a..a1f58c3eb2510 100644 --- a/internal/errors.go +++ b/internal/errors.go @@ -10,8 +10,9 @@ var ErrNotConnected = errors.New("not connected") // depending on the configured startup-error-behavior. The 'RemovePlugin' // flag denotes if the agent should remove the plugin from further processing. type StartupError struct { - Err error - Retry bool + Err error + Retry bool + Partial bool } func (e *StartupError) Error() string { diff --git a/models/running_output.go b/models/running_output.go index c09e0a3fc660e..8010cc8da6b82 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -248,11 +248,16 @@ func (r *RunningOutput) Write() error { if !r.started { r.retries++ if err := r.Output.Connect(); err != nil { - r.StartupErrors.Incr(1) - return internal.ErrNotConnected + var serr *internal.StartupError + if !errors.As(err, &serr) || !serr.Retry || !serr.Partial { + r.StartupErrors.Incr(1) + return internal.ErrNotConnected + } + r.log.Debugf("Partially connected after %d attempts", r.retries) + } else { + r.started = true + r.log.Debugf("Successfully connected after %d attempts", r.retries) } - r.started = true - r.log.Debugf("Successfully connected after %d attempts", r.retries) } if output, ok := r.Output.(telegraf.AggregatingOutput); ok { From 6875c010baedf1eb38a79ca6edf430f113c19ab6 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Mon, 11 Mar 2024 18:25:57 +0100 Subject: [PATCH 13/13] Add unit-test for partial startup --- models/running_output_test.go | 43 +++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/models/running_output_test.go b/models/running_output_test.go index 6520296847493..db498a0c814e3 100644 --- a/models/running_output_test.go +++ b/models/running_output_test.go @@ -700,6 +700,49 @@ func TestUntypedtartupBehaviorIgnore(t *testing.T) { } } +func TestPartiallyStarted(t *testing.T) { + serr := &internal.StartupError{ + Err: errors.New("partial err"), + Retry: true, + Partial: true, + } + mo := &mockOutput{ + startupErrorCount: 2, + startupError: serr, + } + ro := NewRunningOutput( + mo, + &OutputConfig{ + Filter: Filter{}, + Name: "test_name", + Alias: "test_alias", + StartupErrorBehavior: "retry", + }, + 5, 10, + ) + require.NoError(t, ro.Init()) + + // For retry, Connect() should succeed even though there is an error but + // should return an error on Write() until we successfully connect. + require.NoError(t, ro.Connect(), serr) + require.False(t, ro.started) + + ro.AddMetric(testutil.TestMetric(1)) + require.NoError(t, ro.Write()) + require.False(t, ro.started) + require.Equal(t, 1, mo.writes) + + ro.AddMetric(testutil.TestMetric(2)) + require.NoError(t, ro.Write()) + require.True(t, ro.started) + require.Equal(t, 2, mo.writes) + + ro.AddMetric(testutil.TestMetric(3)) + require.NoError(t, ro.Write()) + require.True(t, ro.started) + require.Equal(t, 3, mo.writes) +} + type mockOutput struct { sync.Mutex