Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Center more functionality around RunnerFactory #16715

Merged
merged 14 commits into from
Mar 26, 2020
80 changes: 0 additions & 80 deletions filebeat/autodiscover/autodiscover.go

This file was deleted.

23 changes: 0 additions & 23 deletions filebeat/autodiscover/include.go

This file was deleted.

17 changes: 11 additions & 6 deletions filebeat/beater/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,27 +76,32 @@ func (c *crawler) Start(
for _, inputConfig := range c.inputConfigs {
err := c.startInput(pipeline, inputConfig, r.GetStates())
if err != nil {
return err
return fmt.Errorf("starting input failed: %+v", err)
}
}

if configInputs.Enabled() {
c.inputReloader = cfgfile.NewReloader(pipeline, configInputs)
if err := c.inputReloader.Check(c.inputsFactory); err != nil {
return err
return fmt.Errorf("creating input reloader failed: %+v", err)
}

go func() {
c.inputReloader.Run(c.inputsFactory)
}()
}

if configModules.Enabled() {
c.modulesReloader = cfgfile.NewReloader(pipeline, configModules)
if err := c.modulesReloader.Check(c.modulesFactory); err != nil {
return err
return fmt.Errorf("creating module reloader failed: %+v", err)
}

}

if c.inputReloader != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will c.inputReloader ever be non-nil when crawler.Start is called, or is it always initialized from this function?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be nil if input reloading has not been enabled. I just moved the check down here, so we do not leak the go-routine in case of startup going wrong when the module loader is faulty.

go func() {
c.inputReloader.Run(c.inputsFactory)
}()
}
if c.modulesReloader != nil {
go func() {
c.modulesReloader.Run(c.modulesFactory)
}()
Expand Down
20 changes: 16 additions & 4 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/pkg/errors"

fbautodiscover "github.com/elastic/beats/v7/filebeat/autodiscover"
"github.com/elastic/beats/v7/filebeat/channel"
cfg "github.com/elastic/beats/v7/filebeat/config"
"github.com/elastic/beats/v7/filebeat/fileset"
Expand All @@ -43,9 +42,14 @@ import (
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"

_ "github.com/elastic/beats/v7/filebeat/include"

// Add filebeat level processors
_ "github.com/elastic/beats/v7/filebeat/processor/add_kubernetes_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/decode_csv_fields"

// include all filebeat specific builders
_ "github.com/elastic/beats/v7/filebeat/autodiscover/builder/hints"
)

const pipelinesWarning = "Filebeat is unable to load the Ingest Node pipelines for the configured" +
Expand Down Expand Up @@ -282,7 +286,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
err = crawler.Start(b.Publisher, registrar, config.ConfigInput, config.ConfigModules)
if err != nil {
crawler.Stop()
return err
return fmt.Errorf("Failed to start crawler: %+v", err)
}

// If run once, add crawler completion check as alternative to done signal
Expand All @@ -304,8 +308,16 @@ func (fb *Filebeat) Run(b *beat.Beat) error {

var adiscover *autodiscover.Autodiscover
if fb.config.Autodiscover != nil {
adapter := fbautodiscover.NewAutodiscoverAdapter(inputLoader, moduleLoader)
adiscover, err = autodiscover.NewAutodiscover("filebeat", b.Publisher, adapter, config.Autodiscover)
adiscover, err = autodiscover.NewAutodiscover(
"filebeat",
b.Publisher,
cfgfile.MultiplexedRunnerFactory(
cfgfile.MatchHasField("module", moduleLoader),
cfgfile.MatchDefault(inputLoader),
),
autodiscover.QueryConfig(),
config.Autodiscover,
)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/channel/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type ConnectorFunc func(*common.Config, beat.ClientConfig) (Outleter, error)

type pipelineConnector struct {
parent *OutletFactory
pipeline beat.Pipeline
pipeline beat.PipelineConnector
}

// Connect passes the cfg and the zero value of beat.ClientConfig to the underlying function.
Expand Down
2 changes: 1 addition & 1 deletion filebeat/channel/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func NewOutletFactory(
// Inputs and all harvesters use the same pipeline client instance.
// This guarantees ordering between events as required by the registrar for
// file.State updates
func (f *OutletFactory) Create(p beat.Pipeline) Connector {
func (f *OutletFactory) Create(p beat.PipelineConnector) Connector {
return &pipelineConnector{parent: f, pipeline: p}
}

Expand Down
2 changes: 1 addition & 1 deletion filebeat/channel/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
)

// Factory is used to create a new Outlet instance
type Factory func(beat.Pipeline) Connector
type Factory func(beat.PipelineConnector) Connector

// Connector creates an Outlet connecting the event publishing with some internal pipeline.
// type Connector func(*common.Config, *common.MapStrPointer) (Outleter, error)
Expand Down
9 changes: 8 additions & 1 deletion filebeat/fileset/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
pubpipeline "github.com/elastic/beats/v7/libbeat/publisher/pipeline"

"github.com/mitchellh/hashstructure"
)
Expand Down Expand Up @@ -75,7 +76,7 @@ func NewFactory(
}

// Create creates a module based on a config
func (f *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
func (f *Factory) Create(p beat.PipelineConnector, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
// Start a registry of one module:
m, err := NewModuleRegistry([]*common.Config{c}, f.beatInfo, false)
if err != nil {
Expand Down Expand Up @@ -114,6 +115,11 @@ func (f *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrP
}, nil
}

func (f *Factory) CheckConfig(c *common.Config) error {
_, err := f.Create(pubpipeline.NewNilPipeline(), c, nil)
return err
}

func (p *inputsRunner) Start() {
// Load pipelines
if p.pipelineLoaderFactory != nil {
Expand Down Expand Up @@ -153,6 +159,7 @@ func (p *inputsRunner) Start() {
moduleList.Add(m)
}
}

func (p *inputsRunner) Stop() {
if p.pipelineCallbackID != uuid.Nil {
elasticsearch.DeregisterConnectCallback(p.pipelineCallbackID)
Expand Down
8 changes: 7 additions & 1 deletion filebeat/fileset/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
pubpipeline "github.com/elastic/beats/v7/libbeat/publisher/pipeline"
)

// SetupFactory is for loading module assets when running setup subcommand.
Expand All @@ -41,7 +42,7 @@ func NewSetupFactory(beatInfo beat.Info, pipelineLoaderFactory PipelineLoaderFac
}

// Create creates a new SetupCfgRunner to setup module configuration.
func (sf *SetupFactory) Create(_ beat.Pipeline, c *common.Config, _ *common.MapStrPointer) (cfgfile.Runner, error) {
func (sf *SetupFactory) Create(_ beat.PipelineConnector, c *common.Config, _ *common.MapStrPointer) (cfgfile.Runner, error) {
m, err := NewModuleRegistry([]*common.Config{c}, sf.beatInfo, false)
if err != nil {
return nil, err
Expand All @@ -54,6 +55,11 @@ func (sf *SetupFactory) Create(_ beat.Pipeline, c *common.Config, _ *common.MapS
}, nil
}

func (sf *SetupFactory) CheckConfig(c *common.Config) error {
_, err := sf.Create(pubpipeline.NewNilPipeline(), c, nil)
return err
}

// SetupCfgRunner is for loading assets of modules.
type SetupCfgRunner struct {
moduleRegistry *ModuleRegistry
Expand Down
8 changes: 7 additions & 1 deletion filebeat/input/runnerfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
)

// RunnerFactory is a factory for registrars
Expand All @@ -43,7 +44,7 @@ func NewRunnerFactory(outlet channel.Factory, registrar *registrar.Registrar, be

// Create creates a input based on a config
func (r *RunnerFactory) Create(
pipeline beat.Pipeline,
pipeline beat.PipelineConnector,
c *common.Config,
meta *common.MapStrPointer,
) (cfgfile.Runner, error) {
Expand All @@ -56,3 +57,8 @@ func (r *RunnerFactory) Create(

return p, nil
}

func (r *RunnerFactory) CheckConfig(cfg *common.Config) error {
_, err := r.Create(pipeline.NewNilPipeline(), cfg, nil)
return err
}
23 changes: 0 additions & 23 deletions heartbeat/autodiscover/include.go

This file was deleted.

9 changes: 1 addition & 8 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,7 @@ func (bt *Heartbeat) RunReloadableMonitors(b *beat.Beat) (err error) {

// makeAutodiscover creates an autodiscover object ready to be started.
func (bt *Heartbeat) makeAutodiscover(b *beat.Beat) (*autodiscover.Autodiscover, error) {
adapter := autodiscover.NewFactoryAdapter(bt.dynamicFactory)

ad, err := autodiscover.NewAutodiscover("heartbeat", b.Publisher, adapter, bt.config.Autodiscover)
if err != nil {
return nil, err
}

return ad, nil
return autodiscover.NewAutodiscover("heartbeat", b.Publisher, bt.dynamicFactory, autodiscover.QueryConfig(), bt.config.Autodiscover)
}

// Stop stops the beat.
Expand Down
4 changes: 3 additions & 1 deletion heartbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ package cmd
import (
"fmt"

_ "github.com/elastic/beats/v7/heartbeat/autodiscover"
"github.com/elastic/beats/v7/heartbeat/beater"

// include all heartbeat specific autodiscovery builders
_ "github.com/elastic/beats/v7/heartbeat/autodiscover/builder/hints"

// register default heartbeat monitors
_ "github.com/elastic/beats/v7/heartbeat/monitors/defaults"
cmd "github.com/elastic/beats/v7/libbeat/cmd"
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/monitors/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewFactory(sched *scheduler.Scheduler, allowWatches bool) *RunnerFactory {
}

// Create makes a new Runner for a new monitor with the given Config.
func (f *RunnerFactory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
func (f *RunnerFactory) Create(p beat.PipelineConnector, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
monitor, err := newMonitor(c, globalPluginsReg, p, f.sched, f.allowWatches, meta)
return monitor, err
}
Expand Down
Loading