From 49a090ba924b3fc08035c6a0ef9cb4ff272a9c0f Mon Sep 17 00:00:00 2001 From: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> Date: Tue, 8 Aug 2023 12:40:08 -0400 Subject: [PATCH] Notify extensions of the Collector's effective configuration (#6833) **Description:** This adds an optional `ConfigWatcher` interface that extensions can implement if they want to be notified of the effective configuration that is used by the Collector. I don't feel very strongly about any of the decisions I made in this PR, so I am open to input if we would like to take a different approach anywhere. I will leave some comments to explain the decisions I made. **Link to tracking Issue:** Closes https://github.com/open-telemetry/opentelemetry-collector/issues/6596 **Testing:** I've made minimal unit test changes, but I expect to write more tests once there is consensus on the direction for implementing this functionality. I have done some manual testing to show that an extension can get a YAML representation of the effective config using two YAML input files. --------- Co-authored-by: Evan Bradley --- .chloggen/configwatcher-interface.yaml | 16 +++ .chloggen/extension-effective-config.yaml | 18 +++ .chloggen/servicesettings-confmap.yaml | 17 +++ extension/extension.go | 8 ++ otelcol/collector.go | 14 +++ otelcol/collector_test.go | 45 ++++++++ otelcol/configprovider.go | 25 +++++ otelcol/configprovider_test.go | 121 +++++++++++---------- service/extensions/extensions.go | 12 ++ service/extensions/extensions_test.go | 127 ++++++++++++++++++++++ service/service.go | 12 ++ service/service_test.go | 87 ++++++++++++++- 12 files changed, 439 insertions(+), 63 deletions(-) create mode 100755 .chloggen/configwatcher-interface.yaml create mode 100755 .chloggen/extension-effective-config.yaml create mode 100755 .chloggen/servicesettings-confmap.yaml diff --git a/.chloggen/configwatcher-interface.yaml b/.chloggen/configwatcher-interface.yaml new file mode 100755 index 00000000000..4e5b611e31b --- /dev/null +++ b/.chloggen/configwatcher-interface.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: extension + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add optional `ConfigWatcher` interface + +# One or more tracking issues or pull requests related to the change +issues: [6596] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Extensions implementing this interface will be notified of the Collector's effective config. diff --git a/.chloggen/extension-effective-config.yaml b/.chloggen/extension-effective-config.yaml new file mode 100755 index 00000000000..13b27ad5085 --- /dev/null +++ b/.chloggen/extension-effective-config.yaml @@ -0,0 +1,18 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: otelcol + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add optional `ConfmapProvider` interface for Config Providers + +# One or more tracking issues or pull requests related to the change +issues: [6596] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + This allows providing the Collector's configuration as a marshaled confmap.Conf object + from a ConfigProvider diff --git a/.chloggen/servicesettings-confmap.yaml b/.chloggen/servicesettings-confmap.yaml new file mode 100755 index 00000000000..cd0d95fdd69 --- /dev/null +++ b/.chloggen/servicesettings-confmap.yaml @@ -0,0 +1,17 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: service + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add `CollectorConf` field to `service.Settings` + +# One or more tracking issues or pull requests related to the change +issues: [6596] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + This field is intended to be used by the Collector to pass its effective configuration to the service. diff --git a/extension/extension.go b/extension/extension.go index 3f3b087cc78..6b8df571b81 100644 --- a/extension/extension.go +++ b/extension/extension.go @@ -8,6 +8,7 @@ import ( "fmt" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" ) // Extension is the interface for objects hosted by the OpenTelemetry Collector that @@ -32,6 +33,13 @@ type PipelineWatcher interface { NotReady() error } +// ConfigWatcher is an interface that should be implemented by an extension that +// wishes to be notified of the Collector's effective configuration. +type ConfigWatcher interface { + // NotifyConfig notifies the extension of the Collector's current effective configuration. + NotifyConfig(ctx context.Context, conf *confmap.Conf) error +} + // CreateSettings is passed to Factory.Create(...) function. type CreateSettings struct { // ID returns the ID of the component that will be created. diff --git a/otelcol/collector.go b/otelcol/collector.go index 788c022e9f2..557ff6f18aa 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -18,6 +18,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/extension" @@ -143,6 +144,17 @@ func (col *Collector) Shutdown() { func (col *Collector) setupConfigurationComponents(ctx context.Context) error { col.setCollectorState(StateStarting) + var conf *confmap.Conf + + if cp, ok := col.set.ConfigProvider.(ConfmapProvider); ok { + var err error + conf, err = cp.GetConfmap(ctx) + + if err != nil { + return fmt.Errorf("failed to resolve config: %w", err) + } + } + cfg, err := col.set.ConfigProvider.Get(ctx, col.set.Factories) if err != nil { return fmt.Errorf("failed to get config: %w", err) @@ -154,6 +166,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { col.service, err = service.New(ctx, service.Settings{ BuildInfo: col.set.BuildInfo, + CollectorConf: conf, Receivers: receiver.NewBuilder(cfg.Receivers, col.set.Factories.Receivers), Processors: processor.NewBuilder(cfg.Processors, col.set.Factories.Processors), Exporters: exporter.NewBuilder(cfg.Exporters, col.set.Factories.Exporters), @@ -174,6 +187,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { return multierr.Combine(err, col.service.Shutdown(ctx)) } col.setCollectorState(StateRunning) + return nil } diff --git a/otelcol/collector_test.go b/otelcol/collector_test.go index e9ba23719cc..1a0ad8d0607 100644 --- a/otelcol/collector_test.go +++ b/otelcol/collector_test.go @@ -17,6 +17,8 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/confmap/converter/expandconverter" ) func TestStateString(t *testing.T) { @@ -369,6 +371,31 @@ func TestCollectorDryRun(t *testing.T) { require.Error(t, col.DryRun(context.Background())) } +func TestPassConfmapToServiceFailure(t *testing.T) { + factories, err := nopFactories() + require.NoError(t, err) + + cfgProvider, err := NewConfigProvider(ConfigProviderSettings{ + ResolverSettings: confmap.ResolverSettings{ + URIs: []string{filepath.Join("testdata", "otelcol-invalid.yaml")}, + Providers: makeMapProvidersMap(newFailureProvider()), + Converters: []confmap.Converter{expandconverter.New()}, + }, + }) + require.NoError(t, err) + + set := CollectorSettings{ + BuildInfo: component.NewDefaultBuildInfo(), + Factories: factories, + ConfigProvider: cfgProvider, + } + col, err := NewCollector(set) + require.NoError(t, err) + + err = col.Run(context.Background()) + require.Error(t, err) +} + func startCollector(ctx context.Context, t *testing.T, col *Collector) *sync.WaitGroup { wg := &sync.WaitGroup{} wg.Add(1) @@ -378,3 +405,21 @@ func startCollector(ctx context.Context, t *testing.T, col *Collector) *sync.Wai }() return wg } + +type failureProvider struct{} + +func newFailureProvider() confmap.Provider { + return &failureProvider{} +} + +func (fmp *failureProvider) Retrieve(_ context.Context, _ string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) { + return nil, errors.New("a failure occurred during configuration retrieval") +} + +func (*failureProvider) Scheme() string { + return "file" +} + +func (*failureProvider) Shutdown(context.Context) error { + return nil +} diff --git a/otelcol/configprovider.go b/otelcol/configprovider.go index b50b47efe4f..c266c9a47df 100644 --- a/otelcol/configprovider.go +++ b/otelcol/configprovider.go @@ -50,10 +50,26 @@ type ConfigProvider interface { Shutdown(ctx context.Context) error } +// ConfmapProvider is an optional interface to be implemented by ConfigProviders +// to provide confmap.Conf objects representing a marshaled version of the +// Collector's configuration. +// +// The purpose of this interface is that otelcol.ConfigProvider structs do not +// necessarily need to use confmap.Conf as their underlying config structure. +type ConfmapProvider interface { + // GetConfmap resolves the Collector's configuration and provides it as a confmap.Conf object. + // + // Should never be called concurrently with itself or any ConfigProvider method. + GetConfmap(ctx context.Context) (*confmap.Conf, error) +} + type configProvider struct { mapResolver *confmap.Resolver } +var _ ConfigProvider = &configProvider{} +var _ ConfmapProvider = &configProvider{} + // ConfigProviderSettings are the settings to configure the behavior of the ConfigProvider. type ConfigProviderSettings struct { // ResolverSettings are the settings to configure the behavior of the confmap.Resolver. @@ -106,6 +122,15 @@ func (cm *configProvider) Shutdown(ctx context.Context) error { return cm.mapResolver.Shutdown(ctx) } +func (cm *configProvider) GetConfmap(ctx context.Context) (*confmap.Conf, error) { + conf, err := cm.mapResolver.Resolve(ctx) + if err != nil { + return nil, fmt.Errorf("cannot resolve the configuration: %w", err) + } + + return conf, nil +} + func newDefaultConfigProviderSettings(uris []string) ConfigProviderSettings { return ConfigProviderSettings{ ResolverSettings: confmap.ResolverSettings{ diff --git a/otelcol/configprovider_test.go b/otelcol/configprovider_test.go index 5f0a97ea41f..a1c5be3eb49 100644 --- a/otelcol/configprovider_test.go +++ b/otelcol/configprovider_test.go @@ -11,69 +11,36 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap/zapcore" + "gopkg.in/yaml.v3" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/provider/fileprovider" "go.opentelemetry.io/collector/confmap/provider/yamlprovider" - "go.opentelemetry.io/collector/connector/connectortest" - "go.opentelemetry.io/collector/exporter/exportertest" - "go.opentelemetry.io/collector/extension/extensiontest" - "go.opentelemetry.io/collector/processor/processortest" - "go.opentelemetry.io/collector/receiver/receivertest" - "go.opentelemetry.io/collector/service" - "go.opentelemetry.io/collector/service/pipelines" - "go.opentelemetry.io/collector/service/telemetry" ) -var configNop = &Config{ - Receivers: map[component.ID]component.Config{component.NewID("nop"): receivertest.NewNopFactory().CreateDefaultConfig()}, - Processors: map[component.ID]component.Config{component.NewID("nop"): processortest.NewNopFactory().CreateDefaultConfig()}, - Exporters: map[component.ID]component.Config{component.NewID("nop"): exportertest.NewNopFactory().CreateDefaultConfig()}, - Connectors: map[component.ID]component.Config{component.NewIDWithName("nop", "con"): connectortest.NewNopFactory().CreateDefaultConfig()}, - Extensions: map[component.ID]component.Config{component.NewID("nop"): extensiontest.NewNopFactory().CreateDefaultConfig()}, - Service: service.Config{ - Extensions: []component.ID{component.NewID("nop")}, - Pipelines: pipelines.Config{ - component.NewID("traces"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "con")}, - }, - component.NewID("metrics"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("nop")}, - }, - component.NewID("logs"): { - Receivers: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "con")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("nop")}, - }, - }, - Telemetry: telemetry.Config{ - Logs: telemetry.LogsConfig{ - Level: zapcore.InfoLevel, - Development: false, - Encoding: "console", - Sampling: &telemetry.LogsSamplingConfig{ - Initial: 100, - Thereafter: 100, - }, - OutputPaths: []string{"stderr"}, - ErrorOutputPaths: []string{"stderr"}, - DisableCaller: false, - DisableStacktrace: false, - InitialFields: map[string]any(nil), - }, - Metrics: telemetry.MetricsConfig{ - Level: configtelemetry.LevelBasic, - Address: "localhost:8888", - }, - }, - }, +func newConfig(yamlBytes []byte, factories Factories) (*Config, error) { + var stringMap = map[string]interface{}{} + err := yaml.Unmarshal(yamlBytes, stringMap) + + if err != nil { + return nil, err + } + + conf := confmap.NewFromStringMap(stringMap) + + cfg, err := unmarshal(conf, factories) + if err != nil { + return nil, err + } + + return &Config{ + Receivers: cfg.Receivers.Configs(), + Processors: cfg.Processors.Configs(), + Exporters: cfg.Exporters.Configs(), + Connectors: cfg.Connectors.Configs(), + Extensions: cfg.Extensions.Configs(), + Service: cfg.Service, + }, nil } func TestConfigProviderYaml(t *testing.T) { @@ -97,6 +64,10 @@ func TestConfigProviderYaml(t *testing.T) { cfg, err := cp.Get(context.Background(), factories) require.NoError(t, err) + + configNop, err := newConfig(yamlBytes, factories) + require.NoError(t, err) + assert.EqualValues(t, configNop, cfg) } @@ -118,5 +89,41 @@ func TestConfigProviderFile(t *testing.T) { cfg, err := cp.Get(context.Background(), factories) require.NoError(t, err) + + yamlBytes, err := os.ReadFile(filepath.Join("testdata", "otelcol-nop.yaml")) + require.NoError(t, err) + + configNop, err := newConfig(yamlBytes, factories) + require.NoError(t, err) + assert.EqualValues(t, configNop, cfg) } + +func TestGetConfmap(t *testing.T) { + uriLocation := "file:" + filepath.Join("testdata", "otelcol-nop.yaml") + provider := fileprovider.New() + set := ConfigProviderSettings{ + ResolverSettings: confmap.ResolverSettings{ + URIs: []string{uriLocation}, + Providers: map[string]confmap.Provider{provider.Scheme(): provider}, + }, + } + + configBytes, err := os.ReadFile(filepath.Join("testdata", "otelcol-nop.yaml")) + require.NoError(t, err) + + yamlMap := map[string]any{} + err = yaml.Unmarshal(configBytes, yamlMap) + require.NoError(t, err) + + cp, err := NewConfigProvider(set) + require.NoError(t, err) + + cmp, ok := cp.(ConfmapProvider) + require.True(t, ok) + + cmap, err := cmp.GetConfmap(context.Background()) + require.NoError(t, err) + + assert.EqualValues(t, yamlMap, cmap.ToStringMap()) +} diff --git a/service/extensions/extensions.go b/service/extensions/extensions.go index 76d71fb7aa7..5ff92cd5852 100644 --- a/service/extensions/extensions.go +++ b/service/extensions/extensions.go @@ -12,6 +12,7 @@ import ( "go.uber.org/multierr" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/service/internal/components" "go.opentelemetry.io/collector/service/internal/zpages" @@ -72,6 +73,17 @@ func (bes *Extensions) NotifyPipelineNotReady() error { return errs } +func (bes *Extensions) NotifyConfig(ctx context.Context, conf *confmap.Conf) error { + var errs error + for _, ext := range bes.extMap { + if cw, ok := ext.(extension.ConfigWatcher); ok { + clonedConf := confmap.NewFromStringMap(conf.ToStringMap()) + errs = multierr.Append(errs, cw.NotifyConfig(ctx, clonedConf)) + } + } + return errs +} + func (bes *Extensions) GetExtensions() map[component.ID]component.Component { result := make(map[component.ID]component.Component, len(bes.extMap)) for extID, v := range bes.extMap { diff --git a/service/extensions/extensions_test.go b/service/extensions/extensions_test.go index ee44401be36..d244fe0e8dd 100644 --- a/service/extensions/extensions_test.go +++ b/service/extensions/extensions_test.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/extension/extensiontest" ) @@ -91,6 +92,132 @@ func TestBuildExtensions(t *testing.T) { } } +func TestNotifyConfig(t *testing.T) { + notificationError := errors.New("Error processing config") + nopExtensionFactory := extensiontest.NewNopFactory() + nopExtensionConfig := nopExtensionFactory.CreateDefaultConfig() + n1ExtensionFactory := newConfigWatcherExtensionFactory("notifiable1", func() error { return nil }) + n1ExtensionConfig := n1ExtensionFactory.CreateDefaultConfig() + n2ExtensionFactory := newConfigWatcherExtensionFactory("notifiable2", func() error { return nil }) + n2ExtensionConfig := n1ExtensionFactory.CreateDefaultConfig() + nErrExtensionFactory := newConfigWatcherExtensionFactory("notifiableErr", func() error { return notificationError }) + nErrExtensionConfig := nErrExtensionFactory.CreateDefaultConfig() + + tests := []struct { + name string + factories map[component.Type]extension.Factory + extensionsConfigs map[component.ID]component.Config + serviceExtensions []component.ID + wantErrMsg string + want error + }{ + { + name: "No notifiable extensions", + factories: map[component.Type]extension.Factory{ + "nop": nopExtensionFactory, + }, + extensionsConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopExtensionConfig, + }, + serviceExtensions: []component.ID{ + component.NewID("nop"), + }, + }, + { + name: "One notifiable extension", + factories: map[component.Type]extension.Factory{ + "notifiable1": n1ExtensionFactory, + }, + extensionsConfigs: map[component.ID]component.Config{ + component.NewID("notifiable1"): n1ExtensionConfig, + }, + serviceExtensions: []component.ID{ + component.NewID("notifiable1"), + }, + }, + { + name: "Multiple notifiable extensions", + factories: map[component.Type]extension.Factory{ + "notifiable1": n1ExtensionFactory, + "notifiable2": n2ExtensionFactory, + }, + extensionsConfigs: map[component.ID]component.Config{ + component.NewID("notifiable1"): n1ExtensionConfig, + component.NewID("notifiable2"): n2ExtensionConfig, + }, + serviceExtensions: []component.ID{ + component.NewID("notifiable1"), + component.NewID("notifiable2"), + }, + }, + { + name: "Errors in extension notification", + factories: map[component.Type]extension.Factory{ + "notifiableErr": nErrExtensionFactory, + }, + extensionsConfigs: map[component.ID]component.Config{ + component.NewID("notifiableErr"): nErrExtensionConfig, + }, + serviceExtensions: []component.ID{ + component.NewID("notifiableErr"), + }, + want: notificationError, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + extensions, err := New(context.Background(), Settings{ + Telemetry: componenttest.NewNopTelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + Configs: tt.extensionsConfigs, + Factories: tt.factories, + }, tt.serviceExtensions) + assert.NoError(t, err) + errs := extensions.NotifyConfig(context.Background(), confmap.NewFromStringMap(map[string]interface{}{})) + assert.Equal(t, tt.want, errs) + }) + } +} + +type configWatcherExtension struct { + fn func() error +} + +func (comp *configWatcherExtension) Start(_ context.Context, _ component.Host) error { + return comp.fn() +} + +func (comp *configWatcherExtension) Shutdown(_ context.Context) error { + return comp.fn() +} + +func (comp *configWatcherExtension) NotifyConfig(_ context.Context, _ *confmap.Conf) error { + return comp.fn() +} + +func newConfigWatcherExtension(fn func() error) *configWatcherExtension { + comp := &configWatcherExtension{ + fn: fn, + } + + return comp + +} + +func newConfigWatcherExtensionFactory(name component.Type, fn func() error) extension.Factory { + return extension.NewFactory( + name, + func() component.Config { + return &struct{}{} + }, + func(ctx context.Context, set extension.CreateSettings, extension component.Config) (extension.Extension, error) { + return newConfigWatcherExtension(fn), nil + }, + component.StabilityLevelDevelopment, + ) +} + func newBadExtensionFactory() extension.Factory { return extension.NewFactory( "bf", diff --git a/service/service.go b/service/service.go index 318f99a868c..ed9540ec948 100644 --- a/service/service.go +++ b/service/service.go @@ -17,6 +17,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/extension" @@ -36,6 +37,9 @@ type Settings struct { // BuildInfo provides collector start information. BuildInfo component.BuildInfo + // CollectorConf contains the Collector's current configuration + CollectorConf *confmap.Conf + // Receivers builder for receivers. Receivers *receiver.Builder @@ -68,6 +72,7 @@ type Service struct { telemetrySettings component.TelemetrySettings host *serviceHost telemetryInitializer *telemetryInitializer + collectorConf *confmap.Conf } func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { @@ -89,6 +94,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { asyncErrorChannel: set.AsyncErrorChannel, }, telemetryInitializer: newColTelemetry(useOtel, disableHighCard, extendedConfig), + collectorConf: set.CollectorConf, } var err error srv.telemetry, err = telemetry.New(ctx, telemetry.Settings{ZapOptions: set.LoggingOptions}, cfg.Telemetry) @@ -138,6 +144,12 @@ func (srv *Service) Start(ctx context.Context) error { return fmt.Errorf("failed to start extensions: %w", err) } + if srv.collectorConf != nil { + if err := srv.host.serviceExtensions.NotifyConfig(ctx, srv.collectorConf); err != nil { + return err + } + } + if err := srv.host.pipelines.StartAll(ctx, srv.host); err != nil { return fmt.Errorf("cannot start pipelines: %w", err) } diff --git a/service/service_test.go b/service/service_test.go index 84b07f1b50c..555a0d5a863 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -6,6 +6,7 @@ package service import ( "bufio" "context" + "errors" "net/http" "strings" "sync" @@ -21,6 +22,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/connector/connectortest" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/extension" @@ -356,6 +358,51 @@ func TestServiceTelemetryRestart(t *testing.T) { assert.NoError(t, srvTwo.Shutdown(context.Background())) } +func TestExtensionNotificationFailure(t *testing.T) { + set := newNopSettings() + cfg := newNopConfig() + + var extName component.Type = "configWatcher" + configWatcherExtensionFactory := newConfigWatcherExtensionFactory(extName) + set.Extensions = extension.NewBuilder( + map[component.ID]component.Config{component.NewID(extName): configWatcherExtensionFactory.CreateDefaultConfig()}, + map[component.Type]extension.Factory{extName: configWatcherExtensionFactory}) + cfg.Extensions = []component.ID{component.NewID(extName)} + + // Create a service + srv, err := New(context.Background(), set, cfg) + require.NoError(t, err) + + // Start the service + require.Error(t, srv.Start(context.Background())) + + // Shut down the service + require.NoError(t, srv.Shutdown(context.Background())) +} + +func TestNilCollectorEffectiveConfig(t *testing.T) { + set := newNopSettings() + set.CollectorConf = nil + cfg := newNopConfig() + + var extName component.Type = "configWatcher" + configWatcherExtensionFactory := newConfigWatcherExtensionFactory(extName) + set.Extensions = extension.NewBuilder( + map[component.ID]component.Config{component.NewID(extName): configWatcherExtensionFactory.CreateDefaultConfig()}, + map[component.Type]extension.Factory{extName: configWatcherExtensionFactory}) + cfg.Extensions = []component.ID{component.NewID(extName)} + + // Create a service + srv, err := New(context.Background(), set, cfg) + require.NoError(t, err) + + // Start the service + require.NoError(t, srv.Start(context.Background())) + + // Shut down the service + require.NoError(t, srv.Shutdown(context.Background())) +} + func assertResourceLabels(t *testing.T, res pcommon.Resource, expectedLabels map[string]labelValue) { for key, labelValue := range expectedLabels { lookupKey, ok := prometheusToOtelConv[key] @@ -446,12 +493,13 @@ func assertZPages(t *testing.T, zpagesAddr string) { func newNopSettings() Settings { return Settings{ - BuildInfo: component.NewDefaultBuildInfo(), - Receivers: receivertest.NewNopBuilder(), - Processors: processortest.NewNopBuilder(), - Exporters: exportertest.NewNopBuilder(), - Connectors: connectortest.NewNopBuilder(), - Extensions: extensiontest.NewNopBuilder(), + BuildInfo: component.NewDefaultBuildInfo(), + CollectorConf: confmap.New(), + Receivers: receivertest.NewNopBuilder(), + Processors: processortest.NewNopBuilder(), + Exporters: exportertest.NewNopBuilder(), + Connectors: connectortest.NewNopBuilder(), + Extensions: extensiontest.NewNopBuilder(), } } @@ -501,3 +549,30 @@ func newNopConfigPipelineConfigs(pipelineCfgs pipelines.Config) Config { }, } } + +type configWatcherExtension struct{} + +func (comp *configWatcherExtension) Start(_ context.Context, _ component.Host) error { + return nil +} + +func (comp *configWatcherExtension) Shutdown(_ context.Context) error { + return nil +} + +func (comp *configWatcherExtension) NotifyConfig(_ context.Context, _ *confmap.Conf) error { + return errors.New("Failed to resolve config") +} + +func newConfigWatcherExtensionFactory(name component.Type) extension.Factory { + return extension.NewFactory( + name, + func() component.Config { + return &struct{}{} + }, + func(ctx context.Context, set extension.CreateSettings, extension component.Config) (extension.Extension, error) { + return &configWatcherExtension{}, nil + }, + component.StabilityLevelDevelopment, + ) +}