Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve logging around agent checkins. #1477

Merged
merged 9 commits into from
Oct 12, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 25 additions & 16 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (f *fleetGateway) worker() {
// Execute the checkin call and for any errors returned by the fleet-server API
// the function will retry to communicate with fleet-server with an exponential delay and some
// jitter to help better distribute the load from a fleet of agents.
resp, err := f.doExecute()
resp, err := f.executeCheckinWithRetries()
if err != nil {
continue
}
Expand Down Expand Up @@ -274,21 +274,34 @@ func (f *fleetGateway) gatherQueuedActions(ts time.Time) (queued, expired []flee
return queued, expired
}

func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) {
func (f *fleetGateway) executeCheckinWithRetries() (*fleetapi.CheckinResponse, error) {
f.backoff.Reset()

// Guard if the context is stopped by a out of bound call,
// this mean we are rebooting to change the log level or the system is shutting us down.
for f.bgContext.Err() == nil {
f.log.Debugf("Checkin started")
resp, err := f.execute(f.bgContext)
f.log.Debugf("Checking started")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be Checkin

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤦

resp, took, err := f.executeCheckin(f.bgContext)
if err != nil {
// Only update the local status on failure: https://github.com/elastic/elastic-agent/issues/1148
f.localReporter.Update(state.Degraded, fmt.Sprintf("checkin failed: %v", err), nil)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if we should only update the local reporter to the degraded state after multiple repeated failures, instead of just the first one?

In the fleet UI the agent is marked as degraded after multiple missed checkins, not just one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, we can move it down to where it checks the fail counter

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

f.checkinFailCounter++
f.log.Errorf("Could not communicate with fleet-server checkin API will retry, error: %s", err)

// Report the first two failures at warn level as they may be recoverable with retries.
if f.checkinFailCounter <= 2 {
f.log.Warnw("Possible transient error during checkin with fleet-server, retrying",
"error.message", err, "request_duration", took, "failed_checkins", f.checkinFailCounter,
"retry_after", f.backoff.NextWait())
} else {
f.log.Errorw("Cannot checkin in with fleet-server, retrying",
"error.message", err, "request_duration", took, "failed_checkins", f.checkinFailCounter,
"retry_after", f.backoff.NextWait())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just asking, but did you check how the retry_after duration will be printed, if it'll be human readable or the milliseconds?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be human readable according to https://pkg.go.dev/time#Duration.String since NextWait() returns a time.Duration.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I apparently need to manually call the .String() message if I want it to be human readable, otherwise the units are nanoseconds.

Copy link
Member Author

@cmacknz cmacknz Oct 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just added the _ns suffix to specify the units. The ns durations are easier to graph and work with in Kibana so I stuck with those. Nanoseconds are the units for event.duration if we ever wanted to make our logs ECS compliant.

}

if !f.backoff.Wait() {
// Something bad has happened and we log it and we should update our current state.
err := errors.New(
"execute retry loop was stopped",
"checkin retry loop was stopped",
errors.TypeNetwork,
errors.M(errors.MetaKeyURI, f.client.URI()),
)
Expand All @@ -297,10 +310,6 @@ func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) {
f.localReporter.Update(state.Failed, err.Error(), nil)
return nil, err
}
if f.checkinFailCounter > 1 {
f.localReporter.Update(state.Degraded, fmt.Sprintf("checkin failed: %v", err), nil)
f.log.Errorf("checkin number %d failed: %s", f.checkinFailCounter, err.Error())
}
continue
}

Expand All @@ -319,7 +328,7 @@ func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) {
return nil, f.bgContext.Err()
}

func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, error) {
func (f *fleetGateway) executeCheckin(ctx context.Context) (*fleetapi.CheckinResponse, time.Duration, error) {
ecsMeta, err := info.Metadata()
if err != nil {
f.log.Error(errors.New("failed to load metadata", err))
Expand All @@ -340,23 +349,23 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
Message: f.statusController.Status().Message,
}

resp, err := cmd.Execute(ctx, req)
resp, took, err := cmd.Execute(ctx, req)
if isUnauth(err) {
f.unauthCounter++

if f.shouldUnenroll() {
f.log.Warnf("received an invalid api key error '%d' times. Starting to unenroll the elastic agent.", f.unauthCounter)
return &fleetapi.CheckinResponse{
Actions: []fleetapi.Action{&fleetapi.ActionUnenroll{ActionID: "", ActionType: "UNENROLL", IsDetected: true}},
}, nil
}, took, nil
}

return nil, err
return nil, took, err
}

f.unauthCounter = 0
if err != nil {
return nil, err
return nil, took, err
}

// Save the latest ackToken
Expand All @@ -368,7 +377,7 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
}
}

return resp, nil
return resp, took, nil
}

// shouldUnenroll checks if the max number of trying an invalid key is reached
Expand Down
5 changes: 5 additions & 0 deletions internal/pkg/core/backoff/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@

package backoff

import "time"

// Backoff defines the interface for backoff strategies.
type Backoff interface {
// Wait blocks for a duration of time governed by the backoff strategy.
Wait() bool

// NextWait returns the duration of the next call to Wait().
NextWait() time.Duration

// Reset resets the backoff duration to an initial value governed by the backoff strategy.
Reset()
}
Expand Down
50 changes: 39 additions & 11 deletions internal/pkg/core/backoff/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,9 @@ import (

type factory func(<-chan struct{}) Backoff

func TestBackoff(t *testing.T) {
t.Run("test close channel", testCloseChannel)
t.Run("test unblock after some time", testUnblockAfterInit)
}

func testCloseChannel(t *testing.T) {
init := 2 * time.Second
max := 5 * time.Minute
func TestCloseChannel(t *testing.T) {
init := 2 * time.Millisecond
max := 5 * time.Second

tests := map[string]factory{
"ExpBackoff": func(done <-chan struct{}) Backoff {
Expand All @@ -42,9 +37,9 @@ func testCloseChannel(t *testing.T) {
}
}

func testUnblockAfterInit(t *testing.T) {
init := 1 * time.Second
max := 5 * time.Minute
func TestUnblockAfterInit(t *testing.T) {
init := 1 * time.Millisecond
max := 5 * time.Second

tests := map[string]factory{
"ExpBackoff": func(done <-chan struct{}) Backoff {
Expand All @@ -68,3 +63,36 @@ func testUnblockAfterInit(t *testing.T) {
})
}
}

func TestNextWait(t *testing.T) {
init := time.Millisecond
max := 5 * time.Second

tests := map[string]factory{
"ExpBackoff": func(done <-chan struct{}) Backoff {
return NewExpBackoff(done, init, max)
},
"EqualJitterBackoff": func(done <-chan struct{}) Backoff {
return NewEqualJitterBackoff(done, init, max)
},
}

for name, f := range tests {
t.Run(name, func(t *testing.T) {
c := make(chan struct{})
b := f(c)

startWait := b.NextWait()
assert.Equal(t, startWait, b.NextWait(), "next wait not stable")

startedAt := time.Now()
b.Wait()
waitDuration := time.Now().Sub(startedAt)
nextWait := b.NextWait()

t.Logf("actualWait: %s startWait: %s nextWait: %s", waitDuration, startWait, nextWait)
assert.Less(t, startWait, nextWait, "wait value did not increase")
assert.GreaterOrEqual(t, waitDuration, startWait, "next wait duration <= actual wait duration")
})
}
}
17 changes: 12 additions & 5 deletions internal/pkg/core/backoff/equal_jitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ type EqualJitterBackoff struct {
duration time.Duration
done <-chan struct{}

init time.Duration
max time.Duration
init time.Duration
max time.Duration
nextRand time.Duration

last time.Time
}
Expand All @@ -29,6 +30,7 @@ func NewEqualJitterBackoff(done <-chan struct{}, init, max time.Duration) Backof
done: done,
init: init,
max: max,
nextRand: time.Duration(rand.Int63n(int64(init))),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can put a nolint directive for this line

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do, I went further and just turned off this linter in #1478

}
}

Expand All @@ -38,13 +40,18 @@ func (b *EqualJitterBackoff) Reset() {
b.duration = b.init * 2
}

func (b *EqualJitterBackoff) NextWait() time.Duration {
// Make sure we have always some minimal back off and jitter.
temp := b.duration / 2
return temp + b.nextRand
}

// Wait block until either the timer is completed or channel is done.
func (b *EqualJitterBackoff) Wait() bool {
// Make sure we have always some minimal back off and jitter.
temp := int64(b.duration / 2)
backoff := time.Duration(temp + rand.Int63n(temp))
backoff := b.NextWait()

// increase duration for next wait.
b.nextRand = time.Duration(rand.Int63n(int64(b.duration)))
b.duration *= 2
if b.duration > b.max {
b.duration = b.max
Expand Down
17 changes: 11 additions & 6 deletions internal/pkg/core/backoff/exponential.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,23 @@ func (b *ExpBackoff) Reset() {
b.duration = b.init
}

func (b *ExpBackoff) NextWait() time.Duration {
nextWait := b.duration
nextWait *= 2
if nextWait > b.max {
nextWait = b.max
}
return nextWait
}

// Wait block until either the timer is completed or channel is done.
func (b *ExpBackoff) Wait() bool {
backoff := b.duration
b.duration *= 2
if b.duration > b.max {
b.duration = b.max
}
b.duration = b.NextWait()

select {
case <-b.done:
return false
case <-time.After(backoff):
case <-time.After(b.duration):
b.last = time.Now()
return true
}
Expand Down
23 changes: 13 additions & 10 deletions internal/pkg/fleetapi/checkin_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,50 +78,53 @@ func NewCheckinCmd(info agentInfo, client client.Sender) *CheckinCmd {
}
}

// Execute enroll the Agent in the Fleet Server.
func (e *CheckinCmd) Execute(ctx context.Context, r *CheckinRequest) (*CheckinResponse, error) {
// Execute enroll the Agent in the Fleet Server. Returns the decoded check in response, a duration indicating
// how long the request took, and an error.
func (e *CheckinCmd) Execute(ctx context.Context, r *CheckinRequest) (*CheckinResponse, time.Duration, error) {
if err := r.Validate(); err != nil {
return nil, err
return nil, 0, err
}

b, err := json.Marshal(r)
if err != nil {
return nil, errors.New(err,
return nil, 0, errors.New(err,
"fail to encode the checkin request",
errors.TypeUnexpected)
}

cp := fmt.Sprintf(checkingPath, e.info.AgentID())
sendStart := time.Now()
resp, err := e.client.Send(ctx, "POST", cp, nil, nil, bytes.NewBuffer(b))
sendDuration := time.Now().Sub(sendStart)
Copy link
Member Author

@cmacknz cmacknz Oct 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know that I particularly like manually timing the request like this and returning it all the way up the call stack, but this was the easiest way to do it. In an ideal world we could use request tracing to obtain this information without having to modify our network client.

Perhaps a more sustainable way to do this would be to implement a way for the request tracer to output traced requests for failure requests to our logs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We found ourselves trying to figure out checkin request durations by looking at the difference in timestamp between the agent and cloud proxy on a few occasions, so this is worth doing at least for this one request for convenience.

if err != nil {
return nil, errors.New(err,
return nil, sendDuration, errors.New(err,
"fail to checkin to fleet-server",
errors.TypeNetwork,
errors.M(errors.MetaKeyURI, cp))
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, client.ExtractError(resp.Body)
return nil, sendDuration, client.ExtractError(resp.Body)
}

rs, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.New(err, "failed to read checkin response")
return nil, sendDuration, errors.New(err, "failed to read checkin response")
}

checkinResponse := &CheckinResponse{}
decoder := json.NewDecoder(bytes.NewReader(rs))
if err := decoder.Decode(checkinResponse); err != nil {
return nil, errors.New(err,
return nil, sendDuration, errors.New(err,
"fail to decode checkin response",
errors.TypeNetwork,
errors.M(errors.MetaKeyURI, cp))
}

if err := checkinResponse.Validate(); err != nil {
return nil, err
return nil, sendDuration, err
}

return checkinResponse, nil
return checkinResponse, sendDuration, nil
}
Loading