
Commit

orchestrator/global: Delete tasks when their node is deleted
Currently, the global orchestrator leaves associated tasks around when
a node is deleted. There is no process for reaping these tasks, since the
task reaper keeps a certain number of global service tasks *per node*.
Instead, delete the tasks right away when their node is deleted. This is
similar to how the replicated orchestrator deletes tasks when scaling
down, since tasks in slots that are no longer in use would otherwise
never be reaped.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
aaronlehmann committed Jul 10, 2017
1 parent fd73175 commit 56463e4
Showing 2 changed files with 26 additions and 17 deletions.
41 changes: 25 additions & 16 deletions manager/orchestrator/global/global.go
@@ -159,13 +159,13 @@ func (g *Orchestrator) Run(ctx context.Context) error {
switch v.Node.Status.State {
// NodeStatus_DISCONNECTED is a transient state, no need to make any change
case api.NodeStatus_DOWN:
- g.removeTasksFromNode(ctx, v.Node)
+ g.foreachTaskFromNode(ctx, v.Node, g.shutdownTask)
case api.NodeStatus_READY:
// node could come back to READY from DOWN or DISCONNECT
g.reconcileOneNode(ctx, v.Node)
}
case api.EventDeleteNode:
- g.removeTasksFromNode(ctx, v.Node)
+ g.foreachTaskFromNode(ctx, v.Node, g.deleteTask)
delete(g.nodes, v.Node.ID)
case api.EventUpdateTask:
g.handleTaskChange(ctx, v.Task)
@@ -201,7 +201,7 @@ func (g *Orchestrator) FixTask(ctx context.Context, batch *store.Batch, t *api.T
}
// if the node no longer valid, remove the task
if t.NodeID == "" || orchestrator.InvalidNode(node) {
- g.removeTask(ctx, batch, t)
+ g.shutdownTask(ctx, batch, t)
return
}

@@ -236,7 +236,7 @@ func (g *Orchestrator) Stop() {
g.restarts.CancelAll()
}

- func (g *Orchestrator) removeTasksFromNode(ctx context.Context, node *api.Node) {
+ func (g *Orchestrator) foreachTaskFromNode(ctx context.Context, node *api.Node, cb func(context.Context, *store.Batch, *api.Task)) {
var (
tasks []*api.Task
err error
@@ -245,21 +245,21 @@ func (g *Orchestrator) removeTasksFromNode(ctx context.Context, node *api.Node)
tasks, err = store.FindTasks(tx, store.ByNodeID(node.ID))
})
if err != nil {
- log.G(ctx).WithError(err).Errorf("global orchestrator: removeTasksFromNode failed finding tasks")
+ log.G(ctx).WithError(err).Errorf("global orchestrator: foreachTaskFromNode failed finding tasks")
return
}

err = g.store.Batch(func(batch *store.Batch) error {
for _, t := range tasks {
// Global orchestrator only removes tasks from globalServices
if _, exists := g.globalServices[t.ServiceID]; exists {
- g.removeTask(ctx, batch, t)
+ cb(ctx, batch, t)
}
}
return nil
})
if err != nil {
- log.G(ctx).WithError(err).Errorf("global orchestrator: removeTasksFromNode failed batching tasks")
+ log.G(ctx).WithError(err).Errorf("global orchestrator: foreachTaskFromNode failed batching tasks")
}
}

@@ -314,7 +314,7 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
// if restart policy considers this node has finished its task
// it should remove all running tasks
if _, exists := nodeCompleted[serviceID][nodeID]; exists || !meetsConstraints {
- g.removeTasks(ctx, batch, ntasks)
+ g.shutdownTasks(ctx, batch, ntasks)
continue
}

@@ -340,7 +340,7 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
// These must be associated with nodes that are drained, or
// nodes that no longer exist.
for _, ntasks := range nodeTasks[serviceID] {
- g.removeTasks(ctx, batch, ntasks)
+ g.shutdownTasks(ctx, batch, ntasks)
}
}
return nil
@@ -382,7 +382,7 @@ func (g *Orchestrator) updateService(service *api.Service) {
func (g *Orchestrator) reconcileOneNode(ctx context.Context, node *api.Node) {
if node.Spec.Availability == api.NodeAvailabilityDrain {
log.G(ctx).Debugf("global orchestrator: node %s in drain state, removing tasks from it", node.ID)
- g.removeTasksFromNode(ctx, node)
+ g.foreachTaskFromNode(ctx, node, g.shutdownTask)
return
}

@@ -447,7 +447,7 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs
// if restart policy considers this node has finished its task
// it should remove all running tasks
if completed[serviceID] {
- g.removeTasks(ctx, batch, tasks[serviceID])
+ g.shutdownTasks(ctx, batch, tasks[serviceID])
continue
}

@@ -491,7 +491,7 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs
} else {
dirtyTasks = append(dirtyTasks, cleanTasks[1:]...)
}
- g.removeTasks(ctx, batch, dirtyTasks)
+ g.shutdownTasks(ctx, batch, dirtyTasks)
}
}
return nil
@@ -542,7 +542,7 @@ func (g *Orchestrator) tickTasks(ctx context.Context) {
g.restartTasks = make(map[string]struct{})
}

- func (g *Orchestrator) removeTask(ctx context.Context, batch *store.Batch, t *api.Task) {
+ func (g *Orchestrator) shutdownTask(ctx context.Context, batch *store.Batch, t *api.Task) {
// set existing task DesiredState to TaskStateShutdown
// TODO(aaronl): optimistic update?
err := batch.Update(func(tx store.Tx) error {
@@ -554,7 +554,7 @@ func (g *Orchestrator) removeTask(ctx context.Context, batch *store.Batch, t *ap
return nil
})
if err != nil {
- log.G(ctx).WithError(err).Errorf("global orchestrator: removeTask failed to remove %s", t.ID)
+ log.G(ctx).WithError(err).Errorf("global orchestrator: shutdownTask failed to shut down %s", t.ID)
}
}

@@ -572,9 +572,18 @@ func (g *Orchestrator) addTask(ctx context.Context, batch *store.Batch, service
}
}

- func (g *Orchestrator) removeTasks(ctx context.Context, batch *store.Batch, tasks []*api.Task) {
+ func (g *Orchestrator) shutdownTasks(ctx context.Context, batch *store.Batch, tasks []*api.Task) {
for _, t := range tasks {
- g.removeTask(ctx, batch, t)
+ g.shutdownTask(ctx, batch, t)
}
}

+ func (g *Orchestrator) deleteTask(ctx context.Context, batch *store.Batch, t *api.Task) {
+ err := batch.Update(func(tx store.Tx) error {
+ return store.DeleteTask(tx, t.ID)
+ })
+ if err != nil {
+ log.G(ctx).WithError(err).Errorf("global orchestrator: deleteTask failed to delete %s", t.ID)
+ }
+ }

2 changes: 1 addition & 1 deletion manager/orchestrator/global/global_test.go
@@ -155,7 +155,7 @@ func TestDeleteNode(t *testing.T) {

deleteNode(t, store, node1)
// task should be set to dead
- observedTask := testutils.WatchShutdownTask(t, watch)
+ observedTask := testutils.WatchTaskDelete(t, watch)
assert.Equal(t, observedTask.ServiceAnnotations.Name, "name1")
assert.Equal(t, observedTask.NodeID, "nodeid1")
}
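
For context, the refactor above turns removeTasksFromNode into foreachTaskFromNode, which takes a per-task callback: node DOWN and drain events keep the old behavior of shutting tasks down, while node deletion now deletes the tasks outright so nothing is left for the task reaper to miss. The standalone Go sketch below is not part of this commit; it uses simplified stand-in types rather than swarmkit's api and store packages, and only illustrates the callback pattern:

package main

import "fmt"

// Simplified stand-ins for the swarmkit types in the diff above (api.Task,
// api.Node, store.Batch); this is an illustration, not the real API.
type Task struct {
    ID           string
    NodeID       string
    DesiredState string
}

type Node struct{ ID string }

// taskStore is a toy in-memory task table keyed by task ID.
type taskStore map[string]*Task

// foreachTaskFromNode mirrors the shape of the renamed helper: it walks the
// tasks on a node and applies whatever action the caller passes in.
func foreachTaskFromNode(s taskStore, node *Node, cb func(taskStore, *Task)) {
    for _, t := range s {
        if t.NodeID == node.ID {
            cb(s, t)
        }
    }
}

// shutdownTask plays the role of the old removeTask: the task record stays
// around, only its desired state changes.
func shutdownTask(s taskStore, t *Task) {
    t.DesiredState = "shutdown"
}

// deleteTask plays the role of the new deletion path: the record is removed
// entirely, so there is nothing left to reap later.
func deleteTask(s taskStore, t *Task) {
    delete(s, t.ID)
}

func main() {
    s := taskStore{
        "t1": {ID: "t1", NodeID: "node1", DesiredState: "running"},
        "t2": {ID: "t2", NodeID: "node2", DesiredState: "running"},
    }

    // Node went DOWN (or was drained): shut its tasks down but keep them.
    foreachTaskFromNode(s, &Node{ID: "node1"}, shutdownTask)

    // Node was deleted: drop its tasks from the store right away.
    foreachTaskFromNode(s, &Node{ID: "node2"}, deleteTask)

    fmt.Println(s["t1"].DesiredState) // "shutdown"
    fmt.Println(s["t2"])              // <nil>
}

The callback keeps the node traversal in one place while each event handler picks the action; in the real code the same helper also filters tasks to those belonging to global services and applies the callbacks inside a store batch.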