Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pkg/stanza] Simplify the way input operator configs are handled #14078

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions pkg/stanza/adapter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ type ConverterConfig struct {
WorkerCount int `mapstructure:"worker_count"`
}

// InputConfig is an alias that allows unmarshaling outside of mapstructure
// This is meant to be used only for the input operator
type InputConfig map[string]interface{}

// decodeOperatorConfigs is an unmarshaling workaround for stanza operators
// This is needed only until stanza operators are migrated to mapstructure
func (cfg BaseConfig) DecodeOperatorConfigs() ([]operator.Config, error) {
Expand Down
10 changes: 3 additions & 7 deletions pkg/stanza/adapter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type LogReceiverType interface {
Type() config.Type
CreateDefaultConfig() config.Receiver
BaseConfig(config.Receiver) BaseConfig
DecodeInputConfig(config.Receiver) (*operator.Config, error)
InputConfig(config.Receiver) operator.Config
}

// NewFactory creates a factory for a Stanza-based receiver
Expand All @@ -50,18 +50,14 @@ func createLogsReceiver(logReceiverType LogReceiverType) component.CreateLogsRec
cfg config.Receiver,
nextConsumer consumer.Logs,
) (component.LogsReceiver, error) {
inputCfg, err := logReceiverType.DecodeInputConfig(cfg)
if err != nil {
return nil, err
}

inputCfg := logReceiverType.InputConfig(cfg)
baseCfg := logReceiverType.BaseConfig(cfg)
operatorCfgs, err := baseCfg.DecodeOperatorConfigs()
if err != nil {
return nil, err
}

operators := append([]operator.Config{*inputCfg}, operatorCfgs...)
operators := append([]operator.Config{inputCfg}, operatorCfgs...)

emitterOpts := []LogEmitterOption{
LogEmitterWithLogger(params.Logger.Sugar()),
Expand Down
11 changes: 0 additions & 11 deletions pkg/stanza/adapter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,6 @@ func TestCreateReceiver(t *testing.T) {
require.NotNil(t, receiver, "receiver creation failed")
})

t.Run("DecodeInputConfigFailure", func(t *testing.T) {
factory := NewFactory(TestReceiverType{}, component.StabilityLevelInDevelopment)
badCfg := factory.CreateDefaultConfig().(*TestConfig)
badCfg.Input = map[string]interface{}{
"type": "unknown",
}
receiver, err := factory.CreateLogsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), badCfg, consumertest.NewNop())
require.Error(t, err, "receiver creation should fail if input config isn't valid")
require.Nil(t, receiver, "receiver creation should fail if input config isn't valid")
})

t.Run("DecodeOperatorConfigsFailureMissingFields", func(t *testing.T) {
factory := NewFactory(TestReceiverType{}, component.StabilityLevelInDevelopment)
badCfg := factory.CreateDefaultConfig().(*TestConfig)
Expand Down
31 changes: 8 additions & 23 deletions pkg/stanza/adapter/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,11 @@ type UnstartableOperator struct {
helper.OutputOperator
}

func newUnstartableParams() map[string]interface{} {
return map[string]interface{}{"type": "unstartable_operator"}
}

// NewUnstartableConfig creates new output config
func NewUnstartableConfig() *UnstartableConfig {
return &UnstartableConfig{
// newUnstartableConfig creates new output config
func NewUnstartableConfig() operator.Config {
return operator.NewConfig(&UnstartableConfig{
OutputConfig: helper.NewOutputConfig("unstartable_operator", "unstartable_operator"),
}
})
}

// Build will build an unstartable operator
Expand Down Expand Up @@ -87,7 +83,7 @@ const testType = "test"

type TestConfig struct {
BaseConfig `mapstructure:",squash"`
Input InputConfig `mapstructure:",remain"`
Input operator.Config `mapstructure:",squash"`
}
type TestReceiverType struct{}

Expand All @@ -105,25 +101,14 @@ func (f TestReceiverType) CreateDefaultConfig() config.Receiver {
FlushInterval: 100 * time.Millisecond,
},
},
Input: InputConfig{},
Input: operator.NewConfig(noop.NewConfig()),
}
}

func (f TestReceiverType) BaseConfig(cfg config.Receiver) BaseConfig {
return cfg.(*TestConfig).BaseConfig
}

func (f TestReceiverType) DecodeInputConfig(cfg config.Receiver) (*operator.Config, error) {
testConfig := cfg.(*TestConfig)

// Allow tests to run without implementing input config
if testConfig.Input["type"] == nil {
return &operator.Config{Builder: noop.NewConfig()}, nil
}

// Allow tests to explicitly prompt a failure
if testConfig.Input["type"] == "unknown" {
return nil, errors.New("unknown input type")
}
return &operator.Config{Builder: NewUnstartableConfig()}, nil
func (f TestReceiverType) InputConfig(cfg config.Receiver) operator.Config {
return cfg.(*TestConfig).Input
}
2 changes: 1 addition & 1 deletion pkg/stanza/adapter/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestHandleStartError(t *testing.T) {
factory := NewFactory(TestReceiverType{}, component.StabilityLevelInDevelopment)

cfg := factory.CreateDefaultConfig().(*TestConfig)
cfg.Input = newUnstartableParams()
cfg.Input = NewUnstartableConfig()

receiver, err := factory.CreateLogsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), cfg, mockConsumer)
require.NoError(t, err, "receiver should successfully build")
Expand Down
5 changes: 5 additions & 0 deletions pkg/stanza/operator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ type Config struct {
Builder
}

// NewConfig wraps the builder interface in a concrete struct
func NewConfig(b Builder) Config {
return Config{Builder: b}
}

// Builder is an entity that can build a single operator
type Builder interface {
ID() string
Expand Down
7 changes: 3 additions & 4 deletions receiver/filelogreceiver/filelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ type FileLogConfig struct {
adapter.BaseConfig `mapstructure:",squash"`
}

// DecodeInputConfig unmarshals the input operator
func (f ReceiverType) DecodeInputConfig(cfg config.Receiver) (*operator.Config, error) {
logConfig := cfg.(*FileLogConfig)
return &operator.Config{Builder: &logConfig.InputConfig}, nil
// InputConfig unmarshals the input operator
func (f ReceiverType) InputConfig(cfg config.Receiver) operator.Config {
return operator.NewConfig(&cfg.(*FileLogConfig).InputConfig)
}
7 changes: 3 additions & 4 deletions receiver/journaldreceiver/journald.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ type JournaldConfig struct {
InputConfig journald.Config `mapstructure:",squash"`
}

// DecodeInputConfig unmarshals the input operator
func (f ReceiverType) DecodeInputConfig(cfg config.Receiver) (*operator.Config, error) {
logConfig := cfg.(*JournaldConfig)
return &operator.Config{Builder: &logConfig.InputConfig}, nil
// InputConfig unmarshals the input operator
func (f ReceiverType) InputConfig(cfg config.Receiver) operator.Config {
return operator.NewConfig(&cfg.(*JournaldConfig).InputConfig)
}
2 changes: 1 addition & 1 deletion receiver/journaldreceiver/journald_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestLoadConfig(t *testing.T) {
assert.Equal(t, testdataConfigYaml(), cfg.Receivers[config.NewComponentID("journald")])
}

func TestDecodeInputConfigFailure(t *testing.T) {
func TestInputConfigFailure(t *testing.T) {
sink := new(consumertest.LogsSink)
factory := NewFactory()
badCfg := &JournaldConfig{
Expand Down
7 changes: 3 additions & 4 deletions receiver/syslogreceiver/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,9 @@ type SysLogConfig struct {
adapter.BaseConfig `mapstructure:",squash"`
}

// DecodeInputConfig unmarshals the input operator
func (f ReceiverType) DecodeInputConfig(cfg config.Receiver) (*operator.Config, error) {
logConfig := cfg.(*SysLogConfig)
return &operator.Config{Builder: &logConfig.InputConfig}, nil
// InputConfig unmarshals the input operator
func (f ReceiverType) InputConfig(cfg config.Receiver) operator.Config {
return operator.NewConfig(&cfg.(*SysLogConfig).InputConfig)
}

func (cfg *SysLogConfig) Unmarshal(componentParser *confmap.Conf) error {
Expand Down
7 changes: 3 additions & 4 deletions receiver/tcplogreceiver/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ type TCPLogConfig struct {
adapter.BaseConfig `mapstructure:",squash"`
}

// DecodeInputConfig unmarshals the input operator
func (f ReceiverType) DecodeInputConfig(cfg config.Receiver) (*operator.Config, error) {
logConfig := cfg.(*TCPLogConfig)
return &operator.Config{Builder: &logConfig.InputConfig}, nil
// InputConfig unmarshals the input operator
func (f ReceiverType) InputConfig(cfg config.Receiver) operator.Config {
return operator.NewConfig(&cfg.(*TCPLogConfig).InputConfig)
}
7 changes: 3 additions & 4 deletions receiver/udplogreceiver/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ type UDPLogConfig struct {
adapter.BaseConfig `mapstructure:",squash"`
}

// DecodeInputConfig unmarshals the input operator
func (f ReceiverType) DecodeInputConfig(cfg config.Receiver) (*operator.Config, error) {
logConfig := cfg.(*UDPLogConfig)
return &operator.Config{Builder: &logConfig.InputConfig}, nil
// InputConfig unmarshals the input operator
func (f ReceiverType) InputConfig(cfg config.Receiver) operator.Config {
return operator.NewConfig(&cfg.(*UDPLogConfig).InputConfig)
}
38 changes: 15 additions & 23 deletions receiver/windowseventlogreceiver/receiver_others.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
package windowseventlogreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/windowseventlogreceiver"

import (
"errors"
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)

const (
Expand All @@ -34,37 +35,28 @@ const (

// NewFactory creates a factory for windowseventlog receiver
func NewFactory() component.ReceiverFactory {
return adapter.NewFactory(ReceiverType{}, stability)
return component.NewReceiverFactory(
typeStr,
createDefaultConfig,
component.WithLogsReceiver(createLogsReceiver, stability))
}

// ReceiverType implements adapter.LogReceiverType
// to create a file tailing receiver
type ReceiverType struct{}

// Type is the receiver type
func (f ReceiverType) Type() config.Type {
return typeStr
}

// CreateDefaultConfig creates a config with type and version
func (f ReceiverType) CreateDefaultConfig() config.Receiver {
func createDefaultConfig() config.Receiver {
return &WindowsLogConfig{
BaseConfig: adapter.BaseConfig{
ReceiverSettings: config.NewReceiverSettings(config.NewComponentID(typeStr)),
Operators: adapter.OperatorConfigs{},
Converter: adapter.ConverterConfig{},
},
}
}

// BaseConfig gets the base config from config, for now
func (f ReceiverType) BaseConfig(cfg config.Receiver) adapter.BaseConfig {
return cfg.(*WindowsLogConfig).BaseConfig
}

// DecodeInputConfig unmarshals the input operator
func (f ReceiverType) DecodeInputConfig(cfg config.Receiver) (*operator.Config, error) {
return nil, errors.New("the windows eventlog receiver is only supported on Windows")
func createLogsReceiver(
_ context.Context,
params component.ReceiverCreateSettings,
cfg config.Receiver,
consumer consumer.Logs,
) (component.LogsReceiver, error) {
return nil, fmt.Errorf("windows eventlog receiver is only supported on Windows")
}

// WindowsLogConfig defines configuration for the windowseventlog receiver
Expand Down
15 changes: 11 additions & 4 deletions receiver/windowseventlogreceiver/receiver_others_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
package windowseventlogreceiver

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtest"
"go.opentelemetry.io/collector/consumer/consumertest"
)

func TestDefaultConfigFailure(t *testing.T) {
Expand All @@ -30,8 +33,12 @@ func TestDefaultConfigFailure(t *testing.T) {
require.NotNil(t, cfg, "failed to create default config")
require.NoError(t, configtest.CheckConfigStruct(cfg))

r := ReceiverType{}
ops, err := r.DecodeInputConfig(cfg)
require.Nil(t, ops, "failed to create default config")
require.ErrorContains(t, err, "the windows eventlog receiver is only supported on Windows")
receiver, err := factory.CreateLogsReceiver(
context.Background(),
componenttest.NewNopReceiverCreateSettings(),
cfg,
new(consumertest.LogsSink),
)
require.Nil(t, receiver, "failed to create default config")
require.ErrorContains(t, err, "windows eventlog receiver is only supported on Windows")
}
7 changes: 3 additions & 4 deletions receiver/windowseventlogreceiver/receiver_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,9 @@ func (f ReceiverType) BaseConfig(cfg config.Receiver) adapter.BaseConfig {
return cfg.(*WindowsLogConfig).BaseConfig
}

// DecodeInputConfig unmarshals the input operator
func (f ReceiverType) DecodeInputConfig(cfg config.Receiver) (*operator.Config, error) {
logConfig := cfg.(*WindowsLogConfig)
return &operator.Config{Builder: &logConfig.InputConfig}, nil
// InputConfig unmarshals the input operator
func (f ReceiverType) InputConfig(cfg config.Receiver) operator.Config {
return operator.NewConfig(&cfg.(*WindowsLogConfig).InputConfig)
}

// WindowsLogConfig defines configuration for the windowseventlog receiver
Expand Down
16 changes: 16 additions & 0 deletions unreleased/pkg-stanza-simplify-inputconfig.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza/adapter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Remove `OperatorConfigs` type, rename `LogReceiverType.DecodeInputConfig` to `LogReceiverType.InputConfig`.

# One or more tracking issues related to the change
issues: [14078]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: