diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7d500cf8f53..0e1a8adb79c 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -208,3 +208,5 @@ - Add `lumberjack` input type to the Filebeat spec. {pull}[959] - Add support for hints' based autodiscovery in kubernetes provider. {pull}[698] - Improve logging during upgrades. {pull}[1287] +- Added status message to CheckinRequest {pull}[1369] +- Improve logging of Fleet checkins errors. {pull}[1477] diff --git a/changelog/fragments/1665517984-improve-checkin-error-logging.yaml b/changelog/fragments/1665517984-improve-checkin-error-logging.yaml new file mode 100644 index 00000000000..7bf2777d9d5 --- /dev/null +++ b/changelog/fragments/1665517984-improve-checkin-error-logging.yaml @@ -0,0 +1,5 @@ +kind: enhancement +summary: Improve logging of Fleet check-in errors. +description: Improve logging of Fleet check-in errors and only report the local state as degraded after two consecutive failed check-ins. +pr: 1477 +issue: 1154 diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go index b88a0cafee0..f90d94b396d 100644 --- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go +++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go @@ -173,7 +173,7 @@ func (f *fleetGateway) worker() { // Execute the checkin call and for any errors returned by the fleet-server API // the function will retry to communicate with fleet-server with an exponential delay and some // jitter to help better distribute the load from a fleet of agents. - resp, err := f.doExecute() + resp, err := f.executeCheckinWithRetries() if err != nil { continue } @@ -274,21 +274,34 @@ func (f *fleetGateway) gatherQueuedActions(ts time.Time) (queued, expired []flee return queued, expired } -func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) { +func (f *fleetGateway) executeCheckinWithRetries() (*fleetapi.CheckinResponse, error) { f.backoff.Reset() // Guard if the context is stopped by a out of bound call, // this mean we are rebooting to change the log level or the system is shutting us down. for f.bgContext.Err() == nil { f.log.Debugf("Checkin started") - resp, err := f.execute(f.bgContext) + resp, took, err := f.executeCheckin(f.bgContext) if err != nil { f.checkinFailCounter++ - f.log.Errorf("Could not communicate with fleet-server checkin API will retry, error: %s", err) + + // Report the first two failures at warn level as they may be recoverable with retries. + if f.checkinFailCounter <= 2 { + f.log.Warnw("Possible transient error during checkin with fleet-server, retrying", + "error.message", err, "request_duration_ns", took, "failed_checkins", f.checkinFailCounter, + "retry_after_ns", f.backoff.NextWait()) + } else { + // Only update the local status after repeated failures: https://github.com/elastic/elastic-agent/issues/1148 + f.localReporter.Update(state.Degraded, fmt.Sprintf("checkin failed: %v", err), nil) + f.log.Errorw("Cannot checkin in with fleet-server, retrying", + "error.message", err, "request_duration_ns", took, "failed_checkins", f.checkinFailCounter, + "retry_after_ns", f.backoff.NextWait()) + } + if !f.backoff.Wait() { // Something bad has happened and we log it and we should update our current state. err := errors.New( - "execute retry loop was stopped", + "checkin retry loop was stopped", errors.TypeNetwork, errors.M(errors.MetaKeyURI, f.client.URI()), ) @@ -297,10 +310,6 @@ func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) { f.localReporter.Update(state.Failed, err.Error(), nil) return nil, err } - if f.checkinFailCounter > 1 { - f.localReporter.Update(state.Degraded, fmt.Sprintf("checkin failed: %v", err), nil) - f.log.Errorf("checkin number %d failed: %s", f.checkinFailCounter, err.Error()) - } continue } @@ -319,7 +328,7 @@ func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) { return nil, f.bgContext.Err() } -func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, error) { +func (f *fleetGateway) executeCheckin(ctx context.Context) (*fleetapi.CheckinResponse, time.Duration, error) { ecsMeta, err := info.Metadata() if err != nil { f.log.Error(errors.New("failed to load metadata", err)) @@ -339,7 +348,7 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, Status: f.statusController.StatusString(), } - resp, err := cmd.Execute(ctx, req) + resp, took, err := cmd.Execute(ctx, req) if isUnauth(err) { f.unauthCounter++ @@ -347,15 +356,15 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, f.log.Warnf("received an invalid api key error '%d' times. Starting to unenroll the elastic agent.", f.unauthCounter) return &fleetapi.CheckinResponse{ Actions: []fleetapi.Action{&fleetapi.ActionUnenroll{ActionID: "", ActionType: "UNENROLL", IsDetected: true}}, - }, nil + }, took, nil } - return nil, err + return nil, took, err } f.unauthCounter = 0 if err != nil { - return nil, err + return nil, took, err } // Save the latest ackToken @@ -367,7 +376,7 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, } } - return resp, nil + return resp, took, nil } // shouldUnenroll checks if the max number of trying an invalid key is reached diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go index 2d691185c1c..722bd792f95 100644 --- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go +++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway" "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/storage" @@ -692,7 +693,7 @@ func TestRetriesOnFailures(t *testing.T) { scheduler := scheduler.NewStepper() client := newTestingClient() dispatcher := newTestingDispatcher() - log, _ := logger.New("fleet_gateway", false) + log := newInfoLogger(t, "fleet_gateway") ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -705,8 +706,8 @@ func TestRetriesOnFailures(t *testing.T) { queue.On("Actions").Return([]fleetapi.Action{}) localReporter := &testutils.MockReporter{} - localReporter.On("Update", state.Degraded, mock.Anything, mock.Anything).Times(2) - localReporter.On("Update", mock.Anything, mock.Anything, mock.Anything).Maybe() + // The local state should only be reported as degraded after two consecutive failures. + localReporter.On("Update", state.Degraded, mock.Anything, mock.Anything).Once() localReporter.On("Unregister").Maybe() fleetReporter := &testutils.MockReporter{} @@ -812,3 +813,16 @@ type testAgentInfo struct{} func (testAgentInfo) AgentID() string { return "agent-secret" } type request struct{} + +func newInfoLogger(t *testing.T, name string) *logger.Logger { + t.Helper() + + loggerCfg := logger.DefaultLoggingConfig() + loggerCfg.Level = logp.InfoLevel + loggerCfg.ToFiles = false + loggerCfg.ToStderr = true + + log, err := logger.NewFromConfig("", loggerCfg, false) + require.NoError(t, err) + return log +} diff --git a/internal/pkg/core/backoff/backoff.go b/internal/pkg/core/backoff/backoff.go index 06723e7db9a..c97eaae199d 100644 --- a/internal/pkg/core/backoff/backoff.go +++ b/internal/pkg/core/backoff/backoff.go @@ -4,11 +4,16 @@ package backoff +import "time" + // Backoff defines the interface for backoff strategies. type Backoff interface { // Wait blocks for a duration of time governed by the backoff strategy. Wait() bool + // NextWait returns the duration of the next call to Wait(). + NextWait() time.Duration + // Reset resets the backoff duration to an initial value governed by the backoff strategy. Reset() } diff --git a/internal/pkg/core/backoff/backoff_test.go b/internal/pkg/core/backoff/backoff_test.go index 88498ff5a58..12332eb15f2 100644 --- a/internal/pkg/core/backoff/backoff_test.go +++ b/internal/pkg/core/backoff/backoff_test.go @@ -14,14 +14,9 @@ import ( type factory func(<-chan struct{}) Backoff -func TestBackoff(t *testing.T) { - t.Run("test close channel", testCloseChannel) - t.Run("test unblock after some time", testUnblockAfterInit) -} - -func testCloseChannel(t *testing.T) { - init := 2 * time.Second - max := 5 * time.Minute +func TestCloseChannel(t *testing.T) { + init := 2 * time.Millisecond + max := 5 * time.Second tests := map[string]factory{ "ExpBackoff": func(done <-chan struct{}) Backoff { @@ -42,9 +37,9 @@ func testCloseChannel(t *testing.T) { } } -func testUnblockAfterInit(t *testing.T) { - init := 1 * time.Second - max := 5 * time.Minute +func TestUnblockAfterInit(t *testing.T) { + init := 1 * time.Millisecond + max := 5 * time.Second tests := map[string]factory{ "ExpBackoff": func(done <-chan struct{}) Backoff { @@ -68,3 +63,36 @@ func testUnblockAfterInit(t *testing.T) { }) } } + +func TestNextWait(t *testing.T) { + init := time.Millisecond + max := 5 * time.Second + + tests := map[string]factory{ + "ExpBackoff": func(done <-chan struct{}) Backoff { + return NewExpBackoff(done, init, max) + }, + "EqualJitterBackoff": func(done <-chan struct{}) Backoff { + return NewEqualJitterBackoff(done, init, max) + }, + } + + for name, f := range tests { + t.Run(name, func(t *testing.T) { + c := make(chan struct{}) + b := f(c) + + startWait := b.NextWait() + assert.Equal(t, startWait, b.NextWait(), "next wait not stable") + + startedAt := time.Now() + b.Wait() + waitDuration := time.Now().Sub(startedAt) + nextWait := b.NextWait() + + t.Logf("actualWait: %s startWait: %s nextWait: %s", waitDuration, startWait, nextWait) + assert.Less(t, startWait, nextWait, "wait value did not increase") + assert.GreaterOrEqual(t, waitDuration, startWait, "next wait duration <= actual wait duration") + }) + } +} diff --git a/internal/pkg/core/backoff/equal_jitter.go b/internal/pkg/core/backoff/equal_jitter.go index d87077397cd..671201f5892 100644 --- a/internal/pkg/core/backoff/equal_jitter.go +++ b/internal/pkg/core/backoff/equal_jitter.go @@ -16,8 +16,9 @@ type EqualJitterBackoff struct { duration time.Duration done <-chan struct{} - init time.Duration - max time.Duration + init time.Duration + max time.Duration + nextRand time.Duration last time.Time } @@ -29,6 +30,7 @@ func NewEqualJitterBackoff(done <-chan struct{}, init, max time.Duration) Backof done: done, init: init, max: max, + nextRand: time.Duration(rand.Int63n(int64(init))), //nolint:gosec } } @@ -38,13 +40,18 @@ func (b *EqualJitterBackoff) Reset() { b.duration = b.init * 2 } +func (b *EqualJitterBackoff) NextWait() time.Duration { + // Make sure we have always some minimal back off and jitter. + temp := b.duration / 2 + return temp + b.nextRand +} + // Wait block until either the timer is completed or channel is done. func (b *EqualJitterBackoff) Wait() bool { - // Make sure we have always some minimal back off and jitter. - temp := int64(b.duration / 2) - backoff := time.Duration(temp + rand.Int63n(temp)) + backoff := b.NextWait() // increase duration for next wait. + b.nextRand = time.Duration(rand.Int63n(int64(b.duration))) b.duration *= 2 if b.duration > b.max { b.duration = b.max diff --git a/internal/pkg/core/backoff/exponential.go b/internal/pkg/core/backoff/exponential.go index 81224b95eb5..51b5b4e0cb5 100644 --- a/internal/pkg/core/backoff/exponential.go +++ b/internal/pkg/core/backoff/exponential.go @@ -36,18 +36,23 @@ func (b *ExpBackoff) Reset() { b.duration = b.init } +func (b *ExpBackoff) NextWait() time.Duration { + nextWait := b.duration + nextWait *= 2 + if nextWait > b.max { + nextWait = b.max + } + return nextWait +} + // Wait block until either the timer is completed or channel is done. func (b *ExpBackoff) Wait() bool { - backoff := b.duration - b.duration *= 2 - if b.duration > b.max { - b.duration = b.max - } + b.duration = b.NextWait() select { case <-b.done: return false - case <-time.After(backoff): + case <-time.After(b.duration): b.last = time.Now() return true } diff --git a/internal/pkg/fleetapi/checkin_cmd.go b/internal/pkg/fleetapi/checkin_cmd.go index e225aababb9..89faf0aba2a 100644 --- a/internal/pkg/fleetapi/checkin_cmd.go +++ b/internal/pkg/fleetapi/checkin_cmd.go @@ -77,23 +77,26 @@ func NewCheckinCmd(info agentInfo, client client.Sender) *CheckinCmd { } } -// Execute enroll the Agent in the Fleet Server. -func (e *CheckinCmd) Execute(ctx context.Context, r *CheckinRequest) (*CheckinResponse, error) { +// Execute enroll the Agent in the Fleet Server. Returns the decoded check in response, a duration indicating +// how long the request took, and an error. +func (e *CheckinCmd) Execute(ctx context.Context, r *CheckinRequest) (*CheckinResponse, time.Duration, error) { if err := r.Validate(); err != nil { - return nil, err + return nil, 0, err } b, err := json.Marshal(r) if err != nil { - return nil, errors.New(err, + return nil, 0, errors.New(err, "fail to encode the checkin request", errors.TypeUnexpected) } cp := fmt.Sprintf(checkingPath, e.info.AgentID()) + sendStart := time.Now() resp, err := e.client.Send(ctx, "POST", cp, nil, nil, bytes.NewBuffer(b)) + sendDuration := time.Now().Sub(sendStart) if err != nil { - return nil, errors.New(err, + return nil, sendDuration, errors.New(err, "fail to checkin to fleet-server", errors.TypeNetwork, errors.M(errors.MetaKeyURI, cp)) @@ -101,26 +104,26 @@ func (e *CheckinCmd) Execute(ctx context.Context, r *CheckinRequest) (*CheckinRe defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return nil, client.ExtractError(resp.Body) + return nil, sendDuration, client.ExtractError(resp.Body) } rs, err := ioutil.ReadAll(resp.Body) if err != nil { - return nil, errors.New(err, "failed to read checkin response") + return nil, sendDuration, errors.New(err, "failed to read checkin response") } checkinResponse := &CheckinResponse{} decoder := json.NewDecoder(bytes.NewReader(rs)) if err := decoder.Decode(checkinResponse); err != nil { - return nil, errors.New(err, + return nil, sendDuration, errors.New(err, "fail to decode checkin response", errors.TypeNetwork, errors.M(errors.MetaKeyURI, cp)) } if err := checkinResponse.Validate(); err != nil { - return nil, err + return nil, sendDuration, err } - return checkinResponse, nil + return checkinResponse, sendDuration, nil } diff --git a/internal/pkg/fleetapi/checkin_cmd_test.go b/internal/pkg/fleetapi/checkin_cmd_test.go index 2d9aef2741a..56726bb5559 100644 --- a/internal/pkg/fleetapi/checkin_cmd_test.go +++ b/internal/pkg/fleetapi/checkin_cmd_test.go @@ -11,6 +11,7 @@ import ( "io/ioutil" "net/http" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -25,6 +26,7 @@ func (*agentinfo) AgentID() string { return "id" } func TestCheckin(t *testing.T) { const withAPIKey = "secret" + const requestDelay = time.Millisecond ctx := context.Background() agentInfo := &agentinfo{} @@ -39,6 +41,8 @@ func TestCheckin(t *testing.T) { mux.HandleFunc(path, authHandler(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) fmt.Fprint(w, raw) + // Introduce a small delay to test the request time measurment. + time.Sleep(requestDelay) }, withAPIKey)) return mux }, withAPIKey, @@ -47,8 +51,10 @@ func TestCheckin(t *testing.T) { request := CheckinRequest{} - _, err := cmd.Execute(ctx, &request) + _, took, err := cmd.Execute(ctx, &request) require.Error(t, err) + // Ensure the request took at least as long as the artificial delay. + require.GreaterOrEqual(t, took, requestDelay) }, )) @@ -96,7 +102,7 @@ func TestCheckin(t *testing.T) { request := CheckinRequest{} - r, err := cmd.Execute(ctx, &request) + r, _, err := cmd.Execute(ctx, &request) require.NoError(t, err) require.Equal(t, 1, len(r.Actions)) @@ -157,7 +163,7 @@ func TestCheckin(t *testing.T) { request := CheckinRequest{} - r, err := cmd.Execute(ctx, &request) + r, _, err := cmd.Execute(ctx, &request) require.NoError(t, err) require.Equal(t, 2, len(r.Actions)) @@ -173,7 +179,7 @@ func TestCheckin(t *testing.T) { }, )) - t.Run("When we receive no action", withServerWithAuthClient( + t.Run("When we receive no action with delay", withServerWithAuthClient( func(t *testing.T) *http.ServeMux { raw := `{ "actions": [] }` mux := http.NewServeMux() @@ -189,7 +195,7 @@ func TestCheckin(t *testing.T) { request := CheckinRequest{} - r, err := cmd.Execute(ctx, &request) + r, _, err := cmd.Execute(ctx, &request) require.NoError(t, err) require.Equal(t, 0, len(r.Actions)) @@ -223,7 +229,7 @@ func TestCheckin(t *testing.T) { request := CheckinRequest{Metadata: testMetadata()} - r, err := cmd.Execute(ctx, &request) + r, _, err := cmd.Execute(ctx, &request) require.NoError(t, err) require.Equal(t, 0, len(r.Actions)) @@ -257,7 +263,7 @@ func TestCheckin(t *testing.T) { request := CheckinRequest{} - r, err := cmd.Execute(ctx, &request) + r, _, err := cmd.Execute(ctx, &request) require.NoError(t, err) require.Equal(t, 0, len(r.Actions))