feat(plugins): Allow to override log-level per plugin (#15677)
srebhan authored Jul 30, 2024
1 parent 6af8321 commit f9f029e
Showing 10 changed files with 94 additions and 41 deletions.
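For orientation, a minimal, hypothetical `telegraf.conf` fragment using the new option might look like the sketch below; the plugin choices are illustrative and only `log_level` is new in this commit.

```toml
# Hypothetical fragment: inputs.cpu and outputs.file stand in for any plugin.
[agent]
  ## Agent-wide settings still provide the default log level for all plugins.
  debug = false

[[inputs.cpu]]
  ## New per-plugin override: one of "error", "warn", "info" or "debug".
  log_level = "debug"

[[outputs.file]]
  files = ["stdout"]
  ## Restrict this output's logging to errors only.
  log_level = "error"
```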
56 changes: 29 additions & 27 deletions config/config.go
@@ -971,26 +971,30 @@ func (c *Config) probeParser(parentcategory string, parentname string, table *as
}

func (c *Config) addParser(parentcategory, parentname string, table *ast.Table) (*models.RunningParser, error) {
- var dataformat string
- c.getFieldString(table, "data_format", &dataformat)
- if dataformat == "" {
- dataformat = setDefaultParser(parentcategory, parentname)
+ conf := &models.ParserConfig{
+ Parent: parentname,
}

- var influxParserType string
- c.getFieldString(table, "influx_parser_type", &influxParserType)
- if dataformat == "influx" && influxParserType == "upstream" {
- dataformat = "influx_upstream"
+ c.getFieldString(table, "data_format", &conf.DataFormat)
+ if conf.DataFormat == "" {
+ conf.DataFormat = setDefaultParser(parentcategory, parentname)
+ } else if conf.DataFormat == "influx" {
+ var influxParserType string
+ c.getFieldString(table, "influx_parser_type", &influxParserType)
+ if influxParserType == "upstream" {
+ conf.DataFormat = "influx_upstream"
+ }
}
+ c.getFieldString(table, "log_level", &conf.LogLevel)

- creator, ok := parsers.Parsers[dataformat]
+ creator, ok := parsers.Parsers[conf.DataFormat]
if !ok {
- return nil, fmt.Errorf("undefined but requested parser: %s", dataformat)
+ return nil, fmt.Errorf("undefined but requested parser: %s", conf.DataFormat)
}
parser := creator(parentname)

// Handle reset-mode of CSV parsers to stay backward compatible (see issue #12022)
if dataformat == "csv" && parentcategory == "inputs" {
if conf.DataFormat == "csv" && parentcategory == "inputs" {
if parentname == "exec" {
csvParser := parser.(*csv.Parser)
csvParser.ResetMode = "always"
@@ -1001,36 +1005,31 @@ func (c *Config) addParser(parentcategory, parentname string, table *ast.Table)
return nil, err
}

- conf := &models.ParserConfig{
- Parent: parentname,
- DataFormat: dataformat,
- }
running := models.NewRunningParser(parser, conf)
err := running.Init()
return running, err
}

func (c *Config) addSerializer(parentname string, table *ast.Table) (*models.RunningSerializer, error) {
- var dataformat string
- c.getFieldString(table, "data_format", &dataformat)
- if dataformat == "" {
- dataformat = "influx"
+ conf := &models.SerializerConfig{
+ Parent: parentname,
}
+ c.getFieldString(table, "data_format", &conf.DataFormat)
+ if conf.DataFormat == "" {
+ conf.DataFormat = "influx"
+ }
+ c.getFieldString(table, "log_level", &conf.LogLevel)

- creator, ok := serializers.Serializers[dataformat]
+ creator, ok := serializers.Serializers[conf.DataFormat]
if !ok {
- return nil, fmt.Errorf("undefined but requested serializer: %s", dataformat)
+ return nil, fmt.Errorf("undefined but requested serializer: %s", conf.DataFormat)
}
serializer := creator()

if err := c.toml.UnmarshalTable(table, serializer); err != nil {
return nil, err
}

- conf := &models.SerializerConfig{
- Parent: parentname,
- DataFormat: dataformat,
- }
running := models.NewRunningSerializer(serializer, conf)
err := running.Init()
return running, err
@@ -1336,6 +1335,7 @@ func (c *Config) buildAggregator(name string, tbl *ast.Table) (*models.Aggregato
c.getFieldString(tbl, "name_suffix", &conf.MeasurementSuffix)
c.getFieldString(tbl, "name_override", &conf.NameOverride)
c.getFieldString(tbl, "alias", &conf.Alias)
c.getFieldString(tbl, "log_level", &conf.LogLevel)

conf.Tags = make(map[string]string)
if node, ok := tbl.Fields["tags"]; ok {
@@ -1369,6 +1369,7 @@ func (c *Config) buildProcessor(category, name string, tbl *ast.Table) (*models.

c.getFieldInt64(tbl, "order", &conf.Order)
c.getFieldString(tbl, "alias", &conf.Alias)
c.getFieldString(tbl, "log_level", &conf.LogLevel)

if c.hasErrs() {
return nil, c.firstErr()
@@ -1478,6 +1479,7 @@ func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, e
c.getFieldString(tbl, "name_suffix", &cp.MeasurementSuffix)
c.getFieldString(tbl, "name_override", &cp.NameOverride)
c.getFieldString(tbl, "alias", &cp.Alias)
c.getFieldString(tbl, "log_level", &cp.LogLevel)

cp.Tags = make(map[string]string)
if node, ok := tbl.Fields["tags"]; ok {
@@ -1523,14 +1525,14 @@ func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig,

c.getFieldDuration(tbl, "flush_interval", &oc.FlushInterval)
c.getFieldDuration(tbl, "flush_jitter", &oc.FlushJitter)

c.getFieldInt(tbl, "metric_buffer_limit", &oc.MetricBufferLimit)
c.getFieldInt(tbl, "metric_batch_size", &oc.MetricBatchSize)
c.getFieldString(tbl, "alias", &oc.Alias)
c.getFieldString(tbl, "name_override", &oc.NameOverride)
c.getFieldString(tbl, "name_suffix", &oc.NameSuffix)
c.getFieldString(tbl, "name_prefix", &oc.NamePrefix)
c.getFieldString(tbl, "startup_error_behavior", &oc.StartupErrorBehavior)
c.getFieldString(tbl, "log_level", &oc.LogLevel)

if c.hasErrs() {
return nil, c.firstErr()
@@ -1555,7 +1557,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
"fielddrop", "fieldexclude", "fieldinclude", "fieldpass", "flush_interval", "flush_jitter",
"grace",
"interval",
"lvm", // What is this used for?
"log_level", "lvm", // What is this used for?
"metric_batch_size", "metric_buffer_limit", "metricpass",
"name_override", "name_prefix", "name_suffix", "namedrop", "namedrop_separator", "namepass", "namepass_separator",
"order",
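Because `addParser` and `addSerializer` above now read `log_level` from the same plugin table, the override presumably also applies to the logger of a plugin's parser or serializer. A sketch under that assumption (plugin choice and file path are made up):

```toml
# Illustrative only: the single log_level key is read by buildInput and by
# addParser, so the tail input and its influx parser should both log at debug.
[[inputs.tail]]
  files = ["/tmp/app.metrics"]
  data_format = "influx"
  log_level = "debug"
```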
16 changes: 8 additions & 8 deletions docs/CONFIGURATION.md
@@ -385,38 +385,32 @@ driven operation.
Parameters that can be used with any input plugin:

- **alias**: Name an instance of a plugin.

- **interval**:
Overrides the `interval` setting of the [agent][Agent] for the plugin. How
often to gather this metric. Normal plugins use a single global interval, but
if one particular input should be run less or more often, you can configure
that here.

- **precision**:
Overrides the `precision` setting of the [agent][Agent] for the plugin.
Collected metrics are rounded to the precision specified as an [interval][].

When this value is set on a service input, multiple events occurring at the
same timestamp may be merged by the output database.

- **collection_jitter**:
Overrides the `collection_jitter` setting of the [agent][Agent] for the
plugin. Collection jitter is used to jitter the collection by a random
[interval][]. The value must be non-zero to override the agent setting.

- **collection_offset**:
Overrides the `collection_offset` setting of the [agent][Agent] for the
plugin. Collection offset is used to shift the collection by the given
[interval][]. The value must be non-zero to override the agent setting.

- **name_override**: Override the base name of the measurement. (Default is
the name of the input).

- **name_prefix**: Specifies a prefix to attach to the measurement name.

- **name_suffix**: Specifies a suffix to attach to the measurement name.

- **tags**: A map of tags to apply to a specific input's measurements.
- **log_level**: Override the log-level for this plugin. Possible values are
`error`, `warn`, `info` and `debug`.

The [metric filtering][] parameters can be used to limit what metrics are
emitted from the input plugin.
@@ -502,6 +496,8 @@ Parameters that can be used with any output plugin:
- **name_override**: Override the original name of the measurement.
- **name_prefix**: Specifies a prefix to attach to the measurement name.
- **name_suffix**: Specifies a suffix to attach to the measurement name.
- **log_level**: Override the log-level for this plugin. Possible values are
`error`, `warn`, `info` and `debug`.

The [metric filtering][] parameters can be used to limit what metrics are
emitted from the output plugin.
@@ -540,6 +536,8 @@ Parameters that can be used with any processor plugin:
If this is not specified then processor execution order will be the order in
the config. Processors without "order" will take precedence over those
with a defined order.
- **log_level**: Override the log-level for this plugin. Possible values are
`error`, `warn`, `info` and `debug`.

The [metric filtering][] parameters can be used to limit what metrics are
handled by the processor. Excluded metrics are passed downstream to the next
@@ -592,6 +590,8 @@ Parameters that can be used with any aggregator plugin:
- **name_prefix**: Specifies a prefix to attach to the measurement name.
- **name_suffix**: Specifies a suffix to attach to the measurement name.
- **tags**: A map of tags to apply to the measurement - behavior varies based on aggregator.
- **log_level**: Override the log-level for this plugin. Possible values are
`error`, `warn`, `info` and `debug`.

The [metric filtering][] parameters can be used to limit what metrics are
handled by the aggregator. Excluded metrics are passed downstream to the next
14 changes: 14 additions & 0 deletions logger.go
@@ -16,6 +16,20 @@ const (
Debug
)

func LogLevelFromString(name string) LogLevel {
switch name {
case "ERROR", "error":
return Error
case "WARN", "warn":
return Warn
case "INFO", "info":
return Info
case "DEBUG", "debug":
return Debug
}
return None
}

func (e LogLevel) String() string {
switch e {
case Error:
13 changes: 13 additions & 0 deletions logger/logger.go
@@ -55,6 +55,19 @@ func New(category, name, alias string) *logger {
return l
}

// SetLogLevel changes the log-level to the given one
func (l *logger) SetLogLevel(name string) error {
if name == "" {
return nil
}
level := telegraf.LogLevelFromString(name)
if level == telegraf.None {
return fmt.Errorf("invalid log-level %q", name)
}
l.level = &level
return nil
}

// SubLogger creates a new logger with the given name added as suffix
func (l *logger) SubLogger(name string) telegraf.Logger {
suffix := l.suffix
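Given the validation in `SetLogLevel` above and its call sites in the `models` constructors below, an unrecognized value should not abort startup: the constructor only logs the returned error and the plugin keeps the agent-wide level. A hypothetical config that would trigger this:

```toml
# "trace" is not one of error/warn/info/debug, so SetLogLevel returns an
# error; the running-plugin constructor logs it and no override is applied.
[[inputs.mem]]
  log_level = "trace"
```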
5 changes: 4 additions & 1 deletion models/running_aggregator.go
@@ -35,7 +35,9 @@ func NewRunningAggregator(aggregator telegraf.Aggregator, config *AggregatorConf
logger.RegisterErrorCallback(func() {
aggErrorsRegister.Incr(1)
})

if err := logger.SetLogLevel(config.LogLevel); err != nil {
logger.Error(err)
}
SetLoggerOnPlugin(aggregator, logger)

return &RunningAggregator{
@@ -74,6 +76,7 @@ type AggregatorConfig struct {
Period time.Duration
Delay time.Duration
Grace time.Duration
LogLevel string

NameOverride string
MeasurementPrefix string
4 changes: 4 additions & 0 deletions models/running_input.go
@@ -46,6 +46,9 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput {
inputErrorsRegister.Incr(1)
GlobalGatherErrors.Incr(1)
})
if err := logger.SetLogLevel(config.LogLevel); err != nil {
logger.Error(err)
}
SetLoggerOnPlugin(input, logger)

return &RunningInput{
@@ -85,6 +88,7 @@ type InputConfig struct {
CollectionOffset time.Duration
Precision time.Duration
StartupErrorBehavior string
LogLevel string

NameOverride string
MeasurementPrefix string
5 changes: 5 additions & 0 deletions models/running_output.go
@@ -40,6 +40,8 @@ type OutputConfig struct {

BufferStrategy string
BufferDirectory string

LogLevel string
}

// RunningOutput contains the output configuration
@@ -84,6 +86,9 @@ func NewRunningOutput(
logger.RegisterErrorCallback(func() {
writeErrorsRegister.Incr(1)
})
if err := logger.SetLogLevel(config.LogLevel); err != nil {
logger.Error(err)
}
SetLoggerOnPlugin(output, logger)

if config.MetricBufferLimit > 0 {
4 changes: 4 additions & 0 deletions models/running_parsers.go
@@ -28,6 +28,9 @@ func NewRunningParser(parser telegraf.Parser, config *ParserConfig) *RunningPars
logger.RegisterErrorCallback(func() {
parserErrorsRegister.Incr(1)
})
if err := logger.SetLogLevel(config.LogLevel); err != nil {
logger.Error(err)
}
SetLoggerOnPlugin(parser, logger)

return &RunningParser{
@@ -53,6 +56,7 @@ type ParserConfig struct {
Alias string
DataFormat string
DefaultTags map[string]string
LogLevel string
}

func (r *RunningParser) LogName() string {
14 changes: 9 additions & 5 deletions models/running_processor.go
@@ -23,11 +23,12 @@ func (rp RunningProcessors) Less(i, j int) bool { return rp[i].Config.Order < rp

// ProcessorConfig containing a name and filter
type ProcessorConfig struct {
- Name string
- Alias string
- ID string
- Order int64
- Filter Filter
+ Name string
+ Alias string
+ ID string
+ Order int64
+ Filter Filter
+ LogLevel string
}

func NewRunningProcessor(processor telegraf.StreamingProcessor, config *ProcessorConfig) *RunningProcessor {
@@ -41,6 +42,9 @@ func NewRunningProcessor(processor telegraf.StreamingProcessor, config *Processo
logger.RegisterErrorCallback(func() {
processErrorsRegister.Incr(1)
})
if err := logger.SetLogLevel(config.LogLevel); err != nil {
logger.Error(err)
}
SetLoggerOnPlugin(processor, logger)

return &RunningProcessor{
4 changes: 4 additions & 0 deletions models/running_serializer.go
@@ -15,6 +15,7 @@ type SerializerConfig struct {
Alias string
DataFormat string
DefaultTags map[string]string
LogLevel string
}

type RunningSerializer struct {
@@ -38,6 +39,9 @@ func NewRunningSerializer(serializer serializers.Serializer, config *SerializerC
logger.RegisterErrorCallback(func() {
serializerErrorsRegister.Incr(1)
})
if err := logger.SetLogLevel(config.LogLevel); err != nil {
logger.Error(err)
}
SetLoggerOnPlugin(serializer, logger)

return &RunningSerializer{