Skip to content

Commit

Permalink
[manager/dispatcher] Synchronize Dispatcher.Stop() with incoming rpcs.
Browse files Browse the repository at this point in the history
Signed-off-by: Anshul Pundir <anshul.pundir@docker.com>
  • Loading branch information
anshulpundir committed Feb 21, 2018
1 parent e596a87 commit 0b5ac34
Showing 1 changed file with 49 additions and 14 deletions.
63 changes: 49 additions & 14 deletions manager/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,15 @@ type clusterUpdate struct {

// Dispatcher is responsible for dispatching tasks and tracking agent health.
type Dispatcher struct {
mu sync.Mutex
wg sync.WaitGroup
// Mutex to synchronize access to dispatcher shared state e.g. nodes,
// lastSeenManagers, networkBootstrapKeys etc.
// TODO(anshul): This can potentially be removed and rpcRW used in its place.
mu sync.Mutex
wg sync.WaitGroup
// This RWMutex synchronizes RPC handlers and the dispatcher stop().
// The RPC handlers use the read lock while stop() uses the write lock
// and acts as a barrier to shutdown.
rpcRW sync.RWMutex
nodes *nodeStore
store *store.MemoryStore
lastSeenManagers []*api.WeightedPeer
Expand Down Expand Up @@ -318,7 +325,13 @@ func (d *Dispatcher) Stop() error {
d.cancel()
d.mu.Unlock()

// The active nodes list can be cleaned out only when all
// existing RPCs have finished.
// RPCs that start after rpcRW.Unlock() should find the context
// cancelled and should fail organically.
d.rpcRW.Lock()
d.nodes.Clean()
d.rpcRW.Unlock()

d.processUpdatesLock.Lock()
// In case there are any waiters. There is no chance of any starting
Expand All @@ -329,6 +342,11 @@ func (d *Dispatcher) Stop() error {

d.clusterUpdateQueue.Close()

// TODO(anshul): This use of Wait() could be unsafe.
// According to go's documentation on WaitGroup,
// Add() with a positive delta that occur when the counter is zero
// must happen before a Wait().
// As is, dispatcher Stop() can race with Run().
d.wg.Wait()

return nil
Expand Down Expand Up @@ -532,6 +550,14 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a
// UpdateTaskStatus updates status of task. Node should send such updates
// on every status change of its tasks.
func (d *Dispatcher) UpdateTaskStatus(ctx context.Context, r *api.UpdateTaskStatusRequest) (*api.UpdateTaskStatusResponse, error) {
d.rpcRW.RLock()
defer d.rpcRW.RUnlock()

dctx, err := d.isRunningLocked()
if err != nil {
return nil, err
}

nodeInfo, err := ca.RemoteNode(ctx)
if err != nil {
return nil, err
Expand All @@ -547,11 +573,6 @@ func (d *Dispatcher) UpdateTaskStatus(ctx context.Context, r *api.UpdateTaskStat
}
log := log.G(ctx).WithFields(fields)

dctx, err := d.isRunningLocked()
if err != nil {
return nil, err
}

if _, err := d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
return nil, err
}
Expand Down Expand Up @@ -846,16 +867,19 @@ func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServe
// Assignments is a stream of assignments for a node. Each message contains
// either full list of tasks and secrets for the node, or an incremental update.
func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatcher_AssignmentsServer) error {
nodeInfo, err := ca.RemoteNode(stream.Context())
d.rpcRW.RLock()
defer d.rpcRW.RUnlock()

dctx, err := d.isRunningLocked()
if err != nil {
return err
}
nodeID := nodeInfo.NodeID

dctx, err := d.isRunningLocked()
nodeInfo, err := ca.RemoteNode(stream.Context())
if err != nil {
return err
}
nodeID := nodeInfo.NodeID

fields := logrus.Fields{
"node.id": nodeID,
Expand Down Expand Up @@ -1103,6 +1127,13 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes
// Node should send new heartbeat earlier than now + TTL, otherwise it will
// be deregistered from dispatcher and its status will be updated to NodeStatus_DOWN
func (d *Dispatcher) Heartbeat(ctx context.Context, r *api.HeartbeatRequest) (*api.HeartbeatResponse, error) {
d.rpcRW.RLock()
defer d.rpcRW.RUnlock()

if !d.isRunning() {
return nil, status.Errorf(codes.Aborted, "dispatcher is stopped")
}

nodeInfo, err := ca.RemoteNode(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1137,17 +1168,21 @@ func (d *Dispatcher) getRootCACert() []byte {
// a special boolean field Disconnect which if true indicates that node should
// reconnect to another Manager immediately.
func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_SessionServer) error {
ctx := stream.Context()
nodeInfo, err := ca.RemoteNode(ctx)
d.rpcRW.RLock()
defer d.rpcRW.RUnlock()

dctx, err := d.isRunningLocked()
if err != nil {
return err
}
nodeID := nodeInfo.NodeID

dctx, err := d.isRunningLocked()
ctx := stream.Context()

nodeInfo, err := ca.RemoteNode(ctx)
if err != nil {
return err
}
nodeID := nodeInfo.NodeID

var sessionID string
if _, err := d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
Expand Down

0 comments on commit 0b5ac34

Please sign in to comment.