Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Heartbeat] States reloader and eslegclient #33405

Merged
merged 11 commits into from
Nov 2, 2022
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
*Heartbeat*
- Fix bug affecting let's encrypt and other users of cross-signed certs, where cert expiration was incorrectly calculated. {issue}33215[33215]
- Fix broken disable feature for kibana configured monitors. {pull}33293[33293]
- Fix states client support for output options. {pull}33405[33405]
- Fix states client reloader under managed mode. {pull}33405[33405]


*Metricbeat*
Expand Down
84 changes: 44 additions & 40 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/go-elasticsearch/v8"

"github.com/elastic/beats/v7/heartbeat/config"
"github.com/elastic/beats/v7/heartbeat/hbregistry"
Expand All @@ -39,6 +38,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
)
Expand All @@ -47,17 +47,12 @@ import (
type Heartbeat struct {
done chan struct{}
// config is used for iterating over elements of the config.
config config.Config
scheduler *scheduler.Scheduler
monitorReloader *cfgfile.Reloader
monitorFactory *monitors.RunnerFactory
autodiscover *autodiscover.Autodiscover
}

type EsConfig struct {
Hosts []string `config:"hosts"`
Username string `config:"username"`
Password string `config:"password"`
config config.Config
scheduler *scheduler.Scheduler
monitorReloader *cfgfile.Reloader
monitorFactory *monitors.RunnerFactory
autodiscover *autodiscover.Autodiscover
replaceStateLoader func(sl monitorstate.StateLoader)
}

// New creates a new heartbeat.
Expand All @@ -67,17 +62,14 @@ func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) {
return nil, fmt.Errorf("error reading config file: %w", err)
}

stateLoader := monitorstate.NilStateLoader

if b.Config.Output.Name() == "elasticsearch" {
// Connect to ES and setup the State loader
esc, err := getESClient(b.Config.Output.Config())
if err != nil {
return nil, err
}
if esc != nil {
stateLoader = monitorstate.MakeESLoader(esc, "synthetics-*,heartbeat-*", parsedConfig.RunFrom)
stateLoader, replaceStateLoader := monitorstate.AtomicStateLoader(monitorstate.NilStateLoader)
if b.Config.Output.Name() == "elasticsearch" && !b.Manager.Enabled() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also add in the changes from https://github.com/elastic/beats/pull/33375/files#diff-3edc3a1c86f948ac04dc2aeef09be1c549a2e5cbedd80a76e49dde4d95fa6883R82 which adds a heartbeat.states.enabled config flag. The service won't have the manager enabled, so we'll have to enable that to have states work with the service.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This actually checks if heartbeat is in unmanaged mode (!b.Manager.Enabled()) to initialize a non-deferred states loader, so it should apply for the service as well.

IIRC, states flag was motivated by #33357. IMO, this PR supersedes the original problem since we can now load the config on both modes. Is there an scenario where we would still like to disable states?

// Connect to ES and setup the State loader if the output is not managed by agent
if err := makeStatesClient(b.Config.Output.Config(), replaceStateLoader, parsedConfig.RunFrom); err != nil {
logp.L().Warnf("could not connect to ES for state management during initial load: %s", err)
}
} else if b.Manager.Enabled() {
stateLoader, replaceStateLoader = monitorstate.DeferredStateLoader(monitorstate.NilStateLoader, 15*time.Second)
}

limit := parsedConfig.Scheduler.Limit
Expand Down Expand Up @@ -107,9 +99,10 @@ func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) {
}

bt := &Heartbeat{
done: make(chan struct{}),
config: parsedConfig,
scheduler: sched,
done: make(chan struct{}),
config: parsedConfig,
scheduler: sched,
replaceStateLoader: replaceStateLoader,
// monitorFactory is the factory used for creating all monitor instances,
// wiring them up to everything needed to actually execute.
monitorFactory: monitors.NewFactory(monitors.FactoryParams{
Expand Down Expand Up @@ -211,6 +204,24 @@ func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) (stop func(), err error) {

// RunCentralMgmtMonitors loads any central management configured configs.
func (bt *Heartbeat) RunCentralMgmtMonitors(b *beat.Beat) {
// Register output reloader for managed outputs
b.OutputConfigReloader = reload.ReloadableFunc(func(r *reload.ConfigWithMeta) error {
// Do not return error here, it will prevent libbeat output from processing the same event
if r == nil {
return nil
}
outCfg := conf.Namespace{}
if err := r.Config.Unpack(&outCfg); err != nil || outCfg.Name() != "elasticsearch" {
return nil
}

if err := makeStatesClient(outCfg.Config(), bt.replaceStateLoader, bt.config.RunFrom); err != nil {
logp.L().Warnf("could not connect to ES for state management during managed reload: %s", err)
}

return nil
})

mons := cfgfile.NewRunnerList(management.DebugK, bt.monitorFactory, b.Publisher)
reload.Register.MustRegisterList(b.Info.Beat+".monitors", mons)
inputs := cfgfile.NewRunnerList(management.DebugK, bt.monitorFactory, b.Publisher)
Expand Down Expand Up @@ -251,23 +262,16 @@ func (bt *Heartbeat) Stop() {
close(bt.done)
}

// getESClient returns an ES client if one is configured. Will return nil, nil, if none is configured.
func getESClient(outputConfig *conf.C) (esc *elasticsearch.Client, err error) {
esConfig := EsConfig{}
err = outputConfig.Unpack(&esConfig)
func makeStatesClient(cfg *conf.C, replace func(monitorstate.StateLoader), runFrom *config.LocationWithID) error {
esClient, err := eslegclient.NewConnectedClient(cfg, "Heartbeat")
if err != nil {
logp.L().Info("output is not elasticsearch, error / state tracking will not be enabled: %w", err)
return nil, nil
return err
}
esc, err = elasticsearch.NewClient(elasticsearch.Config{
Addresses: esConfig.Hosts,
Username: esConfig.Username,
Password: esConfig.Password,
})
if err != nil {
return nil, fmt.Errorf("could not initialize elasticsearch client: %w", err)

if esClient != nil {
logp.L().Info("replacing states loader")
replace(monitorstate.MakeESLoader(esClient, "synthetics-*,heartbeat-*", runFrom))
}
logp.L().Infof("successfully connected to elasticsearch for error / state tracking: %v", esConfig.Hosts)

return esc, nil
return nil
}
31 changes: 9 additions & 22 deletions heartbeat/monitors/wrappers/monitorstate/esloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,19 @@
package monitorstate

import (
"bytes"
"encoding/json"
"fmt"
"strings"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"

"github.com/elastic/beats/v7/heartbeat/config"
"github.com/elastic/beats/v7/heartbeat/esutil"
"github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
)

func MakeESLoader(esc *elasticsearch.Client, indexPattern string, beatLocation *config.LocationWithID) StateLoader {
func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation *config.LocationWithID) StateLoader {
if indexPattern == "" {
// Should never happen, but if we ever make a coding error...
logp.L().Warn("ES state loader initialized with no index pattern, will not load states from ES")
Expand Down Expand Up @@ -61,25 +59,19 @@ func MakeESLoader(esc *elasticsearch.Client, indexPattern string, beatLocation *
"match": mapstr.M{"observer.name": sf.RunFrom.ID},
})
}

reqBody, err := json.Marshal(mapstr.M{
reqBody := mapstr.M{
"sort": mapstr.M{"@timestamp": "desc"},
"query": mapstr.M{
"bool": mapstr.M{
"must": queryMustClauses,
},
},
})
if err != nil {
return nil, fmt.Errorf("could not serialize query for state save: %w", err)
}

r, err := esc.Search(func(sr *esapi.SearchRequest) {
sr.Index = []string{indexPattern}
size := 1
sr.Size = &size
sr.Body = bytes.NewReader(reqBody)
})
status, body, err := esc.Request("POST", strings.Join([]string{"/", indexPattern, "/", "_search", "?size=1"}, ""), "", nil, reqBody)
if err != nil || status > 299 {
return nil, fmt.Errorf("error executing state search for %s: %w", sf.ID, err)
}

type stateHits struct {
Hits struct {
Expand All @@ -92,13 +84,8 @@ func MakeESLoader(esc *elasticsearch.Client, indexPattern string, beatLocation *
} `json:"hits"`
}

respBody, err := esutil.CheckRetResp(r, err)
if err != nil {
return nil, fmt.Errorf("error executing state search for %s: %w", sf.ID, err)
}

sh := stateHits{}
err = json.Unmarshal(respBody, &sh)
err = json.Unmarshal(body, &sh)
if err != nil {
return nil, fmt.Errorf("could not unmarshal state hits for %s: %w", sf.ID, err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func newESTestContext(t *testing.T) *esTestContext {
},
}
namespace, _ := uuid.NewV4()
esc := IntegES(t)
esc := IntegApiClient(t)
etc := &esTestContext{
namespace: namespace.String(),
esc: esc,
Expand Down
29 changes: 27 additions & 2 deletions heartbeat/monitors/wrappers/monitorstate/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/heartbeat/config"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/esleg/eslegtest"

"github.com/elastic/beats/v7/heartbeat/esutil"
"github.com/elastic/go-elasticsearch/v8"
)
Expand All @@ -34,9 +37,31 @@ func IntegESLoader(t *testing.T, indexPattern string, location *config.LocationW
return MakeESLoader(IntegES(t), indexPattern, location)
}

func IntegES(t *testing.T) (esc *elasticsearch.Client) {
func IntegES(t *testing.T) (esc *eslegclient.Connection) {
conn, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{
URL: eslegtest.GetURL(),
Username: "admin",
Password: "testing",
})
if err != nil {
t.Fatal(err)
panic(err) // panic in case TestLogger did not stop test
}

conn.Encoder = eslegclient.NewJSONEncoder(nil, false)

err = conn.Connect()
if err != nil {
t.Fatal(err)
panic(err) // panic in case TestLogger did not stop test
}

return conn
}

func IntegApiClient(t *testing.T) (esc *elasticsearch.Client) {
esc, err := elasticsearch.NewClient(elasticsearch.Config{
Addresses: []string{"http://127.0.0.1:9200"},
Addresses: []string{eslegtest.GetURL()},
Username: "admin",
Password: "testing",
})
Expand Down
55 changes: 51 additions & 4 deletions heartbeat/monitors/wrappers/monitorstate/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package monitorstate

import (
"context"
"errors"
"math/rand"
"sync"
"time"
Expand Down Expand Up @@ -81,11 +83,13 @@ func (t *Tracker) getCurrentState(sf stdfields.StdMonitorFields) (state *State)
var err error
for i := 0; i < tries; i++ {
loadedState, err = t.stateLoader(sf)
if err != nil {
sleepFor := (time.Duration(i*i) * time.Second) + (time.Duration(rand.Intn(500)) * time.Millisecond)
logp.L().Warnf("could not load last externally recorded state, will retry again in %d milliseconds: %w", sleepFor.Milliseconds(), err)
time.Sleep(sleepFor)
if err == nil {
break
}

sleepFor := (time.Duration(i*i) * time.Second) + (time.Duration(rand.Intn(500)) * time.Millisecond)
logp.L().Warnf("could not load last externally recorded state, will retry again in %d milliseconds: %w", sleepFor.Milliseconds(), err)
time.Sleep(sleepFor)
}
if err != nil {
logp.L().Warn("could not load prior state from elasticsearch after %d attempts, will create new state for monitor: %s", tries, sf.ID)
Expand All @@ -104,3 +108,46 @@ func (t *Tracker) getCurrentState(sf stdfields.StdMonitorFields) (state *State)
func NilStateLoader(_ stdfields.StdMonitorFields) (*State, error) {
return nil, nil
}

func AtomicStateLoader(inner StateLoader) (sl StateLoader, replace func(StateLoader)) {
mtx := &sync.Mutex{}
return func(currentSL stdfields.StdMonitorFields) (*State, error) {
mtx.Lock()
defer mtx.Unlock()

return inner(currentSL)
}, func(sl StateLoader) {
mtx.Lock()
defer mtx.Unlock()
inner = sl
logp.L().Info("Updated atomic state loader")
}
}

func DeferredStateLoader(inner StateLoader, timeout time.Duration) (sl StateLoader, replace func(StateLoader)) {
stateLoader, replaceStateLoader := AtomicStateLoader(inner)

wg := sync.WaitGroup{}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very idiomatic go concurrency!

wg.Add(1)
go func() {
defer cancel()
defer wg.Done()

<-ctx.Done()

if errors.Is(ctx.Err(), context.DeadlineExceeded) {
logp.L().Warn("Timeout trying to defer state loader")
}
}()

return func(currentSL stdfields.StdMonitorFields) (*State, error) {
wg.Wait()

return stateLoader(currentSL)
}, func(sl StateLoader) {
defer cancel()

replaceStateLoader(sl)
}
}
Loading