diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index beb9100b479f..c0984dd3eed8 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -433,6 +433,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Bundle synthetics deps with heartbeat docker image. {pull}23274[23274] - Add mime type detection for http responses. {pull}22976[22976] - Support JSON expressions / validation of JSON arrays. {pull}28073[28073] +- Experimental 'run once' mode. {pull}25972[25972] *Journalbeat* diff --git a/heartbeat/_meta/config/beat.yml.tmpl b/heartbeat/_meta/config/beat.yml.tmpl index f03774a5c9d7..949acb8f357b 100644 --- a/heartbeat/_meta/config/beat.yml.tmpl +++ b/heartbeat/_meta/config/beat.yml.tmpl @@ -37,6 +37,18 @@ heartbeat.monitors: # Name of corresponding APM service, if Elastic APM is in use for the monitored service. #service.name: my-apm-service-name +# Experimental: Configure monitors that run exactly once. +# If enabled, heartbeat.monitors will be ignored +# Heartbeat will run these monitors once then exit. +#heartbeat.run_once: +#- type: http + #id: my-monitor + #name: My Monitor + #urls: ["http://localhost:9200"] + # NOTE: you must still provide the schedule field! Heartbeat + # Uses this to determine the contents of the monitor.timespan field + #schedule: '@every 10s' + {{header "Elasticsearch template setting"}} setup.template.settings: diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index 5fd9a4e09086..f76d19e98f38 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -20,12 +20,15 @@ package beater import ( "errors" "fmt" + "sync" "syscall" "time" "github.com/elastic/beats/v7/heartbeat/config" "github.com/elastic/beats/v7/heartbeat/hbregistry" "github.com/elastic/beats/v7/heartbeat/monitors" + "github.com/elastic/beats/v7/heartbeat/monitors/plugin" + "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" "github.com/elastic/beats/v7/heartbeat/scheduler" "github.com/elastic/beats/v7/libbeat/autodiscover" "github.com/elastic/beats/v7/libbeat/beat" @@ -34,6 +37,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/management" + "github.com/elastic/beats/v7/x-pack/functionbeat/function/core" _ "github.com/elastic/beats/v7/libbeat/processors/script" ) @@ -81,10 +85,17 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { // Run executes the beat. func (bt *Heartbeat) Run(b *beat.Beat) error { logp.Info("heartbeat is running! Hit CTRL-C to stop it.") - groups, _ := syscall.Getgroups() logp.Info("Effective user/group ids: %d/%d, with groups: %v", syscall.Geteuid(), syscall.Getegid(), groups) + if bt.config.RunOnce != nil { + err := bt.runRunOnce(b) + if err != nil { + return err + } + return nil + } + stopStaticMonitors, err := bt.RunStaticMonitors(b) if err != nil { return err @@ -126,6 +137,65 @@ func (bt *Heartbeat) Run(b *beat.Beat) error { return nil } +// runRunOnce runs the given config then exits immediately after any queued events have been sent to ES +func (bt *Heartbeat) runRunOnce(b *beat.Beat) error { + logp.Info("Starting run_once run. This is an experimental feature and may be changed or removed in the future!") + cfgs := bt.config.RunOnce + + publishClient, err := core.NewSyncClient(logp.NewLogger("run_once mode"), b.Publisher, beat.ClientConfig{}) + if err != nil { + return fmt.Errorf("could not create sync client: %w", err) + } + defer publishClient.Close() + + wg := &sync.WaitGroup{} + for _, cfg := range cfgs { + err := runRunOnceSingleConfig(cfg, publishClient, wg) + if err != nil { + logp.Warn("error running run_once config: %s", err) + } + } + + wg.Wait() + publishClient.Wait() + + logp.Info("Ending run_once run") + + return nil +} + +func runRunOnceSingleConfig(cfg *common.Config, publishClient *core.SyncClient, wg *sync.WaitGroup) (err error) { + sf, err := stdfields.ConfigToStdMonitorFields(cfg) + if err != nil { + return fmt.Errorf("could not get stdmon fields: %w", err) + } + pluginFactory, exists := plugin.GlobalPluginsReg.Get(sf.Type) + if !exists { + return fmt.Errorf("no plugin for type: %s", sf.Type) + } + plugin, err := pluginFactory.Make(sf.Type, cfg) + if err != nil { + return err + } + + results := plugin.RunWrapped(sf) + + wg.Add(1) + go func() { + defer wg.Done() + defer plugin.Close() + for { + event := <-results + if event == nil { + break + } + publishClient.Publish(*event) + } + }() + + return nil +} + // RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present. func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) (stop func(), err error) { factory := monitors.NewFactory(b.Info, bt.scheduler, true) diff --git a/heartbeat/config/config.go b/heartbeat/config/config.go index 8156495a36e8..8f674926744a 100644 --- a/heartbeat/config/config.go +++ b/heartbeat/config/config.go @@ -27,7 +27,7 @@ import ( // Config defines the structure of heartbeat.yml. type Config struct { - // Modules is a list of module specific configuration data. + RunOnce []*common.Config `config:"run_once"` Monitors []*common.Config `config:"monitors"` ConfigMonitors *common.Config `config:"config.monitors"` Scheduler Scheduler `config:"scheduler"` diff --git a/heartbeat/docs/heartbeat-options.asciidoc b/heartbeat/docs/heartbeat-options.asciidoc index 48be43156041..7aa65fae814a 100644 --- a/heartbeat/docs/heartbeat-options.asciidoc +++ b/heartbeat/docs/heartbeat-options.asciidoc @@ -109,3 +109,37 @@ include::monitors/monitor-tcp.asciidoc[] include::monitors/monitor-http.asciidoc[] include::monitors/monitor-browser.asciidoc[] + +[float] +[[run-once-mode]] +=== Run Once Mode (Experimental) + +You can configure {beatname_uc} run monitors exactly once then exit, bypassing the scheduler. This is referred to as running {beatname_uc} in "run once" mode. This is an experimental feature +and is subject to change. + +[source,yaml] +---------------------------------------------------------------------- +# heartbeat.yml +heartbeat.run_once: +- type: icmp + id: ping-myhost + name: My Host Ping + hosts: ["myhost"] + # Note that schedule is still needed to inform heartbeat when the next + # expected check is to be run. This is needed to populate the monitor.timespan field used by the Uptime app. + schedule: '@every 5s' +- type: tcp + id: myhost-tcp-echo + name: My Host TCP Echo + hosts: ["myhost:777"] # default TCP Echo Protocol + check.send: "Check" + check.receive: "Check" + schedule: '@every 5s' +- type: http + id: service-status + name: Service Status + service.name: my-apm-service-name + hosts: ["http://localhost:80/service/status"] + check.response.status: [200] + schedule: '@every 5s' +---------------------------------------------------------------------- diff --git a/heartbeat/heartbeat.yml b/heartbeat/heartbeat.yml index fdbb29c9661d..885d51e42b87 100644 --- a/heartbeat/heartbeat.yml +++ b/heartbeat/heartbeat.yml @@ -37,6 +37,18 @@ heartbeat.monitors: # Name of corresponding APM service, if Elastic APM is in use for the monitored service. #service.name: my-apm-service-name +# Experimental: Configure monitors that run exactly once. +# If enabled, heartbeat.monitors will be ignored +# Heartbeat will run these monitors once then exit. +#heartbeat.run_once: +#- type: http + #id: my-monitor + #name: My Monitor + #urls: ["http://localhost:9200"] + # NOTE: you must still provide the schedule field! Heartbeat + # Uses this to determine the contents of the monitor.timespan field + #schedule: '@every 10s' + # ======================= Elasticsearch template setting ======================= setup.template.settings: diff --git a/heartbeat/monitors/active/http/http.go b/heartbeat/monitors/active/http/http.go index 315870e25c53..149ca1a4b4fd 100644 --- a/heartbeat/monitors/active/http/http.go +++ b/heartbeat/monitors/active/http/http.go @@ -121,7 +121,7 @@ func create( js[i] = wrappers.WithURLField(u, job) } - return plugin.Plugin{Jobs: js, Close: nil, Endpoints: len(config.Hosts)}, nil + return plugin.Plugin{Jobs: js, Endpoints: len(config.Hosts)}, nil } func newRoundTripper(config *Config) (http.RoundTripper, error) { diff --git a/heartbeat/monitors/active/icmp/icmp.go b/heartbeat/monitors/active/icmp/icmp.go index 1315a1dddf06..ef57cdbebae3 100644 --- a/heartbeat/monitors/active/icmp/icmp.go +++ b/heartbeat/monitors/active/icmp/icmp.go @@ -113,7 +113,7 @@ func (jf *jobFactory) makePlugin() (plugin2 plugin.Plugin, err error) { j = append(j, wrappers.WithURLField(u, job)) } - return plugin.Plugin{Jobs: j, Close: nil, Endpoints: len(jf.config.Hosts)}, nil + return plugin.Plugin{Jobs: j, Endpoints: len(jf.config.Hosts)}, nil } func (jf *jobFactory) pingIPFactory(config *Config) func(*net.IPAddr) jobs.Job { diff --git a/heartbeat/monitors/active/tcp/tcp.go b/heartbeat/monitors/active/tcp/tcp.go index aeaebe79a555..805e5a9a6d40 100644 --- a/heartbeat/monitors/active/tcp/tcp.go +++ b/heartbeat/monitors/active/tcp/tcp.go @@ -68,7 +68,7 @@ func createWithResolver( return plugin.Plugin{}, err } - return plugin.Plugin{Jobs: js, Close: nil, Endpoints: len(jc.endpoints)}, nil + return plugin.Plugin{Jobs: js, Endpoints: len(jc.endpoints)}, nil } // jobFactory is where most of the logic here lives. It provides a common context around diff --git a/heartbeat/monitors/mocks_test.go b/heartbeat/monitors/mocks_test.go index fecf8a53f590..6d51791e3d72 100644 --- a/heartbeat/monitors/mocks_test.go +++ b/heartbeat/monitors/mocks_test.go @@ -145,7 +145,7 @@ func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int, *atomic.Int) { return plugin.PluginFactory{ Name: "test", Aliases: []string{"testAlias"}, - Builder: func(s string, config *common.Config) (plugin.Plugin, error) { + Make: func(s string, config *common.Config) (plugin.Plugin, error) { built.Inc() // Declare a real config block with a required attr so we can see what happens when it doesn't work unpacked := struct { @@ -160,7 +160,7 @@ func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int, *atomic.Int) { closed.Inc() return nil } - return plugin.Plugin{Jobs: j, Close: closer, Endpoints: 1}, err + return plugin.Plugin{Jobs: j, DoClose: closer, Endpoints: 1}, err }, Stats: plugin.NewPluginCountersRecorder("test", reg)}, built, diff --git a/heartbeat/monitors/plugin/plugin.go b/heartbeat/monitors/plugin/plugin.go index 73335ca5e296..d1a6690b983e 100644 --- a/heartbeat/monitors/plugin/plugin.go +++ b/heartbeat/monitors/plugin/plugin.go @@ -25,25 +25,67 @@ import ( "github.com/elastic/beats/v7/heartbeat/hbregistry" "github.com/elastic/beats/v7/heartbeat/monitors/jobs" + "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/plugin" ) +// PluginFactory represents an uninstantiated plug in instance generated from a monitor config. Invoking the Make function creates a plug-in instance. type PluginFactory struct { Name string Aliases []string - Builder PluginFactoryCreate + Make PluginMake Stats RegistryRecorder } -type PluginFactoryCreate func(string, *common.Config) (p Plugin, err error) +type PluginMake func(string, *common.Config) (p Plugin, err error) +// Plugin describes a configured instance of a plug-in with its jobs already instantiated. type Plugin struct { Jobs []jobs.Job - Close func() error + DoClose func() error Endpoints int } +// Close closes the plugin, invoking any DoClose hooks if avialable. +func (p Plugin) Close() error { + if p.DoClose != nil { + return p.DoClose() + } + return nil +} + +// RunWrapped runs the plug-in with the provided wrappers returning a channel of resultant events. +func (p Plugin) RunWrapped(fields stdfields.StdMonitorFields) chan *beat.Event { + wj := wrappers.WrapCommon(p.Jobs, fields) + results := make(chan *beat.Event) + + var runJob func(j jobs.Job) + runJob = func(j jobs.Job) { + e := &beat.Event{} + conts, err := j(e) + // No error handling since WrapCommon handles all errors + if err != nil { + panic(fmt.Sprintf("unexpected error on wrapped job!: %s", err)) + } + results <- e + for _, c := range conts { + runJob(c) + } + } + + go func() { + for _, j := range wj { + runJob(j) + } + close(results) + }() + + return results +} + var pluginKey = "heartbeat.monitor" // stateGlobalRecorder records statistics across all plugin types @@ -70,7 +112,7 @@ func init() { } stats := statsForPlugin(p.Name) - return GlobalPluginsReg.Register(PluginFactory{p.Name, p.Aliases, p.Builder, stats}) + return GlobalPluginsReg.Register(PluginFactory{p.Name, p.Aliases, p.Make, stats}) }) } @@ -98,9 +140,9 @@ func NewPluginsReg() *PluginsReg { } // Register registers a new active (as opposed to passive) monitor. -func Register(name string, builder PluginFactoryCreate, aliases ...string) { +func Register(name string, make PluginMake, aliases ...string) { stats := statsForPlugin(name) - if err := GlobalPluginsReg.Add(PluginFactory{name, aliases, builder, stats}); err != nil { + if err := GlobalPluginsReg.Add(PluginFactory{name, aliases, make, stats}); err != nil { panic(err) } } @@ -161,5 +203,5 @@ func (r *PluginsReg) MonitorNames() []string { } func (e *PluginFactory) Create(cfg *common.Config) (p Plugin, err error) { - return e.Builder(e.Name, cfg) + return e.Make(e.Name, cfg) } diff --git a/heartbeat/tests/system/config/heartbeat.yml.j2 b/heartbeat/tests/system/config/heartbeat.yml.j2 index 0555c5729cba..8cd31ea87343 100644 --- a/heartbeat/tests/system/config/heartbeat.yml.j2 +++ b/heartbeat/tests/system/config/heartbeat.yml.j2 @@ -50,6 +50,59 @@ heartbeat.monitors: {% endif %} {% endfor -%} +{%- if run_once is defined %} +heartbeat.run_once: +{% for monitor in run_once -%} +- type: {{ monitor.type }} + schedule: '{{ monitor.schedule|default("@every 1s") }}' + {%- if monitor.timeout is defined %} + timeout: {{monitor.timeout}} + {% endif -%} + + {%- if monitor.enabled is defined %} + enabled: {{monitor.enabled}} + {% endif -%} + + {%- if monitor.tags is defined %} + tags: + {% for tag in monitor.tags -%} + - '{{ tag }}' + {% endfor %} + {% endif -%} + + {%- if monitor.hosts is defined %} + hosts: + {%- for host in monitor.hosts %} + - '{{ host }}' + {% endfor -%} + {% endif -%} + + {%- if monitor.urls is defined %} + urls: + {%- for url in monitor.urls %} + - '{{ url }}' + {% endfor %} + {% endif -%} + + + {%- if monitor.check_response_json is defined %} + check.response.json: + {%- for check in monitor.check_response_json %} + - {{check}} + {% endfor %} + {% endif -%} + + {%- if monitor.fields is defined %} + {% if monitor.fields_under_root %}fields_under_root: true{% endif %} + fields: + {% for k, v in monitor.fields.items() -%} + {{ k }}: {{ v }} + {% endfor %} + {% endif %} +{% endfor -%} +{% endif %} + + {% if reload or reload_path -%} heartbeat.config.monitors: path: {{ reload_path|default("${path.config}/monitors.d/*.yml") }} diff --git a/heartbeat/tests/system/test_base.py b/heartbeat/tests/system/test_base.py index b65ccea587df..9fea6a7aff7d 100644 --- a/heartbeat/tests/system/test_base.py +++ b/heartbeat/tests/system/test_base.py @@ -33,6 +33,36 @@ def test_base(self): self.wait_until(lambda: self.log_contains("heartbeat is running")) heartbeat_proc.check_kill_and_wait() + def test_run_once(self): + """ + Basic test with exiting Heartbeat normally + """ + + config = { + "run_once": [ + { + "type": "http", + "id": "http-check", + "urls": ["http://localhost:9200"], + }, + { + "type": "tcp", + "id": "tcp-check", + "hosts": ["localhost:9200"], + } + ] + } + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + **config + ) + + heartbeat_proc = self.start_beat() + self.wait_until(lambda: self.output_has(lines=2)) + self.wait_until(lambda: self.log_contains("Ending run_once run")) + heartbeat_proc.check_wait() + def test_disabled(self): """ Basic test against a disabled monitor @@ -54,7 +84,7 @@ def test_disabled(self): ) heartbeat_proc = self.start_beat() - self.wait_until(lambda: self.log_contains("skipping disabled monitor")) + self.wait_until(lambda: self.log_contains("heartbeat is running")) heartbeat_proc.check_kill_and_wait() def test_fields_under_root(self): diff --git a/x-pack/heartbeat/heartbeat.yml b/x-pack/heartbeat/heartbeat.yml index fdbb29c9661d..885d51e42b87 100644 --- a/x-pack/heartbeat/heartbeat.yml +++ b/x-pack/heartbeat/heartbeat.yml @@ -37,6 +37,18 @@ heartbeat.monitors: # Name of corresponding APM service, if Elastic APM is in use for the monitored service. #service.name: my-apm-service-name +# Experimental: Configure monitors that run exactly once. +# If enabled, heartbeat.monitors will be ignored +# Heartbeat will run these monitors once then exit. +#heartbeat.run_once: +#- type: http + #id: my-monitor + #name: My Monitor + #urls: ["http://localhost:9200"] + # NOTE: you must still provide the schedule field! Heartbeat + # Uses this to determine the contents of the monitor.timespan field + #schedule: '@every 10s' + # ======================= Elasticsearch template setting ======================= setup.template.settings: diff --git a/x-pack/heartbeat/monitors/browser/suite.go b/x-pack/heartbeat/monitors/browser/suite.go index da27f4a3e15f..0347f419ab58 100644 --- a/x-pack/heartbeat/monitors/browser/suite.go +++ b/x-pack/heartbeat/monitors/browser/suite.go @@ -112,7 +112,7 @@ func (s *Suite) jobs() []jobs.Job { func (s *Suite) plugin() plugin.Plugin { return plugin.Plugin{ Jobs: s.jobs(), - Close: s.Close, + DoClose: s.Close, Endpoints: 1, } }