Skip to content

Commit

Permalink
Convert kafka debug-logging to trace level
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan committed Jul 31, 2024
1 parent 031a225 commit 6291a3d
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 50 deletions.
31 changes: 18 additions & 13 deletions logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,6 @@ func New(category, name, alias string) *logger {
return l
}

// SetLevel changes the log-level to the given one
func (l *logger) SetLogLevel(name string) error {
if name == "" {
return nil
}
level := telegraf.LogLevelFromString(name)
if level == telegraf.None {
return fmt.Errorf("invalid log-level %q", name)
}
l.level = &level
return nil
}

// SubLogger creates a new logger with the given name added as suffix
func (l *logger) SubLogger(name string) telegraf.Logger {
suffix := l.suffix
Expand Down Expand Up @@ -118,6 +105,24 @@ func (l *logger) Level() telegraf.LogLevel {
return instance.level
}

// SetLevel overrides the current log-level of the logger
func (l *logger) SetLevel(level telegraf.LogLevel) {
l.level = &level
}

// SetLevel changes the log-level to the given one
func (l *logger) SetLogLevel(name string) error {
if name == "" {
return nil
}
level := telegraf.LogLevelFromString(name)
if level == telegraf.None {
return fmt.Errorf("invalid log-level %q", name)
}
l.SetLevel(level)
return nil
}

// Register a callback triggered when errors are about to be written to the log
func (l *logger) RegisterErrorCallback(f func()) {
l.onError = append(l.onError, f)
Expand Down
34 changes: 21 additions & 13 deletions plugins/common/kafka/logger.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,41 @@
package kafka

import (
"sync"

"github.com/IBM/sarama"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/logger"
)

type Logger struct {
}
var (
log = logger.New("sarama", "", "")
once sync.Once
)

// DebugLogger logs messages from sarama at the debug level.
type DebugLogger struct {
Log telegraf.Logger
}
type debugLogger struct{}

func (l *DebugLogger) Print(v ...interface{}) {
l.Log.Debug(v...)
func (l *debugLogger) Print(v ...interface{}) {
log.Trace(v...)
}

func (l *DebugLogger) Printf(format string, v ...interface{}) {
l.Log.Debugf(format, v...)
func (l *debugLogger) Printf(format string, v ...interface{}) {
log.Tracef(format, v...)
}

func (l *DebugLogger) Println(v ...interface{}) {
func (l *debugLogger) Println(v ...interface{}) {
l.Print(v...)
}

// SetLogger configures a debug logger for kafka (sarama)
func (k *Logger) SetLogger() {
sarama.Logger = &DebugLogger{Log: logger.New("sarama", "", "")}
func SetLogger(level telegraf.LogLevel) {
// Set-up the sarama logger only once
once.Do(func() {
sarama.Logger = &debugLogger{}
})
// Increase the log-level if needed.
if !log.Level().Includes(level) {
log.SetLevel(level)
}
}
7 changes: 2 additions & 5 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,8 @@ type KafkaConsumer struct {
ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"`
ConnectionStrategy string `toml:"connection_strategy"`
ResolveCanonicalBootstrapServersOnly bool `toml:"resolve_canonical_bootstrap_servers_only"`

Log telegraf.Logger `toml:"-"`
kafka.ReadConfig
kafka.Logger

Log telegraf.Logger `toml:"-"`

ConsumerCreator ConsumerGroupCreator `toml:"-"`
consumer ConsumerGroup
Expand Down Expand Up @@ -99,7 +96,7 @@ func (k *KafkaConsumer) SetParser(parser telegraf.Parser) {
}

func (k *KafkaConsumer) Init() error {
k.SetLogger()
kafka.SetLogger(k.Log.Level())

if k.MaxUndeliveredMessages == 0 {
k.MaxUndeliveredMessages = defaultMaxUndeliveredMessages
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/kafka_consumer/kafka_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestInit(t *testing.T) {
}{
{
name: "default config",
plugin: &KafkaConsumer{},
plugin: &KafkaConsumer{Log: testutil.Logger{}},
check: func(t *testing.T, plugin *KafkaConsumer) {
require.Equal(t, defaultConsumerGroup, plugin.ConsumerGroup)
require.Equal(t, defaultMaxUndeliveredMessages, plugin.MaxUndeliveredMessages)
Expand Down
30 changes: 12 additions & 18 deletions plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,17 @@ var ValidTopicSuffixMethods = []string{
var zeroTime = time.Unix(0, 0)

type Kafka struct {
Brokers []string `toml:"brokers"`
Topic string `toml:"topic"`
TopicTag string `toml:"topic_tag"`
ExcludeTopicTag bool `toml:"exclude_topic_tag"`
TopicSuffix TopicSuffix `toml:"topic_suffix"`
RoutingTag string `toml:"routing_tag"`
RoutingKey string `toml:"routing_key"`
ProducerTimestamp string `toml:"producer_timestamp"`

Brokers []string `toml:"brokers"`
Topic string `toml:"topic"`
TopicTag string `toml:"topic_tag"`
ExcludeTopicTag bool `toml:"exclude_topic_tag"`
TopicSuffix TopicSuffix `toml:"topic_suffix"`
RoutingTag string `toml:"routing_tag"`
RoutingKey string `toml:"routing_key"`
ProducerTimestamp string `toml:"producer_timestamp"`
Log telegraf.Logger `toml:"-"`
proxy.Socks5ProxyConfig
kafka.WriteConfig

// Legacy TLS config options
// TLS client certificate
Expand All @@ -50,12 +51,6 @@ type Kafka struct {
// TLS certificate authority
CA string

kafka.WriteConfig

kafka.Logger

Log telegraf.Logger `toml:"-"`

saramaConfig *sarama.Config
producerFunc func(addrs []string, config *sarama.Config) (sarama.SyncProducer, error)
producer sarama.SyncProducer
Expand Down Expand Up @@ -123,10 +118,9 @@ func (k *Kafka) SetSerializer(serializer serializers.Serializer) {
}

func (k *Kafka) Init() error {
k.SetLogger()
kafka.SetLogger(k.Log.Level())

err := ValidateTopicSuffixMethod(k.TopicSuffix.Method)
if err != nil {
if err := ValidateTopicSuffixMethod(k.TopicSuffix.Method); err != nil {
return err
}
config := sarama.NewConfig()
Expand Down

0 comments on commit 6291a3d

Please sign in to comment.