diff --git a/agent/agent.go b/agent/agent.go
index dc72fa0407..d28be94385 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -333,11 +333,11 @@ func (a *Agent) run(ctx context.Context) {
                a.config.SessionTracker.SessionError(err)
            }

-           log.G(ctx).WithError(err).Error("agent: session failed")
            backoff = initialSessionFailureBackoff + 2*backoff
            if backoff > maxSessionFailureBackoff {
                backoff = maxSessionFailureBackoff
            }
+           log.G(ctx).WithError(err).WithField("backoff", backoff).Errorf("agent: session failed")
        }

        if err := session.close(); err != nil {
diff --git a/agent/session.go b/agent/session.go
index 0c00c4f1bf..9bb9773a6c 100644
--- a/agent/session.go
+++ b/agent/session.go
@@ -65,10 +65,14 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI
        grpc.WithTransportCredentials(agent.config.Credentials),
        grpc.WithTimeout(dispatcherRPCTimeout),
    )
+
    if err != nil {
        s.errs <- err
        return s
    }
+
+   log.G(ctx).Infof("manager selected by agent for new session: %v", cc.Peer())
+
    s.conn = cc

    go s.run(sessionCtx, delay, description)
@@ -77,6 +81,7 @@ func (s *session) run(ctx context.Context, delay time.Duration, description *api.NodeDescription) {
    timer := time.NewTimer(delay) // delay before registering.
+   log.G(ctx).Infof("waiting %v before registering session", delay)
    defer timer.Stop()

    select {
    case <-timer.C:
@@ -166,15 +171,23 @@ func (s *session) heartbeat(ctx context.Context) error {
    heartbeat := time.NewTimer(1) // send out a heartbeat right away
    defer heartbeat.Stop()

+   fields := logrus.Fields{
+       "sessionID": s.sessionID,
+       "method":    "(*session).heartbeat",
+   }
+
    for {
        select {
        case <-heartbeat.C:
            heartbeatCtx, cancel := context.WithTimeout(ctx, dispatcherRPCTimeout)
+           // TODO(anshul) log manager info in all logs in this function.
+           log.G(ctx).WithFields(fields).Debugf("sending heartbeat to manager %v with timeout %v", s.conn.Peer(), dispatcherRPCTimeout)
            resp, err := client.Heartbeat(heartbeatCtx, &api.HeartbeatRequest{
                SessionID: s.sessionID,
            })
            cancel()
            if err != nil {
+               log.G(ctx).WithFields(fields).WithError(err).Errorf("heartbeat to manager %v failed", s.conn.Peer())
                if grpc.Code(err) == codes.NotFound {
                    err = errNodeNotRegistered
                }
@@ -182,6 +195,8 @@ func (s *session) heartbeat(ctx context.Context) error {
                return err
            }

+           log.G(ctx).WithFields(fields).Debugf("heartbeat successful to manager %v, next heartbeat period: %v", s.conn.Peer(), resp.Period)
+
            heartbeat.Reset(resp.Period)
        case <-s.closed:
            return errSessionClosed
@@ -408,7 +423,7 @@ func (s *session) sendError(err error) {
    }
}

-// close closing session. It should be called only in <-session.errs branch
+// close the given session. It should be called only in <-session.errs branch
// of event loop, or when cleaning up the agent.
func (s *session) close() error {
    s.closeOnce.Do(func() {
diff --git a/connectionbroker/broker.go b/connectionbroker/broker.go
index 43b384ab2a..a5510a9ff0 100644
--- a/connectionbroker/broker.go
+++ b/connectionbroker/broker.go
@@ -58,6 +58,7 @@ func (b *Broker) Select(dialOpts ...grpc.DialOption) (*Conn, error) {
// connection.
func (b *Broker) SelectRemote(dialOpts ...grpc.DialOption) (*Conn, error) {
    peer, err := b.remotes.Select()
+
    if err != nil {
        return nil, err
    }
@@ -98,6 +99,11 @@ type Conn struct {
    peer api.Peer
}

+// Peer returns the peer for this Conn.
+func (c *Conn) Peer() api.Peer {
+   return c.peer
+}
+
// Close closes the client connection if it is a remote connection. It also
// records a positive experience with the remote peer if success is true,
// otherwise it records a negative experience. If a local connection is in use,
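Aside: the new Conn.Peer() accessor above is what the agent-side changes in agent/session.go use to log which manager a session is talking to. Below is a minimal sketch of how a caller might use it, assuming a *connectionbroker.Broker already configured with remotes and credentials; the helper name and import paths are illustrative and not part of this patch.

package example

import (
    "context"

    "github.com/docker/swarmkit/connectionbroker"
    "github.com/docker/swarmkit/log"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
)

// dialManager is a hypothetical helper: it selects a manager through the
// broker and logs the selected peer, mirroring what newSession now does.
func dialManager(ctx context.Context, broker *connectionbroker.Broker, creds credentials.TransportCredentials) (*connectionbroker.Conn, error) {
    cc, err := broker.Select(grpc.WithTransportCredentials(creds))
    if err != nil {
        return nil, err
    }
    // Peer() returns the api.Peer chosen by the broker; its NodeID and Addr
    // fields identify the manager in log output.
    peer := cc.Peer()
    log.G(ctx).Infof("selected manager %s at %s", peer.NodeID, peer.Addr)
    return cc, nil
}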
diff --git a/manager/dispatcher/dispatcher.go b/manager/dispatcher/dispatcher.go
index 13d68293ae..bf48818ea5 100644
--- a/manager/dispatcher/dispatcher.go
+++ b/manager/dispatcher/dispatcher.go
@@ -125,8 +125,12 @@ type clusterUpdate struct {

// Dispatcher is responsible for dispatching tasks and tracking agent health.
type Dispatcher struct {
-   mu                   sync.Mutex
-   wg                   sync.WaitGroup
+   // mu is a lock to provide mutually exclusive access to dispatcher fields
+   // e.g. lastSeenManagers, networkBootstrapKeys, lastSeenRootCert, etc.
+   mu sync.Mutex
+   // shutdownWait is used by Stop() to wait for existing operations to finish.
+   shutdownWait sync.WaitGroup
+
    nodes                *nodeStore
    store                *store.MemoryStore
    lastSeenManagers     []*api.WeightedPeer
@@ -195,6 +199,9 @@ func getWeightedPeers(cluster Cluster) []*api.WeightedPeer {
// Run runs dispatcher tasks which should be run on leader dispatcher.
// Dispatcher can be stopped with cancelling ctx or calling Stop().
func (d *Dispatcher) Run(ctx context.Context) error {
+   ctx = log.WithModule(ctx, "dispatcher")
+   log.G(ctx).Info("dispatcher starting")
+
    d.taskUpdatesLock.Lock()
    d.taskUpdates = make(map[string]*api.TaskStatus)
    d.taskUpdatesLock.Unlock()
@@ -208,7 +215,6 @@ func (d *Dispatcher) Run(ctx context.Context) error {
        d.mu.Unlock()
        return errors.New("dispatcher is already running")
    }
-   ctx = log.WithModule(ctx, "dispatcher")
    if err := d.markNodesUnknown(ctx); err != nil {
        log.G(ctx).Errorf(`failed to move all nodes to "unknown" state: %v`, err)
    }
@@ -247,8 +253,11 @@ func (d *Dispatcher) Run(ctx context.Context) error {
    defer cancel()
    d.ctx, d.cancel = context.WithCancel(ctx)
    ctx = d.ctx
-   d.wg.Add(1)
-   defer d.wg.Done()
+
+   // If Stop() is called, it should wait
+   // for Run() to complete.
+   d.shutdownWait.Add(1)
+   defer d.shutdownWait.Done()
    d.mu.Unlock()

    publishManagers := func(peers []*api.Peer) {
@@ -310,8 +319,15 @@ func (d *Dispatcher) Stop() error {
        d.mu.Unlock()
        return errors.New("dispatcher is already stopped")
    }
+
+   // Cancel the dispatcher context.
+   // This should also close the streams in Tasks() and Assignments().
    d.cancel()
    d.mu.Unlock()
+
+   // Wait for the RPCs that are in progress to finish.
+   d.shutdownWait.Wait()
+
    d.nodes.Clean()

    d.processUpdatesLock.Lock()
@@ -322,9 +338,6 @@ func (d *Dispatcher) Stop() error {
    d.processUpdatesLock.Unlock()

    d.clusterUpdateQueue.Close()
-
-   d.wg.Wait()
-
    return nil
}
@@ -361,13 +374,15 @@ func (d *Dispatcher) markNodesUnknown(ctx context.Context) error {
            if node.Status.State == api.NodeStatus_DOWN {
                nodeCopy := node
                expireFunc := func() {
+                   log.Infof("moving tasks to orphaned state for node: %s", nodeCopy.ID)
                    if err := d.moveTasksToOrphaned(nodeCopy.ID); err != nil {
-                       log.WithError(err).Error(`failed to move all tasks to "ORPHANED" state`)
+                       log.WithError(err).Errorf(`failed to move all tasks for node %s to "ORPHANED" state`, node.ID)
                    }
                    d.downNodes.Delete(nodeCopy.ID)
                }
+               log.Infof(`node %s was found to be down when marking unknown on dispatcher start`, node.ID)
                d.downNodes.Add(nodeCopy, expireFunc)
                return nil
            }
@@ -379,16 +394,16 @@ func (d *Dispatcher) markNodesUnknown(ctx context.Context) error {
            expireFunc := func() {
                log := log.WithField("node", nodeID)
-               log.Debug("heartbeat expiration for unknown node")
+               log.Infof(`heartbeat expiration for node %s in state "unknown"`, nodeID)
                if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, `heartbeat failure for node in "unknown" state`); err != nil {
                    log.WithError(err).Error(`failed deregistering node after heartbeat expiration for node in "unknown" state`)
                }
            }
            if err := d.nodes.AddUnknown(node, expireFunc); err != nil {
-               return errors.Wrap(err, `adding node in "unknown" state to node store failed`)
+               return errors.Wrapf(err, `adding node %s in "unknown" state to node store failed`, nodeID)
            }
            if err := store.UpdateNode(tx, node); err != nil {
-               return errors.Wrap(err, "update failed")
+               return errors.Wrapf(err, "update for node %s failed", nodeID)
            }
            return nil
        })
@@ -470,12 +485,13 @@ func nodeIPFromContext(ctx context.Context) (string, error) {

// register is used for registration of node with particular dispatcher.
func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) {
-   // prevent register until we're ready to accept it
    dctx, err := d.isRunningLocked()
    if err != nil {
        return "", err
    }

+   logLocal := log.G(ctx).WithField("method", "(*Dispatcher).register")
+
    if err := d.nodes.CheckRateLimit(nodeID); err != nil {
        return "", err
    }
@@ -491,7 +507,7 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a

    addr, err := nodeIPFromContext(ctx)
    if err != nil {
-       log.G(ctx).WithError(err).Debug("failed to get remote node IP")
+       logLocal.WithError(err).Debug("failed to get remote node IP")
    }

    if err := d.markNodeReady(dctx, nodeID, description, addr); err != nil {
@@ -499,13 +515,14 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a
    }

    expireFunc := func() {
-       log.G(ctx).Debug("heartbeat expiration")
+       log.G(ctx).Debugf("heartbeat expiration for worker %s, setting worker status to NodeStatus_DOWN", nodeID)
        if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, "heartbeat failure"); err != nil {
            log.G(ctx).WithError(err).Errorf("failed deregistering node after heartbeat expiration")
        }
    }

    rn := d.nodes.Add(node, expireFunc)
+   logLocal.Infof("worker %s was successfully registered", nodeID)

    // NOTE(stevvooe): We need be a little careful with re-registration. The
    // current implementation just matches the node id and then gives away the
@@ -522,6 +539,21 @@
// UpdateTaskStatus updates status of task. Node should send such updates
// on every status change of its tasks.
func (d *Dispatcher) UpdateTaskStatus(ctx context.Context, r *api.UpdateTaskStatusRequest) (*api.UpdateTaskStatusResponse, error) {
+   // shutdownWait.Add() followed by isRunning() ensures that if this RPC
+   // sees the dispatcher as running, it has already called Add() on the
+   // shutdownWait WaitGroup, which ensures that Stop() will wait for this
+   // RPC to complete.
+   // Note that Stop() first cancels the dispatcher context and then calls
+   // shutdownWait.Wait(), to make sure new RPCs don't start before it
+   // waits for existing ones to finish.
+   d.shutdownWait.Add(1)
+   defer d.shutdownWait.Done()
+
+   dctx, err := d.isRunningLocked()
+   if err != nil {
+       return nil, err
+   }
+
    nodeInfo, err := ca.RemoteNode(ctx)
    if err != nil {
        return nil, err
@@ -537,11 +569,6 @@ func (d *Dispatcher) UpdateTaskStatus(ctx context.Context, r *api.UpdateTaskStat
    }
    log := log.G(ctx).WithFields(fields)

-   dctx, err := d.isRunningLocked()
-   if err != nil {
-       return nil, err
-   }
-
    if _, err := d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
        return nil, err
    }
@@ -713,16 +740,26 @@ func (d *Dispatcher) processUpdates(ctx context.Context) {
// of tasks which should be run on node, if task is not present in that list,
// it should be terminated.
func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServer) error {
-   nodeInfo, err := ca.RemoteNode(stream.Context())
+   // shutdownWait.Add() followed by isRunning() ensures that if this RPC
+   // sees the dispatcher as running, it has already called Add() on the
+   // shutdownWait WaitGroup, which ensures that Stop() will wait for this
+   // RPC to complete.
+   // Note that Stop() first cancels the dispatcher context and then calls
+   // shutdownWait.Wait(), to make sure new RPCs don't start before it
+   // waits for existing ones to finish.
+   d.shutdownWait.Add(1)
+   defer d.shutdownWait.Done()
+
+   dctx, err := d.isRunningLocked()
    if err != nil {
        return err
    }
-   nodeID := nodeInfo.NodeID

-   dctx, err := d.isRunningLocked()
+   nodeInfo, err := ca.RemoteNode(stream.Context())
    if err != nil {
        return err
    }
+   nodeID := nodeInfo.NodeID

    fields := logrus.Fields{
        "node.id": nodeID,
@@ -836,16 +873,26 @@ func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServe
// Assignments is a stream of assignments for a node. Each message contains
// either full list of tasks and secrets for the node, or an incremental update.
func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatcher_AssignmentsServer) error {
-   nodeInfo, err := ca.RemoteNode(stream.Context())
+   // shutdownWait.Add() followed by isRunning() ensures that if this RPC
+   // sees the dispatcher as running, it has already called Add() on the
+   // shutdownWait WaitGroup, which ensures that Stop() will wait for this
+   // RPC to complete.
+   // Note that Stop() first cancels the dispatcher context and then calls
+   // shutdownWait.Wait(), to make sure new RPCs don't start before it
+   // waits for existing ones to finish.
+   d.shutdownWait.Add(1)
+   defer d.shutdownWait.Done()
+
+   dctx, err := d.isRunningLocked()
    if err != nil {
        return err
    }
-   nodeID := nodeInfo.NodeID

-   dctx, err := d.isRunningLocked()
+   nodeInfo, err := ca.RemoteNode(stream.Context())
    if err != nil {
        return err
    }
+   nodeID := nodeInfo.NodeID

    fields := logrus.Fields{
        "node.id": nodeID,
@@ -1029,6 +1076,8 @@ func (d *Dispatcher) moveTasksToOrphaned(nodeID string) error {

// markNodeNotReady sets the node state to some state other than READY
func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, message string) error {
+   logLocal := log.G(d.ctx).WithField("method", "(*Dispatcher).markNodeNotReady")
+
    dctx, err := d.isRunningLocked()
    if err != nil {
        return err
    }
@@ -1048,6 +1097,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes
    }

    expireFunc := func() {
+       log.G(dctx).Debugf(`worker %s timed out in "down" state, moving all tasks to "ORPHANED" state`, id)
        if err := d.moveTasksToOrphaned(id); err != nil {
            log.G(dctx).WithError(err).Error(`failed to move all tasks to "ORPHANED" state`)
        }
    }
@@ -1056,6 +1106,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes
    }

    d.downNodes.Add(node, expireFunc)
+   logLocal.Debugf("added node %s to down nodes list", node.ID)

    status := &api.NodeStatus{
        State:   state,
@@ -1080,6 +1131,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes
    if rn := d.nodes.Delete(id); rn == nil {
        return errors.Errorf("node %s is not found in local storage", id)
    }
+   logLocal.Debugf("deleted node %s from node store", node.ID)

    return nil
}
@@ -1088,12 +1140,32 @@
// Node should send new heartbeat earlier than now + TTL, otherwise it will
// be deregistered from dispatcher and its status will be updated to NodeStatus_DOWN
func (d *Dispatcher) Heartbeat(ctx context.Context, r *api.HeartbeatRequest) (*api.HeartbeatResponse, error) {
+   // shutdownWait.Add() followed by isRunning() ensures that if this RPC
+   // sees the dispatcher as running, it has already called Add() on the
+   // shutdownWait WaitGroup, which ensures that Stop() will wait for this
+   // RPC to complete.
+   // Note that Stop() first cancels the dispatcher context and then calls
+   // shutdownWait.Wait(), to make sure new RPCs don't start before it
+   // waits for existing ones to finish.
+   d.shutdownWait.Add(1)
+   defer d.shutdownWait.Done()
+
+   // isRunningLocked() is not needed here since it's OK if the dispatcher
+   // context is cancelled while this call is in progress: Stop(), which
+   // cancels the dispatcher context, will wait for Heartbeat() to
+   // complete.
+   if !d.isRunning() {
+       return nil, status.Errorf(codes.Aborted, "dispatcher is stopped")
+   }
+
    nodeInfo, err := ca.RemoteNode(ctx)
    if err != nil {
        return nil, err
    }

    period, err := d.nodes.Heartbeat(nodeInfo.NodeID, r.SessionID)
+
+   log.G(ctx).WithField("method", "(*Dispatcher).Heartbeat").Debugf("received heartbeat from worker %v, expect next heartbeat in %v", nodeInfo, period)
+
    return &api.HeartbeatResponse{Period: period}, err
}
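The comment repeated in the handlers above boils down to one ordering argument: each RPC calls shutdownWait.Add(1) before checking whether the dispatcher is still running, while Stop() cancels the dispatcher context before calling shutdownWait.Wait(), so any handler that observed "running" is already counted when Wait() starts. Below is a small, standalone sketch of that pattern; the server type, handle(), and stop() are illustrative names, not dispatcher code.

package main

import (
    "context"
    "errors"
    "fmt"
    "sync"
    "time"
)

// server mimics the dispatcher's shutdown accounting: handlers register with
// the WaitGroup before checking the context, and stop() cancels the context
// before waiting, so every handler that saw "running" is counted by Wait().
type server struct {
    ctx      context.Context
    cancel   context.CancelFunc
    shutdown sync.WaitGroup
}

func newServer() *server {
    ctx, cancel := context.WithCancel(context.Background())
    return &server{ctx: ctx, cancel: cancel}
}

func (s *server) handle(id int) error {
    s.shutdown.Add(1) // register with the WaitGroup first...
    defer s.shutdown.Done()

    select { // ...then check whether we are still running
    case <-s.ctx.Done():
        return errors.New("server is stopped")
    default:
    }

    time.Sleep(50 * time.Millisecond) // simulate request work
    fmt.Printf("request %d finished\n", id)
    return nil
}

func (s *server) stop() {
    s.cancel()        // new requests now observe "stopped"
    s.shutdown.Wait() // wait for requests that observed "running"
}

func main() {
    s := newServer()
    var clients sync.WaitGroup
    for i := 0; i < 3; i++ {
        clients.Add(1)
        go func(i int) {
            defer clients.Done()
            _ = s.handle(i)
        }(i)
    }
    time.Sleep(10 * time.Millisecond) // let the requests start
    s.stop()
    fmt.Println("stop returned: all in-flight requests drained")
    clients.Wait()
}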
@@ -1120,17 +1192,27 @@ func (d *Dispatcher) getRootCACert() []byte {
// a special boolean field Disconnect which if true indicates that node should
// reconnect to another Manager immediately.
func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_SessionServer) error {
-   ctx := stream.Context()
-   nodeInfo, err := ca.RemoteNode(ctx)
+   // shutdownWait.Add() followed by isRunning() ensures that if this RPC
+   // sees the dispatcher as running, it has already called Add() on the
+   // shutdownWait WaitGroup, which ensures that Stop() will wait for this
+   // RPC to complete.
+   // Note that Stop() first cancels the dispatcher context and then calls
+   // shutdownWait.Wait(), to make sure new RPCs don't start before it
+   // waits for existing ones to finish.
+   d.shutdownWait.Add(1)
+   defer d.shutdownWait.Done()
+
+   dctx, err := d.isRunningLocked()
    if err != nil {
        return err
    }
-   nodeID := nodeInfo.NodeID

-   dctx, err := d.isRunningLocked()
+   ctx := stream.Context()
+   nodeInfo, err := ca.RemoteNode(ctx)
    if err != nil {
        return err
    }
+   nodeID := nodeInfo.NodeID

    var sessionID string
    if _, err := d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
@@ -1206,6 +1288,7 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio
        }
    }

+   log.Infof("dispatcher session dropped, marking node %s down", nodeID)
    if err := d.markNodeNotReady(nodeID, api.NodeStatus_DISCONNECTED, "node is currently trying to find new manager"); err != nil {
        log.WithError(err).Error("failed to remove node")
    }
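When the dispatcher drops a session and marks the node down (last hunk above), the agent reacts by rebuilding the session after the backoff computed in agent/agent.go at the top of this patch; the reordered log line there now reports the delay that will actually be used. Below is a small sketch of how that delay grows and is capped. The constant values are illustrative only; the real initialSessionFailureBackoff and maxSessionFailureBackoff are defined in agent/agent.go and are not shown in this patch.

package main

import (
    "fmt"
    "time"
)

// Illustrative values only; the real constants live in agent/agent.go.
const (
    initialSessionFailureBackoff = 100 * time.Millisecond
    maxSessionFailureBackoff     = 8 * time.Second
)

func main() {
    var backoff time.Duration
    for i := 0; i < 8; i++ {
        // Same update rule as Agent.run: grow the delay on every session
        // failure, then clamp it to the maximum.
        backoff = initialSessionFailureBackoff + 2*backoff
        if backoff > maxSessionFailureBackoff {
            backoff = maxSessionFailureBackoff
        }
        fmt.Printf("failure %d: next session delayed by %v\n", i+1, backoff)
    }
}

The resulting delay is the one consumed by session.run, which is what the new "waiting %v before registering session" log line in agent/session.go reports.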