Skip to content

Commit

Permalink
godoc
Browse files Browse the repository at this point in the history
  • Loading branch information
urso committed Mar 10, 2020
1 parent 5987016 commit 83a4471
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 27 deletions.
12 changes: 8 additions & 4 deletions libbeat/autodiscover/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,16 @@ const (
retryPeriod = 10 * time.Second
)

// TODO autodiscover providers config reload

// EventConfigurer generates a valid list of configs from the given event, the
// received event will have all keys defined by `StartFilter`.
// EventConfigurer is used to configure the creation of configuration objects
// from the autodiscover event bus.
type EventConfigurer interface {
// EventFilter returns the bus filter to retrieve runner start/stop triggering
// events. The bus will filter events to the ones, that contain *all* the
// the required top-level keys.
EventFilter() []string

// CreateConfig creates a list of configurations from a bus.Event. The
// received event will have all keys defined in `EventFilter`.
CreateConfig(bus.Event) ([]*common.Config, error)
}

Expand Down
5 changes: 3 additions & 2 deletions libbeat/autodiscover/eventselect.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ func QueryConfig() EventConfigurer { return defaultConfigQuery }
func (q queryConfigFrom) EventFilter() []string { return []string{string(q)} }

func (q queryConfigFrom) CreateConfig(e bus.Event) ([]*common.Config, error) {
config, ok := e[string(q)].([]*common.Config)
fieldName := string(q)j
config, ok := e[fieldName].([]*common.Config)
if !ok {
return nil, errors.New("Got a wrong value in event `config` key")
return nil, fmt.Errorf("Event field '%q' does not contain a valid configuration object", fieldname)
}
return config, nil
}
17 changes: 13 additions & 4 deletions libbeat/cfgfile/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,27 @@ import (

type multiplexedFactory []FactoryMatcher

// FactoryMatcher returns a RunnerFactory that should be used to
// handle the given configuration.
// FactoryMatcher returns a RunnerFactory that can handle the given
// configuration if it supports it, otherwise it returns nil.
type FactoryMatcher func(cfg *common.Config) RunnerFactory

var errConfigDoesNotMatch = errors.New("config does not match accepted configurations")

// MultiplexedRunnerFactory is a RunnerFactory that uses a list of
// FactoryMatcher to choose which RunnerFactory should handle the configuration.
// When presented a Config object, MultiplexedRunnerFactory will query the
// matchers in the order given. The first RunnerFactory returned will be used
// to create the runner.
// Creating a runner or checking a configuration will return an error if no
// matcher was found. Use MatchDefault as last argument to
// MultiplexedRunnerFactory to configure a default RunnerFactory that shall
// always be used if no other factory was matched.
func MultiplexedRunnerFactory(matchers ...FactoryMatcher) RunnerFactory {
return multiplexedFactory(matchers)
}

// MatchHasField returns the configured RunnerFactory if the configation contains the configured field.
// MatchHasField returns a FactoryMatcher that returns the given RunnerFactory
// when the input config contains the given field name.
func MatchHasField(field string, factory RunnerFactory) FactoryMatcher {
return func(cfg *common.Config) RunnerFactory {
if cfg.HasField(field) {
Expand All @@ -48,7 +56,8 @@ func MatchHasField(field string, factory RunnerFactory) FactoryMatcher {
}
}

// MatchDefault always returns the configured runner factory.
// MatchDefault return a FactoryMatcher that always returns returns the given
// RunnerFactory.
func MatchDefault(factory RunnerFactory) FactoryMatcher {
return func(cfg *common.Config) RunnerFactory {
return factory
Expand Down
17 changes: 5 additions & 12 deletions libbeat/cfgfile/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,12 @@ type Reload struct {
// RunnerFactory is used for validating generated configurations and creating
// of new Runners
type RunnerFactory interface {
ConfigChecker
// Create creates a new Runner based on the given configuration.
Create(p beat.PipelineConnector, config *common.Config, meta *common.MapStrPointer) (Runner, error)
}

// ConfigChecker is usually combined with a RunnerFactory for implementations
// that can check a config without a pipeline and metadata.
type ConfigChecker interface {
// CheckConfig tests if a confiugation can be used to create an input. If it
// is not possible to create an input using the configuration, an error must
// be returned.
CheckConfig(config *common.Config) error
}

Expand Down Expand Up @@ -153,13 +152,7 @@ func (rl *Reloader) Check(runnerFactory RunnerFactory) error {
continue
}

if checker, ok := runnerFactory.(ConfigChecker); ok {
err = checker.CheckConfig(c.Config)
} else {
_, err = runnerFactory.Create(rl.pipeline, c.Config, c.Meta)
}

if err != nil {
if err = runnerFactory.CheckConfig(c.Config); err != nil {
return err
}
}
Expand Down
14 changes: 9 additions & 5 deletions libbeat/publisher/pipeline/nilpipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package pipeline

import "github.com/elastic/beats/v7/libbeat/beat"

type NilPipeline struct{}
type nilPipeline struct{}

type nilClient struct {
eventer beat.ClientEventer
Expand All @@ -28,15 +28,19 @@ type nilClient struct {
ackLastEvent func(interface{})
}

var _nilPipeline = (*NilPipeline)(nil)
var _nilPipeline = (*nilPipeline)(nil)

func NewNilPipeline() *NilPipeline { return _nilPipeline }
// NewNilPipeline returns a new pipeline that is compatible with
// beats.PipelineConnector. The pipeline will discard all events that have been
// published. Client ACK handlers will still be executed, but the callbacks
// will be executed immediately when the event is published.
func NewNilPipeline() beat.PipelineConnector { return _nilPipeline }

func (p *NilPipeline) Connect() (beat.Client, error) {
func (p *nilPipeline) Connect() (beat.Client, error) {
return p.ConnectWith(beat.ClientConfig{})
}

func (p *NilPipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
func (p *nilPipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
return &nilClient{
eventer: cfg.Events,
ackCount: cfg.ACKCount,
Expand Down

0 comments on commit 83a4471

Please sign in to comment.