From e0e280f8533d6518567bace748c599d0b6c25821 Mon Sep 17 00:00:00 2001 From: Anshul Pundir Date: Wed, 17 Jan 2018 11:36:25 -0800 Subject: [PATCH] [agent] debug logs for session, node events on dispatcher, heartbeats. Signed-off-by: Anshul Pundir --- agent/agent.go | 2 +- agent/session.go | 10 ++++++++++ connectionbroker/broker.go | 6 ++++++ manager/dispatcher/dispatcher.go | 17 ++++++++++++----- 4 files changed, 29 insertions(+), 6 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index dc72fa0407..d28be94385 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -333,11 +333,11 @@ func (a *Agent) run(ctx context.Context) { a.config.SessionTracker.SessionError(err) } - log.G(ctx).WithError(err).Error("agent: session failed") backoff = initialSessionFailureBackoff + 2*backoff if backoff > maxSessionFailureBackoff { backoff = maxSessionFailureBackoff } + log.G(ctx).WithError(err).WithField("backoff", backoff).Errorf("agent: session failed") } if err := session.close(); err != nil { diff --git a/agent/session.go b/agent/session.go index 0c00c4f1bf..d937c442e7 100644 --- a/agent/session.go +++ b/agent/session.go @@ -65,6 +65,8 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI grpc.WithTransportCredentials(agent.config.Credentials), grpc.WithTimeout(dispatcherRPCTimeout), ) + log.G(ctx).Infof("manager selected by agent for new session: %v", cc.Peer()) + if err != nil { s.errs <- err return s @@ -77,6 +79,7 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI func (s *session) run(ctx context.Context, delay time.Duration, description *api.NodeDescription) { timer := time.NewTimer(delay) // delay before registering. + log.G(ctx).Infof("waiting %v before registering session", delay) defer timer.Stop() select { case <-timer.C: @@ -182,6 +185,13 @@ func (s *session) heartbeat(ctx context.Context) error { return err } + fields := logrus.Fields{ + "sessionID": s.sessionID, + "method": "(*session).heartbeat", + } + + log.G(ctx).WithFields(fields).Debugf("sent heartbeat to manager, next heartbeat period: %v", resp.Period) + heartbeat.Reset(resp.Period) case <-s.closed: return errSessionClosed diff --git a/connectionbroker/broker.go b/connectionbroker/broker.go index 43b384ab2a..67b66fd68b 100644 --- a/connectionbroker/broker.go +++ b/connectionbroker/broker.go @@ -58,6 +58,7 @@ func (b *Broker) Select(dialOpts ...grpc.DialOption) (*Conn, error) { // connection. func (b *Broker) SelectRemote(dialOpts ...grpc.DialOption) (*Conn, error) { peer, err := b.remotes.Select() + if err != nil { return nil, err } @@ -98,6 +99,11 @@ type Conn struct { peer api.Peer } +// Peer returns the peer for this Conn. +func (c *Conn) Peer() *api.Peer { + return &c.peer +} + // Close closes the client connection if it is a remote connection. It also // records a positive experience with the remote peer if success is true, // otherwise it records a negative experience. If a local connection is in use, diff --git a/manager/dispatcher/dispatcher.go b/manager/dispatcher/dispatcher.go index 13d68293ae..a4f19abbbe 100644 --- a/manager/dispatcher/dispatcher.go +++ b/manager/dispatcher/dispatcher.go @@ -362,12 +362,13 @@ func (d *Dispatcher) markNodesUnknown(ctx context.Context) error { nodeCopy := node expireFunc := func() { if err := d.moveTasksToOrphaned(nodeCopy.ID); err != nil { - log.WithError(err).Error(`failed to move all tasks to "ORPHANED" state`) + log.WithError(err).Errorf(`failed to move all tasks for node %s to "ORPHANED" state`, node.ID) } d.downNodes.Delete(nodeCopy.ID) } + log.Infof(`node %s was found to be down when marking unknown on dispatcher start`, node.ID) d.downNodes.Add(nodeCopy, expireFunc) return nil } @@ -379,16 +380,16 @@ func (d *Dispatcher) markNodesUnknown(ctx context.Context) error { expireFunc := func() { log := log.WithField("node", nodeID) - log.Debug("heartbeat expiration for unknown node") + log.Info("heartbeat expiration for node in unknown state %s", nodeID) if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, `heartbeat failure for node in "unknown" state`); err != nil { log.WithError(err).Error(`failed deregistering node after heartbeat expiration for node in "unknown" state`) } } if err := d.nodes.AddUnknown(node, expireFunc); err != nil { - return errors.Wrap(err, `adding node in "unknown" state to node store failed`) + return errors.Wrapf(err, `adding node %s in "unknown" state to node store failed`, nodeID) } if err := store.UpdateNode(tx, node); err != nil { - return errors.Wrap(err, "update failed") + return errors.Wrapf(err, "update for node %s failed", nodeID) } return nil }) @@ -499,13 +500,14 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a } expireFunc := func() { - log.G(ctx).Debug("heartbeat expiration") + log.G(ctx).Debug("heartbeat expiration for worker %s", nodeID) if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, "heartbeat failure"); err != nil { log.G(ctx).WithError(err).Errorf("failed deregistering node after heartbeat expiration") } } rn := d.nodes.Add(node, expireFunc) + log.G(ctx).Infof("node %s was successfully registered", nodeID) // NOTE(stevvooe): We need be a little careful with re-registration. The // current implementation just matches the node id and then gives away the @@ -1048,6 +1050,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes } expireFunc := func() { + log.G(dctx).Debugf(`moving all tasks to "ORPHANED" state for worker %s`, id) if err := d.moveTasksToOrphaned(id); err != nil { log.G(dctx).WithError(err).Error(`failed to move all tasks to "ORPHANED" state`) } @@ -1094,6 +1097,8 @@ func (d *Dispatcher) Heartbeat(ctx context.Context, r *api.HeartbeatRequest) (*a } period, err := d.nodes.Heartbeat(nodeInfo.NodeID, r.SessionID) + + log.G(ctx).WithField("method", "(*Dispatcher).Heartbeat").Infof("Received heartbeat from worker %s, expect next heartbeat in %v", nodeInfo.NodeID, period) return &api.HeartbeatResponse{Period: period}, err } @@ -1206,6 +1211,8 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio } } + log.Infof("dispatcher session dropped, marking node %s down", nodeID) + if err := d.markNodeNotReady(nodeID, api.NodeStatus_DISCONNECTED, "node is currently trying to find new manager"); err != nil { log.WithError(err).Error("failed to remove node") }