diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index fde2c427394..6937712336e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -85,6 +85,8 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] *Heartbeat* - Fix bug affecting let's encrypt and other users of cross-signed certs, where cert expiration was incorrectly calculated. {issue}33215[33215] - Fix broken disable feature for kibana configured monitors. {pull}33293[33293] +- Fix states client support for output options. {pull}33405[33405] +- Fix states client reloader under managed mode. {pull}33405[33405] *Metricbeat* diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index 5b9f862e4cb..0453cb846f8 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -26,7 +26,6 @@ import ( conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/beats/v7/heartbeat/config" "github.com/elastic/beats/v7/heartbeat/hbregistry" @@ -39,6 +38,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common/reload" + "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/management" "github.com/elastic/beats/v7/libbeat/publisher/pipeline" ) @@ -47,17 +47,12 @@ import ( type Heartbeat struct { done chan struct{} // config is used for iterating over elements of the config. - config config.Config - scheduler *scheduler.Scheduler - monitorReloader *cfgfile.Reloader - monitorFactory *monitors.RunnerFactory - autodiscover *autodiscover.Autodiscover -} - -type EsConfig struct { - Hosts []string `config:"hosts"` - Username string `config:"username"` - Password string `config:"password"` + config config.Config + scheduler *scheduler.Scheduler + monitorReloader *cfgfile.Reloader + monitorFactory *monitors.RunnerFactory + autodiscover *autodiscover.Autodiscover + replaceStateLoader func(sl monitorstate.StateLoader) } // New creates a new heartbeat. @@ -67,17 +62,14 @@ func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) { return nil, fmt.Errorf("error reading config file: %w", err) } - stateLoader := monitorstate.NilStateLoader - - if b.Config.Output.Name() == "elasticsearch" { - // Connect to ES and setup the State loader - esc, err := getESClient(b.Config.Output.Config()) - if err != nil { - return nil, err - } - if esc != nil { - stateLoader = monitorstate.MakeESLoader(esc, "synthetics-*,heartbeat-*", parsedConfig.RunFrom) + stateLoader, replaceStateLoader := monitorstate.AtomicStateLoader(monitorstate.NilStateLoader) + if b.Config.Output.Name() == "elasticsearch" && !b.Manager.Enabled() { + // Connect to ES and setup the State loader if the output is not managed by agent + if err := makeStatesClient(b.Config.Output.Config(), replaceStateLoader, parsedConfig.RunFrom); err != nil { + logp.L().Warnf("could not connect to ES for state management during initial load: %s", err) } + } else if b.Manager.Enabled() { + stateLoader, replaceStateLoader = monitorstate.DeferredStateLoader(monitorstate.NilStateLoader, 15*time.Second) } limit := parsedConfig.Scheduler.Limit @@ -107,9 +99,10 @@ func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) { } bt := &Heartbeat{ - done: make(chan struct{}), - config: parsedConfig, - scheduler: sched, + done: make(chan struct{}), + config: parsedConfig, + scheduler: sched, + replaceStateLoader: replaceStateLoader, // monitorFactory is the factory used for creating all monitor instances, // wiring them up to everything needed to actually execute. monitorFactory: monitors.NewFactory(monitors.FactoryParams{ @@ -211,6 +204,24 @@ func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) (stop func(), err error) { // RunCentralMgmtMonitors loads any central management configured configs. func (bt *Heartbeat) RunCentralMgmtMonitors(b *beat.Beat) { + // Register output reloader for managed outputs + b.OutputConfigReloader = reload.ReloadableFunc(func(r *reload.ConfigWithMeta) error { + // Do not return error here, it will prevent libbeat output from processing the same event + if r == nil { + return nil + } + outCfg := conf.Namespace{} + if err := r.Config.Unpack(&outCfg); err != nil || outCfg.Name() != "elasticsearch" { + return nil + } + + if err := makeStatesClient(outCfg.Config(), bt.replaceStateLoader, bt.config.RunFrom); err != nil { + logp.L().Warnf("could not connect to ES for state management during managed reload: %s", err) + } + + return nil + }) + mons := cfgfile.NewRunnerList(management.DebugK, bt.monitorFactory, b.Publisher) reload.Register.MustRegisterList(b.Info.Beat+".monitors", mons) inputs := cfgfile.NewRunnerList(management.DebugK, bt.monitorFactory, b.Publisher) @@ -251,23 +262,16 @@ func (bt *Heartbeat) Stop() { close(bt.done) } -// getESClient returns an ES client if one is configured. Will return nil, nil, if none is configured. -func getESClient(outputConfig *conf.C) (esc *elasticsearch.Client, err error) { - esConfig := EsConfig{} - err = outputConfig.Unpack(&esConfig) +func makeStatesClient(cfg *conf.C, replace func(monitorstate.StateLoader), runFrom *config.LocationWithID) error { + esClient, err := eslegclient.NewConnectedClient(cfg, "Heartbeat") if err != nil { - logp.L().Info("output is not elasticsearch, error / state tracking will not be enabled: %w", err) - return nil, nil + return err } - esc, err = elasticsearch.NewClient(elasticsearch.Config{ - Addresses: esConfig.Hosts, - Username: esConfig.Username, - Password: esConfig.Password, - }) - if err != nil { - return nil, fmt.Errorf("could not initialize elasticsearch client: %w", err) + + if esClient != nil { + logp.L().Info("replacing states loader") + replace(monitorstate.MakeESLoader(esClient, "synthetics-*,heartbeat-*", runFrom)) } - logp.L().Infof("successfully connected to elasticsearch for error / state tracking: %v", esConfig.Hosts) - return esc, nil + return nil } diff --git a/heartbeat/monitors/wrappers/monitorstate/esloader.go b/heartbeat/monitors/wrappers/monitorstate/esloader.go index 3f4165193ea..b7302dd8a78 100644 --- a/heartbeat/monitors/wrappers/monitorstate/esloader.go +++ b/heartbeat/monitors/wrappers/monitorstate/esloader.go @@ -18,21 +18,19 @@ package monitorstate import ( - "bytes" "encoding/json" "fmt" + "strings" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" - "github.com/elastic/go-elasticsearch/v8" - "github.com/elastic/go-elasticsearch/v8/esapi" "github.com/elastic/beats/v7/heartbeat/config" - "github.com/elastic/beats/v7/heartbeat/esutil" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" + "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" ) -func MakeESLoader(esc *elasticsearch.Client, indexPattern string, beatLocation *config.LocationWithID) StateLoader { +func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation *config.LocationWithID) StateLoader { if indexPattern == "" { // Should never happen, but if we ever make a coding error... logp.L().Warn("ES state loader initialized with no index pattern, will not load states from ES") @@ -61,25 +59,19 @@ func MakeESLoader(esc *elasticsearch.Client, indexPattern string, beatLocation * "match": mapstr.M{"observer.name": sf.RunFrom.ID}, }) } - - reqBody, err := json.Marshal(mapstr.M{ + reqBody := mapstr.M{ "sort": mapstr.M{"@timestamp": "desc"}, "query": mapstr.M{ "bool": mapstr.M{ "must": queryMustClauses, }, }, - }) - if err != nil { - return nil, fmt.Errorf("could not serialize query for state save: %w", err) } - r, err := esc.Search(func(sr *esapi.SearchRequest) { - sr.Index = []string{indexPattern} - size := 1 - sr.Size = &size - sr.Body = bytes.NewReader(reqBody) - }) + status, body, err := esc.Request("POST", strings.Join([]string{"/", indexPattern, "/", "_search", "?size=1"}, ""), "", nil, reqBody) + if err != nil || status > 299 { + return nil, fmt.Errorf("error executing state search for %s: %w", sf.ID, err) + } type stateHits struct { Hits struct { @@ -92,13 +84,8 @@ func MakeESLoader(esc *elasticsearch.Client, indexPattern string, beatLocation * } `json:"hits"` } - respBody, err := esutil.CheckRetResp(r, err) - if err != nil { - return nil, fmt.Errorf("error executing state search for %s: %w", sf.ID, err) - } - sh := stateHits{} - err = json.Unmarshal(respBody, &sh) + err = json.Unmarshal(body, &sh) if err != nil { return nil, fmt.Errorf("could not unmarshal state hits for %s: %w", sf.ID, err) } diff --git a/heartbeat/monitors/wrappers/monitorstate/esloader_test.go b/heartbeat/monitors/wrappers/monitorstate/esloader_test.go index d3b318b6c4c..c7bacf90194 100644 --- a/heartbeat/monitors/wrappers/monitorstate/esloader_test.go +++ b/heartbeat/monitors/wrappers/monitorstate/esloader_test.go @@ -105,7 +105,7 @@ func newESTestContext(t *testing.T) *esTestContext { }, } namespace, _ := uuid.NewV4() - esc := IntegES(t) + esc := IntegApiClient(t) etc := &esTestContext{ namespace: namespace.String(), esc: esc, diff --git a/heartbeat/monitors/wrappers/monitorstate/testutil.go b/heartbeat/monitors/wrappers/monitorstate/testutil.go index 2ec908c2cf9..54098309758 100644 --- a/heartbeat/monitors/wrappers/monitorstate/testutil.go +++ b/heartbeat/monitors/wrappers/monitorstate/testutil.go @@ -24,6 +24,9 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/heartbeat/config" + "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" + "github.com/elastic/beats/v7/libbeat/esleg/eslegtest" + "github.com/elastic/beats/v7/heartbeat/esutil" "github.com/elastic/go-elasticsearch/v8" ) @@ -34,9 +37,31 @@ func IntegESLoader(t *testing.T, indexPattern string, location *config.LocationW return MakeESLoader(IntegES(t), indexPattern, location) } -func IntegES(t *testing.T) (esc *elasticsearch.Client) { +func IntegES(t *testing.T) (esc *eslegclient.Connection) { + conn, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{ + URL: eslegtest.GetURL(), + Username: "admin", + Password: "testing", + }) + if err != nil { + t.Fatal(err) + panic(err) // panic in case TestLogger did not stop test + } + + conn.Encoder = eslegclient.NewJSONEncoder(nil, false) + + err = conn.Connect() + if err != nil { + t.Fatal(err) + panic(err) // panic in case TestLogger did not stop test + } + + return conn +} + +func IntegApiClient(t *testing.T) (esc *elasticsearch.Client) { esc, err := elasticsearch.NewClient(elasticsearch.Config{ - Addresses: []string{"http://127.0.0.1:9200"}, + Addresses: []string{eslegtest.GetURL()}, Username: "admin", Password: "testing", }) diff --git a/heartbeat/monitors/wrappers/monitorstate/tracker.go b/heartbeat/monitors/wrappers/monitorstate/tracker.go index 6bd8069a27a..2fe018e17b9 100644 --- a/heartbeat/monitors/wrappers/monitorstate/tracker.go +++ b/heartbeat/monitors/wrappers/monitorstate/tracker.go @@ -18,6 +18,8 @@ package monitorstate import ( + "context" + "errors" "math/rand" "sync" "time" @@ -81,11 +83,13 @@ func (t *Tracker) getCurrentState(sf stdfields.StdMonitorFields) (state *State) var err error for i := 0; i < tries; i++ { loadedState, err = t.stateLoader(sf) - if err != nil { - sleepFor := (time.Duration(i*i) * time.Second) + (time.Duration(rand.Intn(500)) * time.Millisecond) - logp.L().Warnf("could not load last externally recorded state, will retry again in %d milliseconds: %w", sleepFor.Milliseconds(), err) - time.Sleep(sleepFor) + if err == nil { + break } + + sleepFor := (time.Duration(i*i) * time.Second) + (time.Duration(rand.Intn(500)) * time.Millisecond) + logp.L().Warnf("could not load last externally recorded state, will retry again in %d milliseconds: %w", sleepFor.Milliseconds(), err) + time.Sleep(sleepFor) } if err != nil { logp.L().Warn("could not load prior state from elasticsearch after %d attempts, will create new state for monitor: %s", tries, sf.ID) @@ -104,3 +108,46 @@ func (t *Tracker) getCurrentState(sf stdfields.StdMonitorFields) (state *State) func NilStateLoader(_ stdfields.StdMonitorFields) (*State, error) { return nil, nil } + +func AtomicStateLoader(inner StateLoader) (sl StateLoader, replace func(StateLoader)) { + mtx := &sync.Mutex{} + return func(currentSL stdfields.StdMonitorFields) (*State, error) { + mtx.Lock() + defer mtx.Unlock() + + return inner(currentSL) + }, func(sl StateLoader) { + mtx.Lock() + defer mtx.Unlock() + inner = sl + logp.L().Info("Updated atomic state loader") + } +} + +func DeferredStateLoader(inner StateLoader, timeout time.Duration) (sl StateLoader, replace func(StateLoader)) { + stateLoader, replaceStateLoader := AtomicStateLoader(inner) + + wg := sync.WaitGroup{} + ctx, cancel := context.WithTimeout(context.Background(), timeout) + wg.Add(1) + go func() { + defer cancel() + defer wg.Done() + + <-ctx.Done() + + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + logp.L().Warn("Timeout trying to defer state loader") + } + }() + + return func(currentSL stdfields.StdMonitorFields) (*State, error) { + wg.Wait() + + return stateLoader(currentSL) + }, func(sl StateLoader) { + defer cancel() + + replaceStateLoader(sl) + } +} diff --git a/heartbeat/monitors/wrappers/monitorstate/tracker_test.go b/heartbeat/monitors/wrappers/monitorstate/tracker_test.go index 14a9a34c12d..a1221ecc072 100644 --- a/heartbeat/monitors/wrappers/monitorstate/tracker_test.go +++ b/heartbeat/monitors/wrappers/monitorstate/tracker_test.go @@ -19,8 +19,11 @@ package monitorstate import ( "testing" + "time" "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" ) func TestTrackerRecord(t *testing.T) { @@ -65,3 +68,66 @@ func TestTrackerRecordFlappingDisabled(t *testing.T) { require.Equal(t, StatusDown, ms.Status) requireMSStatusCount(t, ms, StatusDown, 1) } + +func TestAtomicStateLoader(t *testing.T) { + stateA := &State{ID: "A"} + stateB := &State{ID: "B"} + loaderA := func(stdfields.StdMonitorFields) (*State, error) { + return stateA, nil + } + loaderB := func(stdfields.StdMonitorFields) (*State, error) { + return stateB, nil + } + + asl, replace := AtomicStateLoader(loaderA) + resState, _ := asl(stdfields.StdMonitorFields{}) + require.Equal(t, stateA, resState) + + replace(loaderB) + resState, _ = asl(stdfields.StdMonitorFields{}) + require.Equal(t, stateB, resState) + + replace(loaderA) + resState, _ = asl(stdfields.StdMonitorFields{}) + require.Equal(t, stateA, resState) + +} + +func TestDeferredStateLoaderTimeout(t *testing.T) { + stateA := &State{ID: "A"} + loaderA := func(stdfields.StdMonitorFields) (*State, error) { + return stateA, nil + } + + dsl, _ := DeferredStateLoader(loaderA, 100*time.Millisecond) + resState, _ := dsl(stdfields.StdMonitorFields{}) + require.Equal(t, stateA, resState) +} + +func TestDeferredStateLoader(t *testing.T) { + stateA := &State{ID: "A"} + stateB := &State{ID: "B"} + loaderA := func(stdfields.StdMonitorFields) (*State, error) { + return stateA, nil + } + loaderB := func(stdfields.StdMonitorFields) (*State, error) { + return stateB, nil + } + + // Test deferred initialization, launch query while stateA and expect + // updated stateB + dsl, replace := DeferredStateLoader(loaderA, 10*time.Second) + + go func() { + time.Sleep(1 * time.Second) + + replace(loaderB) + }() + + resState, _ := dsl(stdfields.StdMonitorFields{}) + require.Equal(t, stateB, resState) + + replace(loaderA) + resState, _ = dsl(stdfields.StdMonitorFields{}) + require.Equal(t, stateA, resState) +}