Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.5](backport #1477) Improve logging around agent checkins. #1492

Merged
merged 1 commit into from
Oct 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,5 @@
- Add `lumberjack` input type to the Filebeat spec. {pull}[959]
- Add support for hints' based autodiscovery in kubernetes provider. {pull}[698]
- Improve logging during upgrades. {pull}[1287]
- Added status message to CheckinRequest {pull}[1369]
- Improve logging of Fleet checkins errors. {pull}[1477]
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
kind: enhancement
summary: Improve logging of Fleet check-in errors.
description: Improve logging of Fleet check-in errors and only report the local state as degraded after two consecutive failed check-ins.
pr: 1477
issue: 1154
39 changes: 24 additions & 15 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (f *fleetGateway) worker() {
// Execute the checkin call and for any errors returned by the fleet-server API
// the function will retry to communicate with fleet-server with an exponential delay and some
// jitter to help better distribute the load from a fleet of agents.
resp, err := f.doExecute()
resp, err := f.executeCheckinWithRetries()
if err != nil {
continue
}
Expand Down Expand Up @@ -274,21 +274,34 @@ func (f *fleetGateway) gatherQueuedActions(ts time.Time) (queued, expired []flee
return queued, expired
}

func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) {
func (f *fleetGateway) executeCheckinWithRetries() (*fleetapi.CheckinResponse, error) {
f.backoff.Reset()

// Guard if the context is stopped by a out of bound call,
// this mean we are rebooting to change the log level or the system is shutting us down.
for f.bgContext.Err() == nil {
f.log.Debugf("Checkin started")
resp, err := f.execute(f.bgContext)
resp, took, err := f.executeCheckin(f.bgContext)
if err != nil {
f.checkinFailCounter++
f.log.Errorf("Could not communicate with fleet-server checkin API will retry, error: %s", err)

// Report the first two failures at warn level as they may be recoverable with retries.
if f.checkinFailCounter <= 2 {
f.log.Warnw("Possible transient error during checkin with fleet-server, retrying",
"error.message", err, "request_duration_ns", took, "failed_checkins", f.checkinFailCounter,
"retry_after_ns", f.backoff.NextWait())
} else {
// Only update the local status after repeated failures: https://github.com/elastic/elastic-agent/issues/1148
f.localReporter.Update(state.Degraded, fmt.Sprintf("checkin failed: %v", err), nil)
f.log.Errorw("Cannot checkin in with fleet-server, retrying",
"error.message", err, "request_duration_ns", took, "failed_checkins", f.checkinFailCounter,
"retry_after_ns", f.backoff.NextWait())
}

if !f.backoff.Wait() {
// Something bad has happened and we log it and we should update our current state.
err := errors.New(
"execute retry loop was stopped",
"checkin retry loop was stopped",
errors.TypeNetwork,
errors.M(errors.MetaKeyURI, f.client.URI()),
)
Expand All @@ -297,10 +310,6 @@ func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) {
f.localReporter.Update(state.Failed, err.Error(), nil)
return nil, err
}
if f.checkinFailCounter > 1 {
f.localReporter.Update(state.Degraded, fmt.Sprintf("checkin failed: %v", err), nil)
f.log.Errorf("checkin number %d failed: %s", f.checkinFailCounter, err.Error())
}
continue
}

Expand All @@ -319,7 +328,7 @@ func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) {
return nil, f.bgContext.Err()
}

func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, error) {
func (f *fleetGateway) executeCheckin(ctx context.Context) (*fleetapi.CheckinResponse, time.Duration, error) {
ecsMeta, err := info.Metadata()
if err != nil {
f.log.Error(errors.New("failed to load metadata", err))
Expand All @@ -339,23 +348,23 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
Status: f.statusController.StatusString(),
}

resp, err := cmd.Execute(ctx, req)
resp, took, err := cmd.Execute(ctx, req)
if isUnauth(err) {
f.unauthCounter++

if f.shouldUnenroll() {
f.log.Warnf("received an invalid api key error '%d' times. Starting to unenroll the elastic agent.", f.unauthCounter)
return &fleetapi.CheckinResponse{
Actions: []fleetapi.Action{&fleetapi.ActionUnenroll{ActionID: "", ActionType: "UNENROLL", IsDetected: true}},
}, nil
}, took, nil
}

return nil, err
return nil, took, err
}

f.unauthCounter = 0
if err != nil {
return nil, err
return nil, took, err
}

// Save the latest ackToken
Expand All @@ -367,7 +376,7 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
}
}

return resp, nil
return resp, took, nil
}

// shouldUnenroll checks if the max number of trying an invalid key is reached
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
Expand Down Expand Up @@ -692,7 +693,7 @@ func TestRetriesOnFailures(t *testing.T) {
scheduler := scheduler.NewStepper()
client := newTestingClient()
dispatcher := newTestingDispatcher()
log, _ := logger.New("fleet_gateway", false)
log := newInfoLogger(t, "fleet_gateway")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -705,8 +706,8 @@ func TestRetriesOnFailures(t *testing.T) {
queue.On("Actions").Return([]fleetapi.Action{})

localReporter := &testutils.MockReporter{}
localReporter.On("Update", state.Degraded, mock.Anything, mock.Anything).Times(2)
localReporter.On("Update", mock.Anything, mock.Anything, mock.Anything).Maybe()
// The local state should only be reported as degraded after two consecutive failures.
localReporter.On("Update", state.Degraded, mock.Anything, mock.Anything).Once()
localReporter.On("Unregister").Maybe()

fleetReporter := &testutils.MockReporter{}
Expand Down Expand Up @@ -812,3 +813,16 @@ type testAgentInfo struct{}
func (testAgentInfo) AgentID() string { return "agent-secret" }

type request struct{}

func newInfoLogger(t *testing.T, name string) *logger.Logger {
t.Helper()

loggerCfg := logger.DefaultLoggingConfig()
loggerCfg.Level = logp.InfoLevel
loggerCfg.ToFiles = false
loggerCfg.ToStderr = true

log, err := logger.NewFromConfig("", loggerCfg, false)
require.NoError(t, err)
return log
}
5 changes: 5 additions & 0 deletions internal/pkg/core/backoff/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@

package backoff

import "time"

// Backoff defines the interface for backoff strategies.
type Backoff interface {
// Wait blocks for a duration of time governed by the backoff strategy.
Wait() bool

// NextWait returns the duration of the next call to Wait().
NextWait() time.Duration

// Reset resets the backoff duration to an initial value governed by the backoff strategy.
Reset()
}
Expand Down
50 changes: 39 additions & 11 deletions internal/pkg/core/backoff/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,9 @@ import (

type factory func(<-chan struct{}) Backoff

func TestBackoff(t *testing.T) {
t.Run("test close channel", testCloseChannel)
t.Run("test unblock after some time", testUnblockAfterInit)
}

func testCloseChannel(t *testing.T) {
init := 2 * time.Second
max := 5 * time.Minute
func TestCloseChannel(t *testing.T) {
init := 2 * time.Millisecond
max := 5 * time.Second

tests := map[string]factory{
"ExpBackoff": func(done <-chan struct{}) Backoff {
Expand All @@ -42,9 +37,9 @@ func testCloseChannel(t *testing.T) {
}
}

func testUnblockAfterInit(t *testing.T) {
init := 1 * time.Second
max := 5 * time.Minute
func TestUnblockAfterInit(t *testing.T) {
init := 1 * time.Millisecond
max := 5 * time.Second

tests := map[string]factory{
"ExpBackoff": func(done <-chan struct{}) Backoff {
Expand All @@ -68,3 +63,36 @@ func testUnblockAfterInit(t *testing.T) {
})
}
}

func TestNextWait(t *testing.T) {
init := time.Millisecond
max := 5 * time.Second

tests := map[string]factory{
"ExpBackoff": func(done <-chan struct{}) Backoff {
return NewExpBackoff(done, init, max)
},
"EqualJitterBackoff": func(done <-chan struct{}) Backoff {
return NewEqualJitterBackoff(done, init, max)
},
}

for name, f := range tests {
t.Run(name, func(t *testing.T) {
c := make(chan struct{})
b := f(c)

startWait := b.NextWait()
assert.Equal(t, startWait, b.NextWait(), "next wait not stable")

startedAt := time.Now()
b.Wait()
waitDuration := time.Now().Sub(startedAt)
nextWait := b.NextWait()

t.Logf("actualWait: %s startWait: %s nextWait: %s", waitDuration, startWait, nextWait)
assert.Less(t, startWait, nextWait, "wait value did not increase")
assert.GreaterOrEqual(t, waitDuration, startWait, "next wait duration <= actual wait duration")
})
}
}
17 changes: 12 additions & 5 deletions internal/pkg/core/backoff/equal_jitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ type EqualJitterBackoff struct {
duration time.Duration
done <-chan struct{}

init time.Duration
max time.Duration
init time.Duration
max time.Duration
nextRand time.Duration

last time.Time
}
Expand All @@ -29,6 +30,7 @@ func NewEqualJitterBackoff(done <-chan struct{}, init, max time.Duration) Backof
done: done,
init: init,
max: max,
nextRand: time.Duration(rand.Int63n(int64(init))), //nolint:gosec
}
}

Expand All @@ -38,13 +40,18 @@ func (b *EqualJitterBackoff) Reset() {
b.duration = b.init * 2
}

func (b *EqualJitterBackoff) NextWait() time.Duration {
// Make sure we have always some minimal back off and jitter.
temp := b.duration / 2
return temp + b.nextRand
}

// Wait block until either the timer is completed or channel is done.
func (b *EqualJitterBackoff) Wait() bool {
// Make sure we have always some minimal back off and jitter.
temp := int64(b.duration / 2)
backoff := time.Duration(temp + rand.Int63n(temp))
backoff := b.NextWait()

// increase duration for next wait.
b.nextRand = time.Duration(rand.Int63n(int64(b.duration)))
b.duration *= 2
if b.duration > b.max {
b.duration = b.max
Expand Down
17 changes: 11 additions & 6 deletions internal/pkg/core/backoff/exponential.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,23 @@ func (b *ExpBackoff) Reset() {
b.duration = b.init
}

func (b *ExpBackoff) NextWait() time.Duration {
nextWait := b.duration
nextWait *= 2
if nextWait > b.max {
nextWait = b.max
}
return nextWait
}

// Wait block until either the timer is completed or channel is done.
func (b *ExpBackoff) Wait() bool {
backoff := b.duration
b.duration *= 2
if b.duration > b.max {
b.duration = b.max
}
b.duration = b.NextWait()

select {
case <-b.done:
return false
case <-time.After(backoff):
case <-time.After(b.duration):
b.last = time.Now()
return true
}
Expand Down
23 changes: 13 additions & 10 deletions internal/pkg/fleetapi/checkin_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,50 +77,53 @@ func NewCheckinCmd(info agentInfo, client client.Sender) *CheckinCmd {
}
}

// Execute enroll the Agent in the Fleet Server.
func (e *CheckinCmd) Execute(ctx context.Context, r *CheckinRequest) (*CheckinResponse, error) {
// Execute enroll the Agent in the Fleet Server. Returns the decoded check in response, a duration indicating
// how long the request took, and an error.
func (e *CheckinCmd) Execute(ctx context.Context, r *CheckinRequest) (*CheckinResponse, time.Duration, error) {
if err := r.Validate(); err != nil {
return nil, err
return nil, 0, err
}

b, err := json.Marshal(r)
if err != nil {
return nil, errors.New(err,
return nil, 0, errors.New(err,
"fail to encode the checkin request",
errors.TypeUnexpected)
}

cp := fmt.Sprintf(checkingPath, e.info.AgentID())
sendStart := time.Now()
resp, err := e.client.Send(ctx, "POST", cp, nil, nil, bytes.NewBuffer(b))
sendDuration := time.Now().Sub(sendStart)
if err != nil {
return nil, errors.New(err,
return nil, sendDuration, errors.New(err,
"fail to checkin to fleet-server",
errors.TypeNetwork,
errors.M(errors.MetaKeyURI, cp))
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, client.ExtractError(resp.Body)
return nil, sendDuration, client.ExtractError(resp.Body)
}

rs, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.New(err, "failed to read checkin response")
return nil, sendDuration, errors.New(err, "failed to read checkin response")
}

checkinResponse := &CheckinResponse{}
decoder := json.NewDecoder(bytes.NewReader(rs))
if err := decoder.Decode(checkinResponse); err != nil {
return nil, errors.New(err,
return nil, sendDuration, errors.New(err,
"fail to decode checkin response",
errors.TypeNetwork,
errors.M(errors.MetaKeyURI, cp))
}

if err := checkinResponse.Validate(); err != nil {
return nil, err
return nil, sendDuration, err
}

return checkinResponse, nil
return checkinResponse, sendDuration, nil
}
Loading