Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Heartbeat] Refactor config system #23467

Merged
merged 45 commits into from
Feb 17, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
6a0b645
[Uptime] Copy synthetics suites to tmpdir before running
andrewvc Jan 4, 2021
2c48c55
Add changelog
andrewvc Jan 4, 2021
1ab0fd6
Fix var naming
andrewvc Jan 4, 2021
168c6a0
Remote journeyps
andrewvc Jan 5, 2021
e94fe34
Checkpoint
andrewvc Jan 5, 2021
a6581ba
Checkpoint
andrewvc Jan 5, 2021
3ba5203
Checkpoint
andrewvc Jan 6, 2021
d5aacfe
Checkpoint
andrewvc Jan 6, 2021
3e47422
Suites as jobs
andrewvc Jan 8, 2021
f7971f4
Checkpoint
andrewvc Jan 8, 2021
fd4d03d
Checkpoint
andrewvc Jan 10, 2021
d27f695
Sort of works
andrewvc Jan 10, 2021
47320ce
Reorg
andrewvc Jan 12, 2021
7c05aa6
Simplification attempt
andrewvc Jan 12, 2021
367548c
checkpoint
andrewvc Jan 12, 2021
70f5281
Things work again
andrewvc Jan 13, 2021
3c66ed1
Switch from multiple plugin registration to aliases
andrewvc Jan 14, 2021
a96277f
Cleanup
andrewvc Jan 14, 2021
44628b0
Suites almost working, inline works
andrewvc Jan 15, 2021
75250bb
Checkpoint
andrewvc Jan 15, 2021
54eff79
npm i + beginnings of cleanup
andrewvc Jan 15, 2021
c57c81e
Improve structure of monitor plugins
andrewvc Jan 16, 2021
9b9f6b1
More updates
andrewvc Jan 16, 2021
1c10922
Add some basic validation to sources
andrewvc Jan 16, 2021
faac3c9
Test fixes
andrewvc Jan 16, 2021
100dead
Test fixes
andrewvc Jan 16, 2021
5c7d981
Improve monitor tests
andrewvc Jan 20, 2021
0956aff
Refactor wrappers/monitors for greater testability
andrewvc Jan 21, 2021
d587a67
Merge remote-tracking branch 'origin/master' into remote-journeys
andrewvc Jan 24, 2021
8554370
checkpoint
andrewvc Jan 25, 2021
8081b1c
Add tests for new wrapper behavior
andrewvc Jan 29, 2021
2332b13
Additional tests
andrewvc Jan 29, 2021
fde5765
Add basic validations for local source
andrewvc Jan 29, 2021
c82c155
Add tests for local source
andrewvc Jan 30, 2021
24ced05
Minimize fixtures
andrewvc Jan 30, 2021
95d695d
Merge remote-tracking branch 'origin/master' into remote-journeys
andrewvc Jan 30, 2021
ebcfb03
Update go.sum
andrewvc Jan 30, 2021
34469bf
Fix linter errors
andrewvc Jan 30, 2021
8ed3ad1
Fix local source behavior to remove node_modules
andrewvc Feb 2, 2021
4002755
Improve test coverage
andrewvc Feb 2, 2021
b7c8d19
Add changelog
andrewvc Feb 2, 2021
6e1a49b
Update changelog
andrewvc Feb 3, 2021
3c47a5c
Merge remote-tracking branch 'origin/master' into remote-journeys
andrewvc Feb 9, 2021
4d8c07b
Merge remote-tracking branch 'origin/master' into remote-journeys
andrewvc Feb 17, 2021
8248900
Incorporate PR feedback
andrewvc Feb 17, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
*Heartbeat*

- Add mime type detection for http responses. {pull}22976[22976]
- Copy suite directories for synthetic checks to tmp dir before running. {pull}23347[23347]

*Journalbeat*

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2 h1:DW6WrARxK5J+o8uAKCiACi5wy9EK1UzrsCpGBPsKHAA=
github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/elastic/beats v1.3.1 h1:hHzUBHCo3HJHxnRVwa0XlfZoxmP8Rxp7GQ0ZVELGY4A=
github.com/elastic/beats v7.6.2+incompatible h1:jHdLv83KURaqWUC6f55iMyVP6LYZrgElfeqxKWcskVE=
github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3 h1:lnDkqiRFKm0rxdljqrj3lotWinO9+jFmeDXIC4gvIQs=
github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3/go.mod h1:aPqzac6AYkipvp4hufTyMj5PDIphF3+At8zr7r51xjY=
github.com/elastic/ecs v1.6.0 h1:8NmgfnsjmKXh9hVsK3H2tZtfUptepNc3msJOAynhtmc=
Expand Down
52 changes: 0 additions & 52 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package beater

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -100,13 +99,6 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
}
}

if len(bt.config.SyntheticSuites) > 0 {
err := bt.RunSyntheticSuiteMonitors(b)
if err != nil {
return err
}
}

if bt.config.Autodiscover != nil {
bt.autodiscover, err = bt.makeAutodiscover(b)
if err != nil {
Expand Down Expand Up @@ -168,50 +160,6 @@ func (bt *Heartbeat) RunReloadableMonitors(b *beat.Beat) (err error) {
return nil
}

// Provide hook to define journey list discovery from x-pack
type JourneyLister func(ctx context.Context, suiteFile string, params common.MapStr) ([]string, error)

var mainJourneyLister JourneyLister

func RegisterJourneyLister(jl JourneyLister) {
mainJourneyLister = jl
}

func (bt *Heartbeat) RunSyntheticSuiteMonitors(b *beat.Beat) error {
// If we are running without XPack this will be nil
if mainJourneyLister == nil {
return nil
}
for _, suite := range bt.config.SyntheticSuites {
logp.Info("Listing suite %s", suite.Path)
journeyNames, err := mainJourneyLister(context.TODO(), suite.Path, suite.Params)
if err != nil {
return err
}
factory := monitors.NewFactory(b.Info, bt.scheduler, false)
for _, name := range journeyNames {
cfg, err := common.NewConfigFrom(map[string]interface{}{
"type": "browser",
"path": suite.Path,
"schedule": suite.Schedule,
"params": suite.Params,
"journey_name": name,
"name": name,
"id": name,
})
if err != nil {
return err
}
created, err := factory.Create(b.Publisher, cfg)
if err != nil {
return errors.Wrap(err, "could not create monitor")
}
created.Start()
}
}
return nil
}

// makeAutodiscover creates an autodiscover object ready to be started.
func (bt *Heartbeat) makeAutodiscover(b *beat.Beat) (*autodiscover.Autodiscover, error) {
autodiscover, err := autodiscover.NewAutodiscover(
Expand Down
9 changes: 1 addition & 8 deletions heartbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Config struct {
ConfigMonitors *common.Config `config:"config.monitors"`
Scheduler Scheduler `config:"scheduler"`
Autodiscover *autodiscover.Config `config:"autodiscover"`
SyntheticSuites []*SyntheticSuite `config:"synthetic_suites"`
SyntheticSuites []*common.Config `config:"synthetic_suites"`
}

// Scheduler defines the syntax of a heartbeat.yml scheduler block.
Expand All @@ -41,12 +41,5 @@ type Scheduler struct {
Location string `config:"location"`
}

type SyntheticSuite struct {
Path string `config:"path"`
Name string `config:"id_prefix"`
Schedule string `config:"schedule"`
Params map[string]interface{} `config:"params"`
}

// DefaultConfig is the canonical instantiation of Config.
var DefaultConfig = Config{}
3 changes: 1 addition & 2 deletions heartbeat/monitors/active/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ import (
)

func init() {
monitors.RegisterActive("http", create)
monitors.RegisterActive("synthetics/http", create)
monitors.RegisterActive("http", create, "synthetics/http")
}

var debugf = logp.MakeDebug("http")
Expand Down
3 changes: 1 addition & 2 deletions heartbeat/monitors/active/icmp/icmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ import (
var debugf = logp.MakeDebug("icmp")

func init() {
monitors.RegisterActive("icmp", create)
monitors.RegisterActive("synthetics/icmp", create)
monitors.RegisterActive("icmp", create, "synthetics/icmp")
}

func create(
Expand Down
3 changes: 1 addition & 2 deletions heartbeat/monitors/active/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ import (
)

func init() {
monitors.RegisterActive("tcp", create)
monitors.RegisterActive("synthetics/tcp", create)
monitors.RegisterActive("tcp", create, "synthetics/tcp")
}

var debugf = logp.MakeDebug("tcp")
Expand Down
29 changes: 16 additions & 13 deletions heartbeat/monitors/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,19 +137,22 @@ func createMockJob(name string, cfg *common.Config) ([]jobs.Job, error) {
func mockPluginBuilder() pluginBuilder {
reg := monitoring.NewRegistry()

return pluginBuilder{"test", ActiveMonitor, func(s string, config *common.Config) ([]jobs.Job, int, error) {
// Declare a real config block with a required attr so we can see what happens when it doesn't work
unpacked := struct {
URLs []string `config:"urls" validate:"required"`
}{}
err := config.Unpack(&unpacked)
if err != nil {
return nil, 0, err
}
c := common.Config{}
j, err := createMockJob("test", &c)
return j, 1, err
}, newPluginCountersRecorder("test", reg)}
return pluginBuilder{
"test",
[]string{"testAlias"},
func(s string, config *common.Config) ([]jobs.Job, int, error) {
// Declare a real config block with a required attr so we can see what happens when it doesn't work
unpacked := struct {
URLs []string `config:"urls" validate:"required"`
}{}
err := config.Unpack(&unpacked)
if err != nil {
return nil, 0, err
}
c := common.Config{}
j, err := createMockJob("test", &c)
return j, 1, err
}, newPluginCountersRecorder("test", reg)}
}

func mockPluginsReg() *pluginsReg {
Expand Down
8 changes: 4 additions & 4 deletions heartbeat/monitors/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,18 +120,18 @@ func newMonitorUnsafe(
// Extract just the Id, Type, and Enabled fields from the config
// We'll parse things more precisely later once we know what exact type of
// monitor we have
stdFields, err := stdfields.ConfigToStdMonitorFields(config)
standardFields, err := stdfields.ConfigToStdMonitorFields(config)
if err != nil {
return nil, err
}

monitorPlugin, found := registrar.get(stdFields.Type)
monitorPlugin, found := registrar.get(standardFields.Type)
if !found {
return nil, fmt.Errorf("monitor type %v does not exist, valid types are %v", stdFields.Type, registrar.monitorNames())
return nil, fmt.Errorf("monitor type %v does not exist, valid types are %v", standardFields.Type, registrar.monitorNames())
}

m := &Monitor{
stdFields: stdFields,
stdFields: standardFields,
pluginName: monitorPlugin.name,
scheduler: scheduler,
configuredJobs: []*configuredJob{},
Expand Down
18 changes: 12 additions & 6 deletions heartbeat/monitors/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

type pluginBuilder struct {
name string
typ Type
aliases []string
builder PluginBuilder
stats registryRecorder
}
Expand Down Expand Up @@ -62,7 +62,7 @@ func init() {
}

stats := statsForPlugin(p.name)
return globalPluginsReg.register(pluginBuilder{p.name, p.typ, p.builder, stats})
return globalPluginsReg.register(pluginBuilder{p.name, p.aliases, p.builder, stats})
})
}

Expand Down Expand Up @@ -94,9 +94,9 @@ func newPluginsReg() *pluginsReg {
}

// RegisterActive registers a new active (as opposed to passive) monitor.
func RegisterActive(name string, builder PluginBuilder) {
func RegisterActive(name string, builder PluginBuilder, aliases ...string) {
stats := statsForPlugin(name)
if err := globalPluginsReg.add(pluginBuilder{name, ActiveMonitor, builder, stats}); err != nil {
if err := globalPluginsReg.add(pluginBuilder{name, aliases, builder, stats}); err != nil {
panic(err)
}
}
Expand All @@ -106,20 +106,26 @@ func RegisterActive(name string, builder PluginBuilder) {
type ErrPluginAlreadyExists pluginBuilder

func (m ErrPluginAlreadyExists) Error() string {
return fmt.Sprintf("monitor plugin '%s' already exists", m.typ)
return fmt.Sprintf("monitor plugin named '%s' with aliases %v already exists", m.name, m.aliases)
}

func (r *pluginsReg) add(plugin pluginBuilder) error {
if _, exists := r.monitors[plugin.name]; exists {
return ErrPluginAlreadyExists(plugin)
}
r.monitors[plugin.name] = plugin
for _, alias := range plugin.aliases {
if _, exists := r.monitors[alias]; exists {
return ErrPluginAlreadyExists(plugin)
}
r.monitors[alias] = plugin
}
return nil
}

func (r *pluginsReg) register(plugin pluginBuilder) error {
if _, found := r.monitors[plugin.name]; found {
return fmt.Errorf("monitor type %v already exists", plugin.typ)
return fmt.Errorf("monitor type %v already exists", plugin.name)
}

r.monitors[plugin.name] = plugin
Expand Down
60 changes: 33 additions & 27 deletions heartbeat/monitors/wrappers/monitors.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,33 +38,50 @@ import (

// WrapCommon applies the common wrappers that all monitor jobs get.
func WrapCommon(js []jobs.Job, stdMonFields stdfields.StdMonitorFields) []jobs.Job {
jobWrappers := []jobs.JobWrapper{
addMonitorMeta(stdMonFields, len(js) > 1),
addMonitorStatus(stdMonFields.Type),
}

if stdMonFields.Type != "browser" {
jobWrappers = append(jobWrappers, addMonitorDuration)
if stdMonFields.Type == "browser" {
return WrapBrowser(js, stdMonFields)
} else {
return WrapLightweight(js, stdMonFields)
}
}

func WrapLightweight(js []jobs.Job, stdMonFields stdfields.StdMonitorFields) []jobs.Job {
return jobs.WrapAllSeparately(
jobs.WrapAll(
js,
jobWrappers...,
addMonitorMeta(stdMonFields, len(js) > 1),
addMonitorStatus(stdMonFields.Type),
addMonitorDuration,
),
func() jobs.JobWrapper {
return makeAddSummary(stdMonFields.Type)
})
}

func WrapBrowser(js []jobs.Job, stdMonFields stdfields.StdMonitorFields) []jobs.Job {
return jobs.WrapAll(
js,
addMonitorMeta(stdMonFields, len(js) > 1),
addMonitorStatus(stdMonFields.Type),
)
}

// addMonitorMeta adds the id, name, and type fields to the monitor.
func addMonitorMeta(stdMonFields stdfields.StdMonitorFields, isMulti bool) jobs.JobWrapper {
return func(job jobs.Job) jobs.Job {
return func(event *beat.Event) ([]jobs.Job, error) {
started := time.Now()
cont, e := job(event)
thisID := stdMonFields.ID

thisName := stdMonFields.Name
// Allow jobs to override the ID, useful for browser suites
// which do this logic on their own
if v, _ := event.GetValue("monitor.id"); v != nil {
thisID = v.(string)
}
if v, _ := event.GetValue("monitor.name"); v != nil {
thisName = v.(string)
}
if isMulti {
url, err := event.GetValue("url.full")
if err != nil {
Expand All @@ -78,7 +95,7 @@ func addMonitorMeta(stdMonFields stdfields.StdMonitorFields, isMulti bool) jobs.
fieldsToMerge := common.MapStr{
"monitor": common.MapStr{
"id": thisID,
"name": stdMonFields.Name,
"name": thisName,
"type": stdMonFields.Type,
"timespan": timespan(started, stdMonFields.Schedule, stdMonFields.Timeout),
},
Expand Down Expand Up @@ -120,13 +137,6 @@ func addMonitorStatus(monitorType string) jobs.JobWrapper {
return func(event *beat.Event) ([]jobs.Job, error) {
cont, err := origJob(event)

// Non-summary browser events have no status associated
if monitorType == "browser" {
if t, _ := event.GetValue("synthetics.type"); t != "heartbeat/summary" {
return cont, nil
}
}

fields := common.MapStr{
"monitor": common.MapStr{
"status": look.Status(err),
Expand Down Expand Up @@ -167,6 +177,7 @@ func makeAddSummary(monitorType string) jobs.JobWrapper {
// state struct here.
state := struct {
mtx sync.Mutex
monitorId string
remaining uint16
up uint16
down uint16
Expand All @@ -191,6 +202,10 @@ func makeAddSummary(monitorType string) jobs.JobWrapper {

return func(job jobs.Job) jobs.Job {
return func(event *beat.Event) ([]jobs.Job, error) {
if v, _ := event.GetValue("monitor.id"); v != state.monitorId {
resetState()
}

cont, jobErr := job(event)
state.mtx.Lock()
defer state.mtx.Unlock()
Expand All @@ -208,7 +223,6 @@ func makeAddSummary(monitorType string) jobs.JobWrapper {
}
}

// No error check needed here
event.PutValue("monitor.check_group", state.checkGroup)

// Adjust the total remaining to account for new continuations
Expand All @@ -220,15 +234,7 @@ func makeAddSummary(monitorType string) jobs.JobWrapper {
if state.remaining == 0 {
up := state.up
down := state.down
if monitorType == "browser" {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now handled directly in the browser type

if eventStatus == "down" {
up = 0
down = 1
} else {
up = 1
down = 0
}
}

eventext.MergeEventFields(event, common.MapStr{
"summary": common.MapStr{
"up": up,
Expand Down
Loading