Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[agent] debug logs for session, node events on dispatcher, heartbeats #2486

Merged
merged 1 commit into from
Jan 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,11 +333,11 @@ func (a *Agent) run(ctx context.Context) {
a.config.SessionTracker.SessionError(err)
}

log.G(ctx).WithError(err).Error("agent: session failed")
backoff = initialSessionFailureBackoff + 2*backoff
if backoff > maxSessionFailureBackoff {
backoff = maxSessionFailureBackoff
}
log.G(ctx).WithError(err).WithField("backoff", backoff).Errorf("agent: session failed")
}

if err := session.close(); err != nil {
Expand Down
15 changes: 14 additions & 1 deletion agent/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI
grpc.WithTransportCredentials(agent.config.Credentials),
grpc.WithTimeout(dispatcherRPCTimeout),
)
log.G(ctx).Infof("manager selected by agent for new session: %v", cc.Peer())

if err != nil {
s.errs <- err
return s
Expand All @@ -77,6 +79,7 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI

func (s *session) run(ctx context.Context, delay time.Duration, description *api.NodeDescription) {
timer := time.NewTimer(delay) // delay before registering.
log.G(ctx).Infof("waiting %v before registering session", delay)
defer timer.Stop()
select {
case <-timer.C:
Expand Down Expand Up @@ -166,22 +169,32 @@ func (s *session) heartbeat(ctx context.Context) error {
heartbeat := time.NewTimer(1) // send out a heartbeat right away
defer heartbeat.Stop()

fields := logrus.Fields{
"sessionID": s.sessionID,
"method": "(*session).heartbeat",
}

for {
select {
case <-heartbeat.C:
heartbeatCtx, cancel := context.WithTimeout(ctx, dispatcherRPCTimeout)
// TODO(anshul) log manager info in all logs in this function.
log.G(ctx).WithFields(fields).Debugf("sending heartbeat to manager %v with timeout %v", s.conn.Peer(), dispatcherRPCTimeout)
resp, err := client.Heartbeat(heartbeatCtx, &api.HeartbeatRequest{
SessionID: s.sessionID,
})
cancel()
if err != nil {
log.G(ctx).WithFields(fields).WithError(err).Errorf("heartbeat to manager %v failed", s.conn.Peer())
if grpc.Code(err) == codes.NotFound {
err = errNodeNotRegistered
}

return err
}

log.G(ctx).WithFields(fields).Debugf("heartbeat successful to manager %v, next heartbeat period: %v", s.conn.Peer(), resp.Period)

heartbeat.Reset(resp.Period)
case <-s.closed:
return errSessionClosed
Expand Down Expand Up @@ -408,7 +421,7 @@ func (s *session) sendError(err error) {
}
}

// close closing session. It should be called only in <-session.errs branch
// close closes the given session. It should be called only in the <-session.errs
// branch of the event loop, or when cleaning up the agent.
func (s *session) close() error {
s.closeOnce.Do(func() {
Expand Down
6 changes: 6 additions & 0 deletions connectionbroker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (b *Broker) Select(dialOpts ...grpc.DialOption) (*Conn, error) {
// connection.
func (b *Broker) SelectRemote(dialOpts ...grpc.DialOption) (*Conn, error) {
peer, err := b.remotes.Select()

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -98,6 +99,11 @@ type Conn struct {
peer api.Peer
}

// Peer returns the peer for this Conn.
//
// It is safe to call on a nil *Conn, in which case the zero api.Peer is
// returned. Callers such as newSession log the selected peer before they
// inspect the error returned alongside the Conn, so a nil receiver must not
// panic here.
func (c *Conn) Peer() api.Peer {
	if c == nil {
		// No connection was established; report an empty peer rather than
		// dereferencing nil.
		var none api.Peer
		return none
	}
	return c.peer
}

// Close closes the client connection if it is a remote connection. It also
// records a positive experience with the remote peer if success is true,
// otherwise it records a negative experience. If a local connection is in use,
Expand Down
32 changes: 25 additions & 7 deletions manager/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ func getWeightedPeers(cluster Cluster) []*api.WeightedPeer {
// Run runs dispatcher tasks which should be run on leader dispatcher.
// Dispatcher can be stopped with cancelling ctx or calling Stop().
func (d *Dispatcher) Run(ctx context.Context) error {
ctx = log.WithModule(ctx, "dispatcher")
log.G(ctx).Info("dispatcher starting")

d.taskUpdatesLock.Lock()
d.taskUpdates = make(map[string]*api.TaskStatus)
d.taskUpdatesLock.Unlock()
Expand All @@ -208,7 +211,6 @@ func (d *Dispatcher) Run(ctx context.Context) error {
d.mu.Unlock()
return errors.New("dispatcher is already running")
}
ctx = log.WithModule(ctx, "dispatcher")
if err := d.markNodesUnknown(ctx); err != nil {
log.G(ctx).Errorf(`failed to move all nodes to "unknown" state: %v`, err)
}
Expand Down Expand Up @@ -310,8 +312,12 @@ func (d *Dispatcher) Stop() error {
d.mu.Unlock()
return errors.New("dispatcher is already stopped")
}

log := log.G(d.ctx).WithField("method", "(*Dispatcher).Stop")
log.Info("dispatcher stopping")
d.cancel()
d.mu.Unlock()

d.nodes.Clean()

d.processUpdatesLock.Lock()
Expand Down Expand Up @@ -361,13 +367,15 @@ func (d *Dispatcher) markNodesUnknown(ctx context.Context) error {
if node.Status.State == api.NodeStatus_DOWN {
nodeCopy := node
expireFunc := func() {
log.Infof("moving tasks to orphaned state for node: %s", nodeCopy.ID)
if err := d.moveTasksToOrphaned(nodeCopy.ID); err != nil {
log.WithError(err).Error(`failed to move all tasks to "ORPHANED" state`)
log.WithError(err).Errorf(`failed to move all tasks for node %s to "ORPHANED" state`, node.ID)
}

d.downNodes.Delete(nodeCopy.ID)
}

log.Infof(`node %s was found to be down when marking unknown on dispatcher start`, node.ID)
d.downNodes.Add(nodeCopy, expireFunc)
return nil
}
Expand All @@ -379,16 +387,16 @@ func (d *Dispatcher) markNodesUnknown(ctx context.Context) error {

expireFunc := func() {
log := log.WithField("node", nodeID)
log.Debug("heartbeat expiration for unknown node")
log.Info(`heartbeat expiration for node %s in state "unknown"`, nodeID)
if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, `heartbeat failure for node in "unknown" state`); err != nil {
log.WithError(err).Error(`failed deregistering node after heartbeat expiration for node in "unknown" state`)
}
}
if err := d.nodes.AddUnknown(node, expireFunc); err != nil {
return errors.Wrap(err, `adding node in "unknown" state to node store failed`)
return errors.Wrapf(err, `adding node %s in "unknown" state to node store failed`, nodeID)
}
if err := store.UpdateNode(tx, node); err != nil {
return errors.Wrap(err, "update failed")
return errors.Wrapf(err, "update for node %s failed", nodeID)
}
return nil
})
Expand Down Expand Up @@ -470,6 +478,7 @@ func nodeIPFromContext(ctx context.Context) (string, error) {

// register is used for registration of node with particular dispatcher.
func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) {
logLocal := log.G(ctx).WithField("method", "(*Dispatcher).register")
// prevent register until we're ready to accept it
dctx, err := d.isRunningLocked()
if err != nil {
Expand All @@ -491,21 +500,22 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a

addr, err := nodeIPFromContext(ctx)
if err != nil {
log.G(ctx).WithError(err).Debug("failed to get remote node IP")
logLocal.WithError(err).Debug("failed to get remote node IP")
}

if err := d.markNodeReady(dctx, nodeID, description, addr); err != nil {
return "", err
}

expireFunc := func() {
log.G(ctx).Debug("heartbeat expiration")
log.G(ctx).Debug("heartbeat expiration for worker %s, setting worker status to NodeStatus_DOWN ", nodeID)
if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, "heartbeat failure"); err != nil {
log.G(ctx).WithError(err).Errorf("failed deregistering node after heartbeat expiration")
}
}

rn := d.nodes.Add(node, expireFunc)
logLocal.Infof("worker %s was successfully registered", nodeID)

// NOTE(stevvooe): We need be a little careful with re-registration. The
// current implementation just matches the node id and then gives away the
Expand Down Expand Up @@ -1029,6 +1039,8 @@ func (d *Dispatcher) moveTasksToOrphaned(nodeID string) error {

// markNodeNotReady sets the node state to some state other than READY
func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, message string) error {
logLocal := log.G(d.ctx).WithField("method", "(*Dispatcher).markNodeNotReady")

dctx, err := d.isRunningLocked()
if err != nil {
return err
Expand All @@ -1048,6 +1060,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes
}

expireFunc := func() {
log.G(dctx).Debugf(`worker timed-out %s in "down" state, moving all tasks to "ORPHANED" state`, id)
if err := d.moveTasksToOrphaned(id); err != nil {
log.G(dctx).WithError(err).Error(`failed to move all tasks to "ORPHANED" state`)
}
Expand All @@ -1056,6 +1069,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes
}

d.downNodes.Add(node, expireFunc)
logLocal.Debugf("added node %s to down nodes list", node.ID)

status := &api.NodeStatus{
State: state,
Expand All @@ -1080,6 +1094,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes
if rn := d.nodes.Delete(id); rn == nil {
return errors.Errorf("node %s is not found in local storage", id)
}
logLocal.Debugf("deleted node %s from node store", node.ID)

return nil
}
Expand All @@ -1094,6 +1109,8 @@ func (d *Dispatcher) Heartbeat(ctx context.Context, r *api.HeartbeatRequest) (*a
}

period, err := d.nodes.Heartbeat(nodeInfo.NodeID, r.SessionID)

log.G(ctx).WithField("method", "(*Dispatcher).Heartbeat").Debugf("received heartbeat from worker %v, expect next heartbeat in %v", nodeInfo, period)
return &api.HeartbeatResponse{Period: period}, err
}

Expand Down Expand Up @@ -1206,6 +1223,7 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio
}
}

log.Infof("dispatcher session dropped, marking node %s down", nodeID)
if err := d.markNodeNotReady(nodeID, api.NodeStatus_DISCONNECTED, "node is currently trying to find new manager"); err != nil {
log.WithError(err).Error("failed to remove node")
}
Expand Down