Skip to content

Commit

Permalink
Notify extensions of the Collector's effective configuration (#6833)
Browse files Browse the repository at this point in the history
**Description:**

This adds an optional `ConfigWatcher` interface that extensions can
implement if they want to be notified of the effective configuration
that is used by the Collector.

I don't feel very strongly about any of the decisions I made in this PR,
so I am open to input if we would like to take a different approach
anywhere. I will leave some comments to explain the decisions I made.

**Link to tracking Issue:**
Closes
#6596

**Testing:**

I've made minimal unit test changes, but I expect to write more tests
once there is consensus on the direction for implementing this
functionality. I have done some manual testing to show that an extension
can get a YAML representation of the effective config using two YAML
input files.

---------

Co-authored-by: Evan Bradley <evan-bradley@users.noreply.github.com>
  • Loading branch information
evan-bradley and evan-bradley authored Aug 8, 2023
1 parent 3f9beca commit 49a090b
Show file tree
Hide file tree
Showing 12 changed files with 439 additions and 63 deletions.
16 changes: 16 additions & 0 deletions .chloggen/configwatcher-interface.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: extension

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add optional `ConfigWatcher` interface

# One or more tracking issues or pull requests related to the change
issues: [6596]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Extensions implementing this interface will be notified of the Collector's effective config.
18 changes: 18 additions & 0 deletions .chloggen/extension-effective-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: otelcol

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add optional `ConfmapProvider` interface for Config Providers

# One or more tracking issues or pull requests related to the change
issues: [6596]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
This allows providing the Collector's configuration as a marshaled confmap.Conf object
from a ConfigProvider
17 changes: 17 additions & 0 deletions .chloggen/servicesettings-confmap.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add `CollectorConf` field to `service.Settings`

# One or more tracking issues or pull requests related to the change
issues: [6596]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
This field is intended to be used by the Collector to pass its effective configuration to the service.
8 changes: 8 additions & 0 deletions extension/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
)

// Extension is the interface for objects hosted by the OpenTelemetry Collector that
Expand All @@ -32,6 +33,13 @@ type PipelineWatcher interface {
NotReady() error
}

// ConfigWatcher is an interface that should be implemented by an extension that
// wishes to be notified of the Collector's effective configuration.
type ConfigWatcher interface {
// NotifyConfig notifies the extension of the Collector's current effective configuration.
NotifyConfig(ctx context.Context, conf *confmap.Conf) error
}

// CreateSettings is passed to Factory.Create(...) function.
type CreateSettings struct {
// ID returns the ID of the component that will be created.
Expand Down
14 changes: 14 additions & 0 deletions otelcol/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/extension"
Expand Down Expand Up @@ -143,6 +144,17 @@ func (col *Collector) Shutdown() {
func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
col.setCollectorState(StateStarting)

var conf *confmap.Conf

if cp, ok := col.set.ConfigProvider.(ConfmapProvider); ok {
var err error
conf, err = cp.GetConfmap(ctx)

if err != nil {
return fmt.Errorf("failed to resolve config: %w", err)
}
}

cfg, err := col.set.ConfigProvider.Get(ctx, col.set.Factories)
if err != nil {
return fmt.Errorf("failed to get config: %w", err)
Expand All @@ -154,6 +166,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {

col.service, err = service.New(ctx, service.Settings{
BuildInfo: col.set.BuildInfo,
CollectorConf: conf,
Receivers: receiver.NewBuilder(cfg.Receivers, col.set.Factories.Receivers),
Processors: processor.NewBuilder(cfg.Processors, col.set.Factories.Processors),
Exporters: exporter.NewBuilder(cfg.Exporters, col.set.Factories.Exporters),
Expand All @@ -174,6 +187,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
return multierr.Combine(err, col.service.Shutdown(ctx))
}
col.setCollectorState(StateRunning)

return nil
}

Expand Down
45 changes: 45 additions & 0 deletions otelcol/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/converter/expandconverter"
)

func TestStateString(t *testing.T) {
Expand Down Expand Up @@ -369,6 +371,31 @@ func TestCollectorDryRun(t *testing.T) {
require.Error(t, col.DryRun(context.Background()))
}

func TestPassConfmapToServiceFailure(t *testing.T) {
factories, err := nopFactories()
require.NoError(t, err)

cfgProvider, err := NewConfigProvider(ConfigProviderSettings{
ResolverSettings: confmap.ResolverSettings{
URIs: []string{filepath.Join("testdata", "otelcol-invalid.yaml")},
Providers: makeMapProvidersMap(newFailureProvider()),
Converters: []confmap.Converter{expandconverter.New()},
},
})
require.NoError(t, err)

set := CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: cfgProvider,
}
col, err := NewCollector(set)
require.NoError(t, err)

err = col.Run(context.Background())
require.Error(t, err)
}

func startCollector(ctx context.Context, t *testing.T, col *Collector) *sync.WaitGroup {
wg := &sync.WaitGroup{}
wg.Add(1)
Expand All @@ -378,3 +405,21 @@ func startCollector(ctx context.Context, t *testing.T, col *Collector) *sync.Wai
}()
return wg
}

type failureProvider struct{}

func newFailureProvider() confmap.Provider {
return &failureProvider{}
}

func (fmp *failureProvider) Retrieve(_ context.Context, _ string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) {
return nil, errors.New("a failure occurred during configuration retrieval")
}

func (*failureProvider) Scheme() string {
return "file"
}

func (*failureProvider) Shutdown(context.Context) error {
return nil
}
25 changes: 25 additions & 0 deletions otelcol/configprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,26 @@ type ConfigProvider interface {
Shutdown(ctx context.Context) error
}

// ConfmapProvider is an optional interface to be implemented by ConfigProviders
// to provide confmap.Conf objects representing a marshaled version of the
// Collector's configuration.
//
// The purpose of this interface is that otelcol.ConfigProvider structs do not
// necessarily need to use confmap.Conf as their underlying config structure.
type ConfmapProvider interface {
// GetConfmap resolves the Collector's configuration and provides it as a confmap.Conf object.
//
// Should never be called concurrently with itself or any ConfigProvider method.
GetConfmap(ctx context.Context) (*confmap.Conf, error)
}

type configProvider struct {
mapResolver *confmap.Resolver
}

var _ ConfigProvider = &configProvider{}
var _ ConfmapProvider = &configProvider{}

// ConfigProviderSettings are the settings to configure the behavior of the ConfigProvider.
type ConfigProviderSettings struct {
// ResolverSettings are the settings to configure the behavior of the confmap.Resolver.
Expand Down Expand Up @@ -106,6 +122,15 @@ func (cm *configProvider) Shutdown(ctx context.Context) error {
return cm.mapResolver.Shutdown(ctx)
}

func (cm *configProvider) GetConfmap(ctx context.Context) (*confmap.Conf, error) {
conf, err := cm.mapResolver.Resolve(ctx)
if err != nil {
return nil, fmt.Errorf("cannot resolve the configuration: %w", err)
}

return conf, nil
}

func newDefaultConfigProviderSettings(uris []string) ConfigProviderSettings {
return ConfigProviderSettings{
ResolverSettings: confmap.ResolverSettings{
Expand Down
121 changes: 64 additions & 57 deletions otelcol/configprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,69 +11,36 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
"gopkg.in/yaml.v3"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/provider/fileprovider"
"go.opentelemetry.io/collector/confmap/provider/yamlprovider"
"go.opentelemetry.io/collector/connector/connectortest"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/extension/extensiontest"
"go.opentelemetry.io/collector/processor/processortest"
"go.opentelemetry.io/collector/receiver/receivertest"
"go.opentelemetry.io/collector/service"
"go.opentelemetry.io/collector/service/pipelines"
"go.opentelemetry.io/collector/service/telemetry"
)

var configNop = &Config{
Receivers: map[component.ID]component.Config{component.NewID("nop"): receivertest.NewNopFactory().CreateDefaultConfig()},
Processors: map[component.ID]component.Config{component.NewID("nop"): processortest.NewNopFactory().CreateDefaultConfig()},
Exporters: map[component.ID]component.Config{component.NewID("nop"): exportertest.NewNopFactory().CreateDefaultConfig()},
Connectors: map[component.ID]component.Config{component.NewIDWithName("nop", "con"): connectortest.NewNopFactory().CreateDefaultConfig()},
Extensions: map[component.ID]component.Config{component.NewID("nop"): extensiontest.NewNopFactory().CreateDefaultConfig()},
Service: service.Config{
Extensions: []component.ID{component.NewID("nop")},
Pipelines: pipelines.Config{
component.NewID("traces"): {
Receivers: []component.ID{component.NewID("nop")},
Processors: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "con")},
},
component.NewID("metrics"): {
Receivers: []component.ID{component.NewID("nop")},
Processors: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop")},
},
component.NewID("logs"): {
Receivers: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "con")},
Processors: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop")},
},
},
Telemetry: telemetry.Config{
Logs: telemetry.LogsConfig{
Level: zapcore.InfoLevel,
Development: false,
Encoding: "console",
Sampling: &telemetry.LogsSamplingConfig{
Initial: 100,
Thereafter: 100,
},
OutputPaths: []string{"stderr"},
ErrorOutputPaths: []string{"stderr"},
DisableCaller: false,
DisableStacktrace: false,
InitialFields: map[string]any(nil),
},
Metrics: telemetry.MetricsConfig{
Level: configtelemetry.LevelBasic,
Address: "localhost:8888",
},
},
},
func newConfig(yamlBytes []byte, factories Factories) (*Config, error) {
var stringMap = map[string]interface{}{}
err := yaml.Unmarshal(yamlBytes, stringMap)

if err != nil {
return nil, err
}

conf := confmap.NewFromStringMap(stringMap)

cfg, err := unmarshal(conf, factories)
if err != nil {
return nil, err
}

return &Config{
Receivers: cfg.Receivers.Configs(),
Processors: cfg.Processors.Configs(),
Exporters: cfg.Exporters.Configs(),
Connectors: cfg.Connectors.Configs(),
Extensions: cfg.Extensions.Configs(),
Service: cfg.Service,
}, nil
}

func TestConfigProviderYaml(t *testing.T) {
Expand All @@ -97,6 +64,10 @@ func TestConfigProviderYaml(t *testing.T) {

cfg, err := cp.Get(context.Background(), factories)
require.NoError(t, err)

configNop, err := newConfig(yamlBytes, factories)
require.NoError(t, err)

assert.EqualValues(t, configNop, cfg)
}

Expand All @@ -118,5 +89,41 @@ func TestConfigProviderFile(t *testing.T) {

cfg, err := cp.Get(context.Background(), factories)
require.NoError(t, err)

yamlBytes, err := os.ReadFile(filepath.Join("testdata", "otelcol-nop.yaml"))
require.NoError(t, err)

configNop, err := newConfig(yamlBytes, factories)
require.NoError(t, err)

assert.EqualValues(t, configNop, cfg)
}

func TestGetConfmap(t *testing.T) {
uriLocation := "file:" + filepath.Join("testdata", "otelcol-nop.yaml")
provider := fileprovider.New()
set := ConfigProviderSettings{
ResolverSettings: confmap.ResolverSettings{
URIs: []string{uriLocation},
Providers: map[string]confmap.Provider{provider.Scheme(): provider},
},
}

configBytes, err := os.ReadFile(filepath.Join("testdata", "otelcol-nop.yaml"))
require.NoError(t, err)

yamlMap := map[string]any{}
err = yaml.Unmarshal(configBytes, yamlMap)
require.NoError(t, err)

cp, err := NewConfigProvider(set)
require.NoError(t, err)

cmp, ok := cp.(ConfmapProvider)
require.True(t, ok)

cmap, err := cmp.GetConfmap(context.Background())
require.NoError(t, err)

assert.EqualValues(t, yamlMap, cmap.ToStringMap())
}
12 changes: 12 additions & 0 deletions service/extensions/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.uber.org/multierr"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/service/internal/components"
"go.opentelemetry.io/collector/service/internal/zpages"
Expand Down Expand Up @@ -72,6 +73,17 @@ func (bes *Extensions) NotifyPipelineNotReady() error {
return errs
}

func (bes *Extensions) NotifyConfig(ctx context.Context, conf *confmap.Conf) error {
var errs error
for _, ext := range bes.extMap {
if cw, ok := ext.(extension.ConfigWatcher); ok {
clonedConf := confmap.NewFromStringMap(conf.ToStringMap())
errs = multierr.Append(errs, cw.NotifyConfig(ctx, clonedConf))
}
}
return errs
}

func (bes *Extensions) GetExtensions() map[component.ID]component.Component {
result := make(map[component.ID]component.Component, len(bes.extMap))
for extID, v := range bes.extMap {
Expand Down
Loading

0 comments on commit 49a090b

Please sign in to comment.