From 74c2cf3a0da9ab50d9f1dc41523706d7e3c79d3b Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Fri, 10 May 2024 02:22:33 -0600 Subject: [PATCH] [otelcol] Add a custom zapcore.Core for confmap logging (#10056) #### Description Provides a logger to confmap that buffers logs in memory until the primary logger can be used. Once the primary logger exists, places that used the original logger are given the updated Core. If an error occurs that would shut down the collector before the primary logger could be created, the logs are written to stdout/err using a fallback logger. Alternative to https://github.com/open-telemetry/opentelemetry-collector/pull/10008 I've pushed the testing I did to show how the logger successfully updates. Before config resolution the debug log in confmap is not printed, but afterwards it is. test config: ```yaml receivers: nop: exporters: otlphttp: endpoint: http://0.0.0.0:4317 headers: # Not set x-test: ${env:TEMP3} debug: # set to "detailed" verbosity: $TEMP service: telemetry: logs: level: debug pipelines: traces: receivers: - nop exporters: - debug ``` ![image](https://github.com/open-telemetry/opentelemetry-collector/assets/12352919/6a17993f-1f97-4c54-9165-5c34dd58d108) #### Link to tracking issue Related to https://github.com/open-telemetry/opentelemetry-collector/issues/9162 Related to https://github.com/open-telemetry/opentelemetry-collector/issues/5615 #### Testing If we like this approach I'll add tests #### Documentation --------- Co-authored-by: Dan Jaglowski Co-authored-by: Pablo Baeyens --- .chloggen/confmap-logger-wrapper-idea.yaml | 25 +++++ otelcol/buffered_core.go | 81 ++++++++++++++++ otelcol/buffered_core_test.go | 107 +++++++++++++++++++++ otelcol/collector.go | 59 +++++++++++- otelcol/collector_core.go | 58 +++++++++++ otelcol/collector_core_test.go | 89 +++++++++++++++++ otelcol/collector_windows.go | 2 +- 7 files changed, 415 insertions(+), 6 deletions(-) create mode 100644 .chloggen/confmap-logger-wrapper-idea.yaml create mode 100644 otelcol/buffered_core.go create mode 100644 otelcol/buffered_core_test.go create mode 100644 otelcol/collector_core.go create mode 100644 otelcol/collector_core_test.go diff --git a/.chloggen/confmap-logger-wrapper-idea.yaml b/.chloggen/confmap-logger-wrapper-idea.yaml new file mode 100644 index 00000000000..ff35e92c2a3 --- /dev/null +++ b/.chloggen/confmap-logger-wrapper-idea.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: otelcol + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Enable logging during configuration resolution + +# One or more tracking issues or pull requests related to the change +issues: [10056] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/otelcol/buffered_core.go b/otelcol/buffered_core.go new file mode 100644 index 00000000000..447bf13cb52 --- /dev/null +++ b/otelcol/buffered_core.go @@ -0,0 +1,81 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// This logger implements zapcore.Core and is based on zaptest/observer. + +package otelcol // import "go.opentelemetry.io/collector/otelcol" + +import ( + "fmt" + "sync" + + "go.uber.org/zap/zapcore" +) + +type loggedEntry struct { + zapcore.Entry + Context []zapcore.Field +} + +func newBufferedCore(enab zapcore.LevelEnabler) *bufferedCore { + return &bufferedCore{LevelEnabler: enab} +} + +var _ zapcore.Core = (*bufferedCore)(nil) + +type bufferedCore struct { + zapcore.LevelEnabler + mu sync.RWMutex + logs []loggedEntry + context []zapcore.Field + logsTaken bool +} + +func (bc *bufferedCore) Level() zapcore.Level { + return zapcore.LevelOf(bc.LevelEnabler) +} + +func (bc *bufferedCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { + if bc.Enabled(ent.Level) { + return ce.AddCore(ent, bc) + } + return ce +} + +func (bc *bufferedCore) With(fields []zapcore.Field) zapcore.Core { + return &bufferedCore{ + LevelEnabler: bc.LevelEnabler, + logs: bc.logs, + logsTaken: bc.logsTaken, + context: append(bc.context, fields...), + } +} + +func (bc *bufferedCore) Write(ent zapcore.Entry, fields []zapcore.Field) error { + bc.mu.Lock() + defer bc.mu.Unlock() + if bc.logsTaken { + return fmt.Errorf("the buffered logs have already been taken so writing is no longer supported") + } + all := make([]zapcore.Field, 0, len(fields)+len(bc.context)) + all = append(all, bc.context...) + all = append(all, fields...) + bc.logs = append(bc.logs, loggedEntry{ent, all}) + return nil +} + +func (bc *bufferedCore) Sync() error { + return nil +} + +func (bc *bufferedCore) TakeLogs() []loggedEntry { + if !bc.logsTaken { + bc.mu.Lock() + defer bc.mu.Unlock() + logs := bc.logs + bc.logs = nil + bc.logsTaken = true + return logs + } + return nil +} diff --git a/otelcol/buffered_core_test.go b/otelcol/buffered_core_test.go new file mode 100644 index 00000000000..6417b8a1d9d --- /dev/null +++ b/otelcol/buffered_core_test.go @@ -0,0 +1,107 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelcol + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" +) + +func Test_bufferedCore_Level(t *testing.T) { + bc := newBufferedCore(zapcore.InfoLevel) + assert.Equal(t, zapcore.InfoLevel, bc.Level()) +} + +func Test_bufferedCore_Check(t *testing.T) { + t.Run("check passed", func(t *testing.T) { + bc := newBufferedCore(zapcore.InfoLevel) + e := zapcore.Entry{ + Level: zapcore.InfoLevel, + } + expected := &zapcore.CheckedEntry{} + expected = expected.AddCore(e, bc) + ce := bc.Check(e, nil) + assert.Equal(t, expected, ce) + }) + + t.Run("check did not pass", func(t *testing.T) { + bc := newBufferedCore(zapcore.InfoLevel) + e := zapcore.Entry{ + Level: zapcore.DebugLevel, + } + ce := bc.Check(e, nil) + assert.Nil(t, ce) + }) +} + +func Test_bufferedCore_With(t *testing.T) { + bc := newBufferedCore(zapcore.InfoLevel) + bc.logsTaken = true + bc.context = []zapcore.Field{ + {Key: "original", String: "context"}, + } + inputs := []zapcore.Field{ + {Key: "test", String: "passed"}, + } + expected := []zapcore.Field{ + {Key: "original", String: "context"}, + {Key: "test", String: "passed"}, + } + newBC := bc.With(inputs) + assert.Equal(t, expected, newBC.(*bufferedCore).context) + assert.True(t, newBC.(*bufferedCore).logsTaken) +} + +func Test_bufferedCore_Write(t *testing.T) { + bc := newBufferedCore(zapcore.InfoLevel) + e := zapcore.Entry{ + Level: zapcore.DebugLevel, + Message: "test", + } + fields := []zapcore.Field{ + {Key: "field1", String: "value1"}, + } + err := bc.Write(e, fields) + require.NoError(t, err) + + expected := loggedEntry{ + e, + fields, + } + require.Equal(t, 1, len(bc.logs)) + require.Equal(t, expected, bc.logs[0]) +} + +func Test_bufferedCore_Sync(t *testing.T) { + bc := newBufferedCore(zapcore.InfoLevel) + assert.NoError(t, bc.Sync()) +} + +func Test_bufferedCore_TakeLogs(t *testing.T) { + bc := newBufferedCore(zapcore.InfoLevel) + e := zapcore.Entry{ + Level: zapcore.DebugLevel, + Message: "test", + } + fields := []zapcore.Field{ + {Key: "field1", String: "value1"}, + } + err := bc.Write(e, fields) + require.NoError(t, err) + + expected := []loggedEntry{ + { + e, + fields, + }, + } + assert.Equal(t, expected, bc.TakeLogs()) + assert.Nil(t, bc.logs) + + assert.Error(t, bc.Write(e, fields)) + assert.Nil(t, bc.TakeLogs()) +} diff --git a/otelcol/collector.go b/otelcol/collector.go index bca6e9b86dc..fb36217cb11 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -7,6 +7,7 @@ package otelcol // import "go.opentelemetry.io/collector/otelcol" import ( "context" + "errors" "fmt" "os" "os/signal" @@ -15,6 +16,7 @@ import ( "go.uber.org/multierr" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" @@ -108,7 +110,9 @@ type Collector struct { // signalsChannel is used to receive termination signals from the OS. signalsChannel chan os.Signal // asyncErrorChannel is used to signal a fatal error from any component. - asyncErrorChannel chan error + asyncErrorChannel chan error + bc *bufferedCore + updateConfigProviderLogger func(core zapcore.Core) } // NewCollector creates and returns a new instance of Collector. @@ -116,7 +120,10 @@ func NewCollector(set CollectorSettings) (*Collector, error) { var err error configProvider := set.ConfigProvider - set.ConfigProviderSettings.ResolverSettings.ProviderSettings = confmap.ProviderSettings{Logger: zap.NewNop()} + bc := newBufferedCore(zapcore.DebugLevel) + cc := &collectorCore{core: bc} + options := append([]zap.Option{zap.WithCaller(true)}, set.LoggingOptions...) + set.ConfigProviderSettings.ResolverSettings.ProviderSettings = confmap.ProviderSettings{Logger: zap.New(cc, options...)} set.ConfigProviderSettings.ResolverSettings.ConverterSettings = confmap.ConverterSettings{} if configProvider == nil { @@ -134,9 +141,11 @@ func NewCollector(set CollectorSettings) (*Collector, error) { shutdownChan: make(chan struct{}), // Per signal.Notify documentation, a size of the channel equaled with // the number of signals getting notified on is recommended. - signalsChannel: make(chan os.Signal, 3), - asyncErrorChannel: make(chan error), - configProvider: configProvider, + signalsChannel: make(chan os.Signal, 3), + asyncErrorChannel: make(chan error), + configProvider: configProvider, + bc: bc, + updateConfigProviderLogger: cc.SetCore, }, nil } @@ -202,6 +211,18 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { if err != nil { return err } + if col.updateConfigProviderLogger != nil { + col.updateConfigProviderLogger(col.service.Logger().Core()) + } + if col.bc != nil { + x := col.bc.TakeLogs() + for _, log := range x { + ce := col.service.Logger().Core().Check(log.Entry, nil) + if ce != nil { + ce.Write(log.Context...) + } + } + } if !col.set.SkipSettingGRPCLogger { grpclog.SetLogger(col.service.Logger(), cfg.Service.Telemetry.Logs.Level) @@ -243,12 +264,40 @@ func (col *Collector) DryRun(ctx context.Context) error { return cfg.Validate() } +func newFallbackLogger(options []zap.Option) (*zap.Logger, error) { + ec := zap.NewProductionEncoderConfig() + ec.EncodeTime = zapcore.ISO8601TimeEncoder + zapCfg := &zap.Config{ + Level: zap.NewAtomicLevelAt(zapcore.DebugLevel), + Encoding: "console", + EncoderConfig: ec, + OutputPaths: []string{"stderr"}, + ErrorOutputPaths: []string{"stderr"}, + } + return zapCfg.Build(options...) +} + // Run starts the collector according to the given configuration, and waits for it to complete. // Consecutive calls to Run are not allowed, Run shouldn't be called once a collector is shut down. // Sets up the control logic for config reloading and shutdown. func (col *Collector) Run(ctx context.Context) error { if err := col.setupConfigurationComponents(ctx); err != nil { col.setCollectorState(StateClosed) + logger, loggerErr := newFallbackLogger(col.set.LoggingOptions) + if loggerErr != nil { + return errors.Join(err, fmt.Errorf("unable to create fallback logger: %w", loggerErr)) + } + + if col.bc != nil { + x := col.bc.TakeLogs() + for _, log := range x { + ce := logger.Core().Check(log.Entry, nil) + if ce != nil { + ce.Write(log.Context...) + } + } + } + return err } diff --git a/otelcol/collector_core.go b/otelcol/collector_core.go new file mode 100644 index 00000000000..b0a379fedc9 --- /dev/null +++ b/otelcol/collector_core.go @@ -0,0 +1,58 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelcol // import "go.opentelemetry.io/collector/otelcol" + +import ( + "sync" + + "go.uber.org/zap/zapcore" +) + +var _ zapcore.Core = (*collectorCore)(nil) + +type collectorCore struct { + core zapcore.Core + rw sync.RWMutex +} + +func (c *collectorCore) Enabled(l zapcore.Level) bool { + c.rw.RLock() + defer c.rw.RUnlock() + return c.core.Enabled(l) +} + +func (c *collectorCore) With(f []zapcore.Field) zapcore.Core { + c.rw.RLock() + defer c.rw.RUnlock() + return &collectorCore{ + core: c.core.With(f), + } +} + +func (c *collectorCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { + c.rw.RLock() + defer c.rw.RUnlock() + if c.core.Enabled(e.Level) { + return ce.AddCore(e, c) + } + return ce +} + +func (c *collectorCore) Write(e zapcore.Entry, f []zapcore.Field) error { + c.rw.RLock() + defer c.rw.RUnlock() + return c.core.Write(e, f) +} + +func (c *collectorCore) Sync() error { + c.rw.RLock() + defer c.rw.RUnlock() + return c.core.Sync() +} + +func (c *collectorCore) SetCore(core zapcore.Core) { + c.rw.Lock() + defer c.rw.Unlock() + c.core = core +} diff --git a/otelcol/collector_core_test.go b/otelcol/collector_core_test.go new file mode 100644 index 00000000000..6b9deb031e6 --- /dev/null +++ b/otelcol/collector_core_test.go @@ -0,0 +1,89 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelcol + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" +) + +func Test_collectorCore_Enabled(t *testing.T) { + cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)} + assert.True(t, cc.Enabled(zapcore.ErrorLevel)) + assert.False(t, cc.Enabled(zapcore.DebugLevel)) +} + +func Test_collectorCore_Check(t *testing.T) { + t.Run("check passed", func(t *testing.T) { + bc := newBufferedCore(zapcore.InfoLevel) + cc := collectorCore{core: bc} + e := zapcore.Entry{ + Level: zapcore.InfoLevel, + } + expected := &zapcore.CheckedEntry{} + expected = expected.AddCore(e, &cc) + ce := cc.Check(e, nil) + assert.Equal(t, expected, ce) + }) + + t.Run("check did not pass", func(t *testing.T) { + cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)} + e := zapcore.Entry{ + Level: zapcore.DebugLevel, + } + ce := cc.Check(e, nil) + assert.Nil(t, ce) + }) +} + +func Test_collectorCore_With(t *testing.T) { + cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)} + cc.core.(*bufferedCore).context = []zapcore.Field{ + {Key: "original", String: "context"}, + } + inputs := []zapcore.Field{ + {Key: "test", String: "passed"}, + } + expected := []zapcore.Field{ + {Key: "original", String: "context"}, + {Key: "test", String: "passed"}, + } + newCC := cc.With(inputs) + assert.Equal(t, expected, newCC.(*collectorCore).core.(*bufferedCore).context) +} + +func Test_collectorCore_Write(t *testing.T) { + cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)} + e := zapcore.Entry{ + Level: zapcore.DebugLevel, + Message: "test", + } + fields := []zapcore.Field{ + {Key: "field1", String: "value1"}, + } + err := cc.Write(e, fields) + require.NoError(t, err) + + expected := loggedEntry{ + e, + fields, + } + require.Equal(t, 1, len(cc.core.(*bufferedCore).logs)) + require.Equal(t, expected, cc.core.(*bufferedCore).logs[0]) +} + +func Test_collectorCore_Sync(t *testing.T) { + cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)} + assert.NoError(t, cc.Sync()) +} + +func Test_collectorCore_SetCore(t *testing.T) { + cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)} + newCore := newBufferedCore(zapcore.DebugLevel) + cc.SetCore(newCore) + assert.Equal(t, zapcore.DebugLevel, cc.core.(*bufferedCore).LevelEnabler) +} diff --git a/otelcol/collector_windows.go b/otelcol/collector_windows.go index cc0a3611628..3df08386bbf 100644 --- a/otelcol/collector_windows.go +++ b/otelcol/collector_windows.go @@ -202,7 +202,7 @@ func (w windowsEventLogCore) Sync() error { func withWindowsCore(elog *eventlog.Log, serviceConfig **service.Config) func(zapcore.Core) zapcore.Core { return func(core zapcore.Core) zapcore.Core { - if serviceConfig != nil { + if serviceConfig != nil && *serviceConfig != nil { for _, output := range (*serviceConfig).Telemetry.Logs.OutputPaths { if output != "stdout" && output != "stderr" { // A log file was specified in the configuration, so we should not use the Windows Event Log