[agent] debug logs for session, node events on dispatcher, heartbeats.
Signed-off-by: Anshul Pundir <anshul.pundir@docker.com>
anshulpundir committed Jan 23, 2018
1 parent 607c4a0 commit f8af5f4
Showing 4 changed files with 49 additions and 9 deletions.
2 changes: 1 addition & 1 deletion agent/agent.go
@@ -333,11 +333,11 @@ func (a *Agent) run(ctx context.Context) {
a.config.SessionTracker.SessionError(err)
}

log.G(ctx).WithError(err).Error("agent: session failed")
backoff = initialSessionFailureBackoff + 2*backoff
if backoff > maxSessionFailureBackoff {
backoff = maxSessionFailureBackoff
}
log.G(ctx).WithError(err).WithField("backoff", backoff).Error("agent: session failed")
}

if err := session.close(); err != nil {
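For reference, the retry logic in the hunk above doubles the session backoff after each failure and clamps it at maxSessionFailureBackoff, which is the value now attached to the error log. A minimal standalone sketch of that progression (the constants below are illustrative, not swarmkit's actual values):

package main

import (
	"fmt"
	"time"
)

// Assumed values for illustration only; swarmkit defines its own constants.
const (
	initialSessionFailureBackoff = 100 * time.Millisecond
	maxSessionFailureBackoff     = 8 * time.Second
)

func main() {
	var backoff time.Duration
	for failure := 1; failure <= 8; failure++ {
		// Same update rule as agent.run: grow the backoff, then clamp it.
		backoff = initialSessionFailureBackoff + 2*backoff
		if backoff > maxSessionFailureBackoff {
			backoff = maxSessionFailureBackoff
		}
		fmt.Printf("session failure %d: backoff now %v\n", failure, backoff)
	}
}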
15 changes: 14 additions & 1 deletion agent/session.go
@@ -65,6 +65,8 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI
grpc.WithTransportCredentials(agent.config.Credentials),
grpc.WithTimeout(dispatcherRPCTimeout),
)
log.G(ctx).Infof("manager selected by agent for new session: %v", cc.Peer())

if err != nil {
s.errs <- err
return s
@@ -77,6 +79,7 @@

func (s *session) run(ctx context.Context, delay time.Duration, description *api.NodeDescription) {
timer := time.NewTimer(delay) // delay before registering.
log.G(ctx).Infof("waiting %v before registering session", delay)
defer timer.Stop()
select {
case <-timer.C:
@@ -166,22 +169,32 @@ func (s *session) heartbeat(ctx context.Context) error {
heartbeat := time.NewTimer(1) // send out a heartbeat right away
defer heartbeat.Stop()

fields := logrus.Fields{
"sessionID": s.sessionID,
"method": "(*session).heartbeat",
}

for {
select {
case <-heartbeat.C:
heartbeatCtx, cancel := context.WithTimeout(ctx, dispatcherRPCTimeout)
// TODO(anshul) log manager info in all logs in this function.
log.G(ctx).WithFields(fields).Debugf("sending heartbeat to manager %v with timeout %v", s.conn.Peer(), dispatcherRPCTimeout)
resp, err := client.Heartbeat(heartbeatCtx, &api.HeartbeatRequest{
SessionID: s.sessionID,
})
cancel()
if err != nil {
if grpc.Code(err) == codes.NotFound {
log.G(ctx).WithFields(fields).WithError(err).Errorf("heartbeat to manager %v failed", s.conn.Peer())
err = errNodeNotRegistered
}

return err
}

log.G(ctx).WithFields(fields).Debugf("heartbeat successful to manager %v, next heartbeat period: %v", s.conn.Peer(), resp.Period)

heartbeat.Reset(resp.Period)
case <-s.closed:
return errSessionClosed
@@ -408,7 +421,7 @@ func (s *session) sendError(err error) {
}
}

// close closing session. It should be called only in <-session.errs branch
// close the given session. It should be called only in <-session.errs branch
// of event loop, or when cleaning up the agent.
func (s *session) close() error {
s.closeOnce.Do(func() {
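The heartbeat hunk above combines two patterns: a fixed logrus.Fields set attached to every log entry, and a timer that is re-armed with whatever period the manager returns. Below is a self-contained sketch of the same loop shape with the Dispatcher.Heartbeat RPC replaced by a stub; the session ID and period are made up for illustration.

package main

import (
	"time"

	"github.com/sirupsen/logrus"
)

// fakeHeartbeat stands in for the Dispatcher.Heartbeat RPC and simply
// returns the period the "manager" wants until the next heartbeat.
func fakeHeartbeat(sessionID string) (time.Duration, error) {
	return 2 * time.Second, nil
}

func main() {
	logrus.SetLevel(logrus.DebugLevel)
	fields := logrus.Fields{
		"sessionID": "hypothetical-session-id",
		"method":    "(*session).heartbeat",
	}

	heartbeat := time.NewTimer(1) // fire almost immediately, like the original
	defer heartbeat.Stop()

	for i := 0; i < 3; i++ {
		<-heartbeat.C
		logrus.WithFields(fields).Debug("sending heartbeat")
		period, err := fakeHeartbeat("hypothetical-session-id")
		if err != nil {
			logrus.WithFields(fields).WithError(err).Error("heartbeat failed")
			return
		}
		logrus.WithFields(fields).Debugf("heartbeat successful, next heartbeat period: %v", period)
		heartbeat.Reset(period)
	}
}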
10 changes: 10 additions & 0 deletions connectionbroker/broker.go
@@ -58,6 +58,7 @@ func (b *Broker) Select(dialOpts ...grpc.DialOption) (*Conn, error) {
// connection.
func (b *Broker) SelectRemote(dialOpts ...grpc.DialOption) (*Conn, error) {
peer, err := b.remotes.Select()

if err != nil {
return nil, err
}
@@ -98,6 +99,15 @@ type Conn struct {
peer api.Peer
}

// Peer returns the peer for this Conn.
func (c *Conn) Peer() *api.Peer {
if !c.isLocal {
return &c.peer
}

return &api.Peer{}
}

// Close closes the client connection if it is a remote connection. It also
// records a positive experience with the remote peer if success is true,
// otherwise it records a negative experience. If a local connection is in use,
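The new Conn.Peer accessor returns a zero-valued api.Peer for local connections, so callers such as the session code above can log the selected manager unconditionally. A small standalone sketch of that design, using simplified stand-ins for connectionbroker.Conn and api.Peer:

package main

import "fmt"

// Peer is a simplified stand-in for swarmkit's api.Peer.
type Peer struct {
	NodeID string
	Addr   string
}

// Conn mirrors the shape of connectionbroker.Conn for illustration only.
type Conn struct {
	isLocal bool
	peer    Peer
}

// Peer returns the remote peer, or a zero Peer for local connections,
// so callers can log it without first checking the connection kind.
func (c *Conn) Peer() *Peer {
	if !c.isLocal {
		return &c.peer
	}
	return &Peer{}
}

func main() {
	remote := &Conn{peer: Peer{NodeID: "manager-1", Addr: "10.0.0.1:2377"}}
	local := &Conn{isLocal: true}
	fmt.Printf("remote connection peer: %+v\n", remote.Peer())
	fmt.Printf("local connection peer:  %+v\n", local.Peer())
}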
31 changes: 24 additions & 7 deletions manager/dispatcher/dispatcher.go
@@ -195,6 +195,9 @@ func getWeightedPeers(cluster Cluster) []*api.WeightedPeer {
// Run runs dispatcher tasks which should be run on leader dispatcher.
// Dispatcher can be stopped with cancelling ctx or calling Stop().
func (d *Dispatcher) Run(ctx context.Context) error {
ctx = log.WithModule(ctx, "dispatcher")
log.G(ctx).Info("dispatcher starting")

d.taskUpdatesLock.Lock()
d.taskUpdates = make(map[string]*api.TaskStatus)
d.taskUpdatesLock.Unlock()
@@ -208,7 +211,6 @@ func (d *Dispatcher) Run(ctx context.Context) error {
d.mu.Unlock()
return errors.New("dispatcher is already running")
}
ctx = log.WithModule(ctx, "dispatcher")
if err := d.markNodesUnknown(ctx); err != nil {
log.G(ctx).Errorf(`failed to move all nodes to "unknown" state: %v`, err)
}
@@ -306,12 +308,14 @@ func (d *Dispatcher) Run(ctx context.Context) error {
// Stop stops dispatcher and closes all grpc streams.
func (d *Dispatcher) Stop() error {
d.mu.Lock()
log := log.G(d.ctx).WithField("method", "(*Dispatcher).Stop")
if !d.isRunning() {
d.mu.Unlock()
return errors.New("dispatcher is already stopped")
}
d.cancel()
d.mu.Unlock()

d.nodes.Clean()

d.processUpdatesLock.Lock()
@@ -324,6 +328,7 @@ func (d *Dispatcher) Stop() error {
d.clusterUpdateQueue.Close()

d.wg.Wait()
log.Info("dispatcher stopped")

return nil
}
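The dispatcher hunks above lean on context-scoped loggers: Run tags the context with a module field once, and individual methods such as Stop layer a method field on top. The sketch below shows the general pattern with simplified stand-ins for swarmkit's log.WithModule and log.G helpers (which carry a logrus entry in the context); the field values mirror the diff.

package main

import (
	"context"

	"github.com/sirupsen/logrus"
)

type loggerKey struct{}

// withModule is a simplified stand-in for swarmkit's log.WithModule: it
// stores an entry tagged with a module field in the context.
func withModule(ctx context.Context, module string) context.Context {
	entry := g(ctx).WithField("module", module)
	return context.WithValue(ctx, loggerKey{}, entry)
}

// g is a simplified stand-in for swarmkit's log.G: it returns the entry
// carried by the context, or a default entry if none is set.
func g(ctx context.Context) *logrus.Entry {
	if entry, ok := ctx.Value(loggerKey{}).(*logrus.Entry); ok {
		return entry
	}
	return logrus.NewEntry(logrus.StandardLogger())
}

func main() {
	ctx := withModule(context.Background(), "dispatcher")
	g(ctx).Info("dispatcher starting")

	// Method-scoped logger, mirroring the WithField("method", ...) usage in Stop.
	stopLog := g(ctx).WithField("method", "(*Dispatcher).Stop")
	stopLog.Info("dispatcher stopped")
}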
@@ -361,13 +366,15 @@ func (d *Dispatcher) markNodesUnknown(ctx context.Context) error {
if node.Status.State == api.NodeStatus_DOWN {
nodeCopy := node
expireFunc := func() {
log.Infof("moving tasks to orphaned state for node: %s", nodeCopy.ID)
if err := d.moveTasksToOrphaned(nodeCopy.ID); err != nil {
log.WithError(err).Error(`failed to move all tasks to "ORPHANED" state`)
log.WithError(err).Errorf(`failed to move all tasks for node %s to "ORPHANED" state`, nodeCopy.ID)
}

d.downNodes.Delete(nodeCopy.ID)
}

log.Infof(`node %s was already down when marking nodes unknown on dispatcher startup`, node.ID)
d.downNodes.Add(nodeCopy, expireFunc)
return nil
}
@@ -379,16 +386,16 @@ func (d *Dispatcher) markNodesUnknown(ctx context.Context) error {

expireFunc := func() {
log := log.WithField("node", nodeID)
log.Debug("heartbeat expiration for unknown node")
log.Infof(`heartbeat expiration for node %s in state "unknown"`, nodeID)
if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, `heartbeat failure for node in "unknown" state`); err != nil {
log.WithError(err).Error(`failed deregistering node after heartbeat expiration for node in "unknown" state`)
}
}
if err := d.nodes.AddUnknown(node, expireFunc); err != nil {
return errors.Wrap(err, `adding node in "unknown" state to node store failed`)
return errors.Wrapf(err, `adding node %s in "unknown" state to node store failed`, nodeID)
}
if err := store.UpdateNode(tx, node); err != nil {
return errors.Wrap(err, "update failed")
return errors.Wrapf(err, "update for node %s failed", nodeID)
}
return nil
})
@@ -470,6 +477,7 @@ func nodeIPFromContext(ctx context.Context) (string, error) {

// register is used for registration of node with particular dispatcher.
func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) {
logLocal := log.G(ctx).WithField("method", "(*Dispatcher).register")
// prevent register until we're ready to accept it
dctx, err := d.isRunningLocked()
if err != nil {
@@ -491,21 +499,22 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a

addr, err := nodeIPFromContext(ctx)
if err != nil {
log.G(ctx).WithError(err).Debug("failed to get remote node IP")
logLocal.WithError(err).Debug("failed to get remote node IP")
}

if err := d.markNodeReady(dctx, nodeID, description, addr); err != nil {
return "", err
}

expireFunc := func() {
log.G(ctx).Debug("heartbeat expiration")
log.G(ctx).Debug("heartbeat expiration for worker %s, setting worker status to NodeStatus_DOWN ", nodeID)
if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, "heartbeat failure"); err != nil {
log.G(ctx).WithError(err).Errorf("failed deregistering node after heartbeat expiration")
}
}

rn := d.nodes.Add(node, expireFunc)
logLocal.Infof("worker %s was successfully registered", nodeID)

// NOTE(stevvooe): We need be a little careful with re-registration. The
// current implementation just matches the node id and then gives away the
@@ -1029,6 +1038,8 @@ func (d *Dispatcher) moveTasksToOrphaned(nodeID string) error {

// markNodeNotReady sets the node state to some state other than READY
func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, message string) error {
logLocal := log.G(d.ctx).WithField("method", "(*Dispatcher).markNodeNotReady")

dctx, err := d.isRunningLocked()
if err != nil {
return err
@@ -1048,6 +1059,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes
}

expireFunc := func() {
log.G(dctx).Debugf(`worker %s timed out in "down" state, moving all tasks to "ORPHANED" state`, id)
if err := d.moveTasksToOrphaned(id); err != nil {
log.G(dctx).WithError(err).Error(`failed to move all tasks to "ORPHANED" state`)
}
@@ -1056,6 +1068,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes
}

d.downNodes.Add(node, expireFunc)
logLocal.Debugf("added node %s to down nodes list", node.ID)

status := &api.NodeStatus{
State: state,
@@ -1080,6 +1093,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes
if rn := d.nodes.Delete(id); rn == nil {
return errors.Errorf("node %s is not found in local storage", id)
}
logLocal.Debugf("deleted node %s from node store", node.ID)

return nil
}
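Both markNodesUnknown and markNodeNotReady register an expireFunc that fires when a down node's grace period elapses, moving its tasks to the ORPHANED state and removing it from the down-nodes list. Below is a minimal sketch of that callback-on-expiration pattern built on time.AfterFunc; the store type, grace period, and node ID are illustrative rather than swarmkit's actual heartbeat bookkeeping.

package main

import (
	"fmt"
	"sync"
	"time"
)

// downNodes is a toy stand-in for the dispatcher's down-nodes store: it
// remembers nodes and fires a callback if they are not removed in time.
type downNodes struct {
	mu     sync.Mutex
	timers map[string]*time.Timer
}

func newDownNodes() *downNodes {
	return &downNodes{timers: make(map[string]*time.Timer)}
}

// Add registers a node and schedules expireFunc after the grace period.
func (d *downNodes) Add(nodeID string, grace time.Duration, expireFunc func()) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.timers[nodeID] = time.AfterFunc(grace, expireFunc)
}

// Delete forgets a node and cancels its pending expiration.
func (d *downNodes) Delete(nodeID string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if t, ok := d.timers[nodeID]; ok {
		t.Stop()
		delete(d.timers, nodeID)
	}
}

func main() {
	nodes := newDownNodes()
	nodeID := "node-1" // hypothetical node ID

	nodes.Add(nodeID, 500*time.Millisecond, func() {
		// In the dispatcher this is where tasks move to the ORPHANED state.
		fmt.Printf("node %s expired, moving its tasks to ORPHANED\n", nodeID)
		nodes.Delete(nodeID)
	})

	time.Sleep(time.Second) // let the expiration fire
}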
@@ -1094,6 +1108,8 @@ func (d *Dispatcher) Heartbeat(ctx context.Context, r *api.HeartbeatRequest) (*a
}

period, err := d.nodes.Heartbeat(nodeInfo.NodeID, r.SessionID)

log.G(ctx).WithField("method", "(*Dispatcher).Heartbeat").Infof("received heartbeat from worker %v, expect next heartbeat in %v", nodeInfo, period)
return &api.HeartbeatResponse{Period: period}, err
}

@@ -1206,6 +1222,7 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio
}
}

log.Infof("dispatcher session dropped, marking node %s down", nodeID)
if err := d.markNodeNotReady(nodeID, api.NodeStatus_DISCONNECTED, "node is currently trying to find new manager"); err != nil {
log.WithError(err).Error("failed to remove node")
}
