diff --git a/.chloggen/confmap-logger-wrapper-idea.yaml b/.chloggen/confmap-logger-wrapper-idea.yaml new file mode 100644 index 00000000000..ff35e92c2a3 --- /dev/null +++ b/.chloggen/confmap-logger-wrapper-idea.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: otelcol + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Enable logging during configuration resolution + +# One or more tracking issues or pull requests related to the change +issues: [10056] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/otelcol/buffered_core.go b/otelcol/buffered_core.go new file mode 100644 index 00000000000..447bf13cb52 --- /dev/null +++ b/otelcol/buffered_core.go @@ -0,0 +1,81 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// This logger implements zapcore.Core and is based on zaptest/observer. + +package otelcol // import "go.opentelemetry.io/collector/otelcol" + +import ( + "fmt" + "sync" + + "go.uber.org/zap/zapcore" +) + +type loggedEntry struct { + zapcore.Entry + Context []zapcore.Field +} + +func newBufferedCore(enab zapcore.LevelEnabler) *bufferedCore { + return &bufferedCore{LevelEnabler: enab} +} + +var _ zapcore.Core = (*bufferedCore)(nil) + +type bufferedCore struct { + zapcore.LevelEnabler + mu sync.RWMutex + logs []loggedEntry + context []zapcore.Field + logsTaken bool +} + +func (bc *bufferedCore) Level() zapcore.Level { + return zapcore.LevelOf(bc.LevelEnabler) +} + +func (bc *bufferedCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { + if bc.Enabled(ent.Level) { + return ce.AddCore(ent, bc) + } + return ce +} + +func (bc *bufferedCore) With(fields []zapcore.Field) zapcore.Core { + return &bufferedCore{ + LevelEnabler: bc.LevelEnabler, + logs: bc.logs, + logsTaken: bc.logsTaken, + context: append(bc.context, fields...), + } +} + +func (bc *bufferedCore) Write(ent zapcore.Entry, fields []zapcore.Field) error { + bc.mu.Lock() + defer bc.mu.Unlock() + if bc.logsTaken { + return fmt.Errorf("the buffered logs have already been taken so writing is no longer supported") + } + all := make([]zapcore.Field, 0, len(fields)+len(bc.context)) + all = append(all, bc.context...) + all = append(all, fields...) + bc.logs = append(bc.logs, loggedEntry{ent, all}) + return nil +} + +func (bc *bufferedCore) Sync() error { + return nil +} + +func (bc *bufferedCore) TakeLogs() []loggedEntry { + if !bc.logsTaken { + bc.mu.Lock() + defer bc.mu.Unlock() + logs := bc.logs + bc.logs = nil + bc.logsTaken = true + return logs + } + return nil +} diff --git a/otelcol/buffered_core_test.go b/otelcol/buffered_core_test.go new file mode 100644 index 00000000000..6417b8a1d9d --- /dev/null +++ b/otelcol/buffered_core_test.go @@ -0,0 +1,107 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelcol + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" +) + +func Test_bufferedCore_Level(t *testing.T) { + bc := newBufferedCore(zapcore.InfoLevel) + assert.Equal(t, zapcore.InfoLevel, bc.Level()) +} + +func Test_bufferedCore_Check(t *testing.T) { + t.Run("check passed", func(t *testing.T) { + bc := newBufferedCore(zapcore.InfoLevel) + e := zapcore.Entry{ + Level: zapcore.InfoLevel, + } + expected := &zapcore.CheckedEntry{} + expected = expected.AddCore(e, bc) + ce := bc.Check(e, nil) + assert.Equal(t, expected, ce) + }) + + t.Run("check did not pass", func(t *testing.T) { + bc := newBufferedCore(zapcore.InfoLevel) + e := zapcore.Entry{ + Level: zapcore.DebugLevel, + } + ce := bc.Check(e, nil) + assert.Nil(t, ce) + }) +} + +func Test_bufferedCore_With(t *testing.T) { + bc := newBufferedCore(zapcore.InfoLevel) + bc.logsTaken = true + bc.context = []zapcore.Field{ + {Key: "original", String: "context"}, + } + inputs := []zapcore.Field{ + {Key: "test", String: "passed"}, + } + expected := []zapcore.Field{ + {Key: "original", String: "context"}, + {Key: "test", String: "passed"}, + } + newBC := bc.With(inputs) + assert.Equal(t, expected, newBC.(*bufferedCore).context) + assert.True(t, newBC.(*bufferedCore).logsTaken) +} + +func Test_bufferedCore_Write(t *testing.T) { + bc := newBufferedCore(zapcore.InfoLevel) + e := zapcore.Entry{ + Level: zapcore.DebugLevel, + Message: "test", + } + fields := []zapcore.Field{ + {Key: "field1", String: "value1"}, + } + err := bc.Write(e, fields) + require.NoError(t, err) + + expected := loggedEntry{ + e, + fields, + } + require.Equal(t, 1, len(bc.logs)) + require.Equal(t, expected, bc.logs[0]) +} + +func Test_bufferedCore_Sync(t *testing.T) { + bc := newBufferedCore(zapcore.InfoLevel) + assert.NoError(t, bc.Sync()) +} + +func Test_bufferedCore_TakeLogs(t *testing.T) { + bc := newBufferedCore(zapcore.InfoLevel) + e := zapcore.Entry{ + Level: zapcore.DebugLevel, + Message: "test", + } + fields := []zapcore.Field{ + {Key: "field1", String: "value1"}, + } + err := bc.Write(e, fields) + require.NoError(t, err) + + expected := []loggedEntry{ + { + e, + fields, + }, + } + assert.Equal(t, expected, bc.TakeLogs()) + assert.Nil(t, bc.logs) + + assert.Error(t, bc.Write(e, fields)) + assert.Nil(t, bc.TakeLogs()) +} diff --git a/otelcol/collector.go b/otelcol/collector.go index bca6e9b86dc..fb36217cb11 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -7,6 +7,7 @@ package otelcol // import "go.opentelemetry.io/collector/otelcol" import ( "context" + "errors" "fmt" "os" "os/signal" @@ -15,6 +16,7 @@ import ( "go.uber.org/multierr" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" @@ -108,7 +110,9 @@ type Collector struct { // signalsChannel is used to receive termination signals from the OS. signalsChannel chan os.Signal // asyncErrorChannel is used to signal a fatal error from any component. - asyncErrorChannel chan error + asyncErrorChannel chan error + bc *bufferedCore + updateConfigProviderLogger func(core zapcore.Core) } // NewCollector creates and returns a new instance of Collector. @@ -116,7 +120,10 @@ func NewCollector(set CollectorSettings) (*Collector, error) { var err error configProvider := set.ConfigProvider - set.ConfigProviderSettings.ResolverSettings.ProviderSettings = confmap.ProviderSettings{Logger: zap.NewNop()} + bc := newBufferedCore(zapcore.DebugLevel) + cc := &collectorCore{core: bc} + options := append([]zap.Option{zap.WithCaller(true)}, set.LoggingOptions...) + set.ConfigProviderSettings.ResolverSettings.ProviderSettings = confmap.ProviderSettings{Logger: zap.New(cc, options...)} set.ConfigProviderSettings.ResolverSettings.ConverterSettings = confmap.ConverterSettings{} if configProvider == nil { @@ -134,9 +141,11 @@ func NewCollector(set CollectorSettings) (*Collector, error) { shutdownChan: make(chan struct{}), // Per signal.Notify documentation, a size of the channel equaled with // the number of signals getting notified on is recommended. - signalsChannel: make(chan os.Signal, 3), - asyncErrorChannel: make(chan error), - configProvider: configProvider, + signalsChannel: make(chan os.Signal, 3), + asyncErrorChannel: make(chan error), + configProvider: configProvider, + bc: bc, + updateConfigProviderLogger: cc.SetCore, }, nil } @@ -202,6 +211,18 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { if err != nil { return err } + if col.updateConfigProviderLogger != nil { + col.updateConfigProviderLogger(col.service.Logger().Core()) + } + if col.bc != nil { + x := col.bc.TakeLogs() + for _, log := range x { + ce := col.service.Logger().Core().Check(log.Entry, nil) + if ce != nil { + ce.Write(log.Context...) + } + } + } if !col.set.SkipSettingGRPCLogger { grpclog.SetLogger(col.service.Logger(), cfg.Service.Telemetry.Logs.Level) @@ -243,12 +264,40 @@ func (col *Collector) DryRun(ctx context.Context) error { return cfg.Validate() } +func newFallbackLogger(options []zap.Option) (*zap.Logger, error) { + ec := zap.NewProductionEncoderConfig() + ec.EncodeTime = zapcore.ISO8601TimeEncoder + zapCfg := &zap.Config{ + Level: zap.NewAtomicLevelAt(zapcore.DebugLevel), + Encoding: "console", + EncoderConfig: ec, + OutputPaths: []string{"stderr"}, + ErrorOutputPaths: []string{"stderr"}, + } + return zapCfg.Build(options...) +} + // Run starts the collector according to the given configuration, and waits for it to complete. // Consecutive calls to Run are not allowed, Run shouldn't be called once a collector is shut down. // Sets up the control logic for config reloading and shutdown. func (col *Collector) Run(ctx context.Context) error { if err := col.setupConfigurationComponents(ctx); err != nil { col.setCollectorState(StateClosed) + logger, loggerErr := newFallbackLogger(col.set.LoggingOptions) + if loggerErr != nil { + return errors.Join(err, fmt.Errorf("unable to create fallback logger: %w", loggerErr)) + } + + if col.bc != nil { + x := col.bc.TakeLogs() + for _, log := range x { + ce := logger.Core().Check(log.Entry, nil) + if ce != nil { + ce.Write(log.Context...) + } + } + } + return err } diff --git a/otelcol/collector_core.go b/otelcol/collector_core.go new file mode 100644 index 00000000000..b0a379fedc9 --- /dev/null +++ b/otelcol/collector_core.go @@ -0,0 +1,58 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelcol // import "go.opentelemetry.io/collector/otelcol" + +import ( + "sync" + + "go.uber.org/zap/zapcore" +) + +var _ zapcore.Core = (*collectorCore)(nil) + +type collectorCore struct { + core zapcore.Core + rw sync.RWMutex +} + +func (c *collectorCore) Enabled(l zapcore.Level) bool { + c.rw.RLock() + defer c.rw.RUnlock() + return c.core.Enabled(l) +} + +func (c *collectorCore) With(f []zapcore.Field) zapcore.Core { + c.rw.RLock() + defer c.rw.RUnlock() + return &collectorCore{ + core: c.core.With(f), + } +} + +func (c *collectorCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { + c.rw.RLock() + defer c.rw.RUnlock() + if c.core.Enabled(e.Level) { + return ce.AddCore(e, c) + } + return ce +} + +func (c *collectorCore) Write(e zapcore.Entry, f []zapcore.Field) error { + c.rw.RLock() + defer c.rw.RUnlock() + return c.core.Write(e, f) +} + +func (c *collectorCore) Sync() error { + c.rw.RLock() + defer c.rw.RUnlock() + return c.core.Sync() +} + +func (c *collectorCore) SetCore(core zapcore.Core) { + c.rw.Lock() + defer c.rw.Unlock() + c.core = core +} diff --git a/otelcol/collector_core_test.go b/otelcol/collector_core_test.go new file mode 100644 index 00000000000..6b9deb031e6 --- /dev/null +++ b/otelcol/collector_core_test.go @@ -0,0 +1,89 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelcol + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" +) + +func Test_collectorCore_Enabled(t *testing.T) { + cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)} + assert.True(t, cc.Enabled(zapcore.ErrorLevel)) + assert.False(t, cc.Enabled(zapcore.DebugLevel)) +} + +func Test_collectorCore_Check(t *testing.T) { + t.Run("check passed", func(t *testing.T) { + bc := newBufferedCore(zapcore.InfoLevel) + cc := collectorCore{core: bc} + e := zapcore.Entry{ + Level: zapcore.InfoLevel, + } + expected := &zapcore.CheckedEntry{} + expected = expected.AddCore(e, &cc) + ce := cc.Check(e, nil) + assert.Equal(t, expected, ce) + }) + + t.Run("check did not pass", func(t *testing.T) { + cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)} + e := zapcore.Entry{ + Level: zapcore.DebugLevel, + } + ce := cc.Check(e, nil) + assert.Nil(t, ce) + }) +} + +func Test_collectorCore_With(t *testing.T) { + cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)} + cc.core.(*bufferedCore).context = []zapcore.Field{ + {Key: "original", String: "context"}, + } + inputs := []zapcore.Field{ + {Key: "test", String: "passed"}, + } + expected := []zapcore.Field{ + {Key: "original", String: "context"}, + {Key: "test", String: "passed"}, + } + newCC := cc.With(inputs) + assert.Equal(t, expected, newCC.(*collectorCore).core.(*bufferedCore).context) +} + +func Test_collectorCore_Write(t *testing.T) { + cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)} + e := zapcore.Entry{ + Level: zapcore.DebugLevel, + Message: "test", + } + fields := []zapcore.Field{ + {Key: "field1", String: "value1"}, + } + err := cc.Write(e, fields) + require.NoError(t, err) + + expected := loggedEntry{ + e, + fields, + } + require.Equal(t, 1, len(cc.core.(*bufferedCore).logs)) + require.Equal(t, expected, cc.core.(*bufferedCore).logs[0]) +} + +func Test_collectorCore_Sync(t *testing.T) { + cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)} + assert.NoError(t, cc.Sync()) +} + +func Test_collectorCore_SetCore(t *testing.T) { + cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)} + newCore := newBufferedCore(zapcore.DebugLevel) + cc.SetCore(newCore) + assert.Equal(t, zapcore.DebugLevel, cc.core.(*bufferedCore).LevelEnabler) +} diff --git a/otelcol/collector_windows.go b/otelcol/collector_windows.go index cc0a3611628..3df08386bbf 100644 --- a/otelcol/collector_windows.go +++ b/otelcol/collector_windows.go @@ -202,7 +202,7 @@ func (w windowsEventLogCore) Sync() error { func withWindowsCore(elog *eventlog.Log, serviceConfig **service.Config) func(zapcore.Core) zapcore.Core { return func(core zapcore.Core) zapcore.Core { - if serviceConfig != nil { + if serviceConfig != nil && *serviceConfig != nil { for _, output := range (*serviceConfig).Telemetry.Logs.OutputPaths { if output != "stdout" && output != "stderr" { // A log file was specified in the configuration, so we should not use the Windows Event Log