Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(plugins): Allow to override log-level per plugin #15677

Merged
merged 2 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 29 additions & 27 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -971,26 +971,30 @@ func (c *Config) probeParser(parentcategory string, parentname string, table *as
}

func (c *Config) addParser(parentcategory, parentname string, table *ast.Table) (*models.RunningParser, error) {
var dataformat string
c.getFieldString(table, "data_format", &dataformat)
if dataformat == "" {
dataformat = setDefaultParser(parentcategory, parentname)
conf := &models.ParserConfig{
Parent: parentname,
}

var influxParserType string
c.getFieldString(table, "influx_parser_type", &influxParserType)
if dataformat == "influx" && influxParserType == "upstream" {
dataformat = "influx_upstream"
c.getFieldString(table, "data_format", &conf.DataFormat)
if conf.DataFormat == "" {
conf.DataFormat = setDefaultParser(parentcategory, parentname)
} else if conf.DataFormat == "influx" {
var influxParserType string
c.getFieldString(table, "influx_parser_type", &influxParserType)
if influxParserType == "upstream" {
conf.DataFormat = "influx_upstream"
}
}
c.getFieldString(table, "log_level", &conf.LogLevel)

creator, ok := parsers.Parsers[dataformat]
creator, ok := parsers.Parsers[conf.DataFormat]
if !ok {
return nil, fmt.Errorf("undefined but requested parser: %s", dataformat)
return nil, fmt.Errorf("undefined but requested parser: %s", conf.DataFormat)
}
parser := creator(parentname)

// Handle reset-mode of CSV parsers to stay backward compatible (see issue #12022)
if dataformat == "csv" && parentcategory == "inputs" {
if conf.DataFormat == "csv" && parentcategory == "inputs" {
if parentname == "exec" {
csvParser := parser.(*csv.Parser)
csvParser.ResetMode = "always"
Expand All @@ -1001,36 +1005,31 @@ func (c *Config) addParser(parentcategory, parentname string, table *ast.Table)
return nil, err
}

conf := &models.ParserConfig{
Parent: parentname,
DataFormat: dataformat,
}
running := models.NewRunningParser(parser, conf)
err := running.Init()
return running, err
}

func (c *Config) addSerializer(parentname string, table *ast.Table) (*models.RunningSerializer, error) {
var dataformat string
c.getFieldString(table, "data_format", &dataformat)
if dataformat == "" {
dataformat = "influx"
conf := &models.SerializerConfig{
Parent: parentname,
}
c.getFieldString(table, "data_format", &conf.DataFormat)
if conf.DataFormat == "" {
conf.DataFormat = "influx"
}
c.getFieldString(table, "log_level", &conf.LogLevel)

creator, ok := serializers.Serializers[dataformat]
creator, ok := serializers.Serializers[conf.DataFormat]
if !ok {
return nil, fmt.Errorf("undefined but requested serializer: %s", dataformat)
return nil, fmt.Errorf("undefined but requested serializer: %s", conf.DataFormat)
}
serializer := creator()

if err := c.toml.UnmarshalTable(table, serializer); err != nil {
return nil, err
}

conf := &models.SerializerConfig{
Parent: parentname,
DataFormat: dataformat,
}
running := models.NewRunningSerializer(serializer, conf)
err := running.Init()
return running, err
Expand Down Expand Up @@ -1336,6 +1335,7 @@ func (c *Config) buildAggregator(name string, tbl *ast.Table) (*models.Aggregato
c.getFieldString(tbl, "name_suffix", &conf.MeasurementSuffix)
c.getFieldString(tbl, "name_override", &conf.NameOverride)
c.getFieldString(tbl, "alias", &conf.Alias)
c.getFieldString(tbl, "log_level", &conf.LogLevel)

conf.Tags = make(map[string]string)
if node, ok := tbl.Fields["tags"]; ok {
Expand Down Expand Up @@ -1369,6 +1369,7 @@ func (c *Config) buildProcessor(category, name string, tbl *ast.Table) (*models.

c.getFieldInt64(tbl, "order", &conf.Order)
c.getFieldString(tbl, "alias", &conf.Alias)
c.getFieldString(tbl, "log_level", &conf.LogLevel)

if c.hasErrs() {
return nil, c.firstErr()
Expand Down Expand Up @@ -1478,6 +1479,7 @@ func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, e
c.getFieldString(tbl, "name_suffix", &cp.MeasurementSuffix)
c.getFieldString(tbl, "name_override", &cp.NameOverride)
c.getFieldString(tbl, "alias", &cp.Alias)
c.getFieldString(tbl, "log_level", &cp.LogLevel)

cp.Tags = make(map[string]string)
if node, ok := tbl.Fields["tags"]; ok {
Expand Down Expand Up @@ -1523,14 +1525,14 @@ func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig,

c.getFieldDuration(tbl, "flush_interval", &oc.FlushInterval)
c.getFieldDuration(tbl, "flush_jitter", &oc.FlushJitter)

c.getFieldInt(tbl, "metric_buffer_limit", &oc.MetricBufferLimit)
c.getFieldInt(tbl, "metric_batch_size", &oc.MetricBatchSize)
c.getFieldString(tbl, "alias", &oc.Alias)
c.getFieldString(tbl, "name_override", &oc.NameOverride)
c.getFieldString(tbl, "name_suffix", &oc.NameSuffix)
c.getFieldString(tbl, "name_prefix", &oc.NamePrefix)
c.getFieldString(tbl, "startup_error_behavior", &oc.StartupErrorBehavior)
c.getFieldString(tbl, "log_level", &oc.LogLevel)

if c.hasErrs() {
return nil, c.firstErr()
Expand All @@ -1555,7 +1557,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
"fielddrop", "fieldexclude", "fieldinclude", "fieldpass", "flush_interval", "flush_jitter",
"grace",
"interval",
"lvm", // What is this used for?
"log_level", "lvm", // What is this used for?
"metric_batch_size", "metric_buffer_limit", "metricpass",
"name_override", "name_prefix", "name_suffix", "namedrop", "namedrop_separator", "namepass", "namepass_separator",
"order",
Expand Down
16 changes: 8 additions & 8 deletions docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -385,38 +385,32 @@ driven operation.
Parameters that can be used with any input plugin:

- **alias**: Name an instance of a plugin.

- **interval**:
Overrides the `interval` setting of the [agent][Agent] for the plugin. How
often to gather this metric. Normal plugins use a single global interval, but
if one particular input should be run less or more often, you can configure
that here.

- **precision**:
Overrides the `precision` setting of the [agent][Agent] for the plugin.
Collected metrics are rounded to the precision specified as an [interval][].

When this value is set on a service input, multiple events occurring at the
same timestamp may be merged by the output database.

- **collection_jitter**:
Overrides the `collection_jitter` setting of the [agent][Agent] for the
plugin. Collection jitter is used to jitter the collection by a random
[interval][]. The value must be non-zero to override the agent setting.

- **collection_offset**:
Overrides the `collection_offset` setting of the [agent][Agent] for the
plugin. Collection offset is used to shift the collection by the given
[interval][]. The value must be non-zero to override the agent setting.

- **name_override**: Override the base name of the measurement. (Default is
the name of the input).

- **name_prefix**: Specifies a prefix to attach to the measurement name.

- **name_suffix**: Specifies a suffix to attach to the measurement name.

- **tags**: A map of tags to apply to a specific input's measurements.
- **log_level**: Override the log-level for this plugin. Possible values are
`error`, `warn`, `info` and `debug`.

The [metric filtering][] parameters can be used to limit what metrics are
emitted from the input plugin.
Expand Down Expand Up @@ -502,6 +496,8 @@ Parameters that can be used with any output plugin:
- **name_override**: Override the original name of the measurement.
- **name_prefix**: Specifies a prefix to attach to the measurement name.
- **name_suffix**: Specifies a suffix to attach to the measurement name.
- **log_level**: Override the log-level for this plugin. Possible values are
`error`, `warn`, `info` and `debug`.

The [metric filtering][] parameters can be used to limit what metrics are
emitted from the output plugin.
Expand Down Expand Up @@ -540,6 +536,8 @@ Parameters that can be used with any processor plugin:
If this is not specified then processor execution order will be the order in
the config. Processors without "order" will take precedence over those
with a defined order.
- **log_level**: Override the log-level for this plugin. Possible values are
`error`, `warn`, `info` and `debug`.

The [metric filtering][] parameters can be used to limit what metrics are
handled by the processor. Excluded metrics are passed downstream to the next
Expand Down Expand Up @@ -592,6 +590,8 @@ Parameters that can be used with any aggregator plugin:
- **name_prefix**: Specifies a prefix to attach to the measurement name.
- **name_suffix**: Specifies a suffix to attach to the measurement name.
- **tags**: A map of tags to apply to the measurement - behavior varies based on aggregator.
- **log_level**: Override the log-level for this plugin. Possible values are
`error`, `warn`, `info` and `debug`.

The [metric filtering][] parameters can be used to limit what metrics are
handled by the aggregator. Excluded metrics are passed downstream to the next
Expand Down
14 changes: 14 additions & 0 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,20 @@ const (
Debug
)

func LogLevelFromString(name string) LogLevel {
switch name {
case "ERROR", "error":
return Error
case "WARN", "warn":
return Warn
case "INFO", "info":
return Info
case "DEBUG", "debug":
return Debug
}
return None
}

func (e LogLevel) String() string {
switch e {
case Error:
Expand Down
13 changes: 13 additions & 0 deletions logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,19 @@ func New(category, name, alias string) *logger {
return l
}

// SetLevel changes the log-level to the given one
func (l *logger) SetLogLevel(name string) error {
if name == "" {
return nil
}
level := telegraf.LogLevelFromString(name)
if level == telegraf.None {
return fmt.Errorf("invalid log-level %q", name)
}
l.level = &level
return nil
}

// SubLogger creates a new logger with the given name added as suffix
func (l *logger) SubLogger(name string) telegraf.Logger {
suffix := l.suffix
Expand Down
5 changes: 4 additions & 1 deletion models/running_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ func NewRunningAggregator(aggregator telegraf.Aggregator, config *AggregatorConf
logger.RegisterErrorCallback(func() {
aggErrorsRegister.Incr(1)
})

if err := logger.SetLogLevel(config.LogLevel); err != nil {
logger.Error(err)
}
SetLoggerOnPlugin(aggregator, logger)

return &RunningAggregator{
Expand Down Expand Up @@ -74,6 +76,7 @@ type AggregatorConfig struct {
Period time.Duration
Delay time.Duration
Grace time.Duration
LogLevel string

NameOverride string
MeasurementPrefix string
Expand Down
4 changes: 4 additions & 0 deletions models/running_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput {
inputErrorsRegister.Incr(1)
GlobalGatherErrors.Incr(1)
})
if err := logger.SetLogLevel(config.LogLevel); err != nil {
logger.Error(err)
}
SetLoggerOnPlugin(input, logger)

return &RunningInput{
Expand Down Expand Up @@ -85,6 +88,7 @@ type InputConfig struct {
CollectionOffset time.Duration
Precision time.Duration
StartupErrorBehavior string
LogLevel string

NameOverride string
MeasurementPrefix string
Expand Down
5 changes: 5 additions & 0 deletions models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type OutputConfig struct {

BufferStrategy string
BufferDirectory string

LogLevel string
}

// RunningOutput contains the output configuration
Expand Down Expand Up @@ -84,6 +86,9 @@ func NewRunningOutput(
logger.RegisterErrorCallback(func() {
writeErrorsRegister.Incr(1)
})
if err := logger.SetLogLevel(config.LogLevel); err != nil {
logger.Error(err)
}
SetLoggerOnPlugin(output, logger)

if config.MetricBufferLimit > 0 {
Expand Down
4 changes: 4 additions & 0 deletions models/running_parsers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ func NewRunningParser(parser telegraf.Parser, config *ParserConfig) *RunningPars
logger.RegisterErrorCallback(func() {
parserErrorsRegister.Incr(1)
})
if err := logger.SetLogLevel(config.LogLevel); err != nil {
logger.Error(err)
}
SetLoggerOnPlugin(parser, logger)

return &RunningParser{
Expand All @@ -53,6 +56,7 @@ type ParserConfig struct {
Alias string
DataFormat string
DefaultTags map[string]string
LogLevel string
}

func (r *RunningParser) LogName() string {
Expand Down
14 changes: 9 additions & 5 deletions models/running_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ func (rp RunningProcessors) Less(i, j int) bool { return rp[i].Config.Order < rp

// ProcessorConfig containing a name and filter
type ProcessorConfig struct {
Name string
Alias string
ID string
Order int64
Filter Filter
Name string
Alias string
ID string
Order int64
Filter Filter
LogLevel string
}

func NewRunningProcessor(processor telegraf.StreamingProcessor, config *ProcessorConfig) *RunningProcessor {
Expand All @@ -41,6 +42,9 @@ func NewRunningProcessor(processor telegraf.StreamingProcessor, config *Processo
logger.RegisterErrorCallback(func() {
processErrorsRegister.Incr(1)
})
if err := logger.SetLogLevel(config.LogLevel); err != nil {
logger.Error(err)
}
SetLoggerOnPlugin(processor, logger)

return &RunningProcessor{
Expand Down
4 changes: 4 additions & 0 deletions models/running_serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type SerializerConfig struct {
Alias string
DataFormat string
DefaultTags map[string]string
LogLevel string
}

type RunningSerializer struct {
Expand All @@ -38,6 +39,9 @@ func NewRunningSerializer(serializer serializers.Serializer, config *SerializerC
logger.RegisterErrorCallback(func() {
serializerErrorsRegister.Incr(1)
})
if err := logger.SetLogLevel(config.LogLevel); err != nil {
logger.Error(err)
}
SetLoggerOnPlugin(serializer, logger)

return &RunningSerializer{
Expand Down
Loading