From 56463e4f79cec67bac7e2cd80cd01bd8e008b6d2 Mon Sep 17 00:00:00 2001
From: Aaron Lehmann
Date: Mon, 10 Jul 2017 10:27:34 -0700
Subject: [PATCH] orchestrator/global: Delete tasks when their node is deleted

Currently, the global orchestrator leaves a node's tasks in place when
that node is deleted. There is no process for reaping these tasks,
because the task reaper keeps a certain number of global service tasks
*per node*, so tasks that belonged to a deleted node are never cleaned
up.

Instead, delete each task right away when its node is deleted. This is
similar to how the replicated orchestrator deletes tasks when scaling
down, since tasks in slots that are no longer in use would likewise
never be reaped.

Signed-off-by: Aaron Lehmann
---
 manager/orchestrator/global/global.go      | 41 +++++++++++++---------
 manager/orchestrator/global/global_test.go |  4 ++--
 2 files changed, 27 insertions(+), 18 deletions(-)

diff --git a/manager/orchestrator/global/global.go b/manager/orchestrator/global/global.go
index f5e6b3afc0..b89a105fec 100644
--- a/manager/orchestrator/global/global.go
+++ b/manager/orchestrator/global/global.go
@@ -159,13 +159,13 @@ func (g *Orchestrator) Run(ctx context.Context) error {
 			switch v.Node.Status.State {
 			// NodeStatus_DISCONNECTED is a transient state, no need to make any change
 			case api.NodeStatus_DOWN:
-				g.removeTasksFromNode(ctx, v.Node)
+				g.foreachTaskFromNode(ctx, v.Node, g.shutdownTask)
 			case api.NodeStatus_READY:
 				// node could come back to READY from DOWN or DISCONNECT
 				g.reconcileOneNode(ctx, v.Node)
 			}
 		case api.EventDeleteNode:
-			g.removeTasksFromNode(ctx, v.Node)
+			g.foreachTaskFromNode(ctx, v.Node, g.deleteTask)
 			delete(g.nodes, v.Node.ID)
 		case api.EventUpdateTask:
 			g.handleTaskChange(ctx, v.Task)
@@ -201,7 +201,7 @@ func (g *Orchestrator) FixTask(ctx context.Context, batch *store.Batch, t *api.T
 	}
 	// if the node no longer valid, remove the task
 	if t.NodeID == "" || orchestrator.InvalidNode(node) {
-		g.removeTask(ctx, batch, t)
+		g.shutdownTask(ctx, batch, t)
 		return
 	}
 
@@ -236,7 +236,7 @@ func (g *Orchestrator) Stop() {
 	g.restarts.CancelAll()
 }
 
-func (g *Orchestrator) removeTasksFromNode(ctx context.Context, node *api.Node) {
+func (g *Orchestrator) foreachTaskFromNode(ctx context.Context, node *api.Node, cb func(context.Context, *store.Batch, *api.Task)) {
 	var (
 		tasks []*api.Task
 		err   error
@@ -245,7 +245,7 @@
 		tasks, err = store.FindTasks(tx, store.ByNodeID(node.ID))
 	})
 	if err != nil {
-		log.G(ctx).WithError(err).Errorf("global orchestrator: removeTasksFromNode failed finding tasks")
+		log.G(ctx).WithError(err).Errorf("global orchestrator: foreachTaskFromNode failed finding tasks")
 		return
 	}
 
@@ -253,13 +253,13 @@
 		for _, t := range tasks {
 			// Global orchestrator only removes tasks from globalServices
 			if _, exists := g.globalServices[t.ServiceID]; exists {
-				g.removeTask(ctx, batch, t)
+				cb(ctx, batch, t)
 			}
 		}
 		return nil
 	})
 	if err != nil {
-		log.G(ctx).WithError(err).Errorf("global orchestrator: removeTasksFromNode failed batching tasks")
+		log.G(ctx).WithError(err).Errorf("global orchestrator: foreachTaskFromNode failed batching tasks")
 	}
 }
 
@@ -314,7 +314,7 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
 				// if restart policy considers this node has finished its task
 				// it should remove all running tasks
 				if _, exists := nodeCompleted[serviceID][nodeID]; exists || !meetsConstraints {
-					g.removeTasks(ctx, batch, ntasks)
+					g.shutdownTasks(ctx, batch, ntasks)
 					continue
 				}
 
@@ -340,7 +340,7 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
 			// These must be associated with nodes that are drained, or
 			// nodes that no longer exist.
 			for _, ntasks := range nodeTasks[serviceID] {
-				g.removeTasks(ctx, batch, ntasks)
+				g.shutdownTasks(ctx, batch, ntasks)
 			}
 		}
 		return nil
@@ -382,7 +382,7 @@ func (g *Orchestrator) updateService(service *api.Service) {
 func (g *Orchestrator) reconcileOneNode(ctx context.Context, node *api.Node) {
 	if node.Spec.Availability == api.NodeAvailabilityDrain {
 		log.G(ctx).Debugf("global orchestrator: node %s in drain state, removing tasks from it", node.ID)
-		g.removeTasksFromNode(ctx, node)
+		g.foreachTaskFromNode(ctx, node, g.shutdownTask)
 		return
 	}
 
@@ -447,7 +447,7 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs
 		// if restart policy considers this node has finished its task
 		// it should remove all running tasks
 		if completed[serviceID] {
-			g.removeTasks(ctx, batch, tasks[serviceID])
+			g.shutdownTasks(ctx, batch, tasks[serviceID])
 			continue
 		}
 
@@ -491,7 +491,7 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs
 			} else {
 				dirtyTasks = append(dirtyTasks, cleanTasks[1:]...)
 			}
-			g.removeTasks(ctx, batch, dirtyTasks)
+			g.shutdownTasks(ctx, batch, dirtyTasks)
 		}
 	}
 	return nil
@@ -542,7 +542,7 @@ func (g *Orchestrator) tickTasks(ctx context.Context) {
 	g.restartTasks = make(map[string]struct{})
 }
 
-func (g *Orchestrator) removeTask(ctx context.Context, batch *store.Batch, t *api.Task) {
+func (g *Orchestrator) shutdownTask(ctx context.Context, batch *store.Batch, t *api.Task) {
 	// set existing task DesiredState to TaskStateShutdown
 	// TODO(aaronl): optimistic update?
 	err := batch.Update(func(tx store.Tx) error {
@@ -554,7 +554,7 @@
 		return nil
 	})
 	if err != nil {
-		log.G(ctx).WithError(err).Errorf("global orchestrator: removeTask failed to remove %s", t.ID)
+		log.G(ctx).WithError(err).Errorf("global orchestrator: shutdownTask failed to shut down %s", t.ID)
 	}
 }
 
@@ -572,9 +572,18 @@
 	}
 }
 
-func (g *Orchestrator) removeTasks(ctx context.Context, batch *store.Batch, tasks []*api.Task) {
+func (g *Orchestrator) shutdownTasks(ctx context.Context, batch *store.Batch, tasks []*api.Task) {
 	for _, t := range tasks {
-		g.removeTask(ctx, batch, t)
+		g.shutdownTask(ctx, batch, t)
+	}
+}
+
+func (g *Orchestrator) deleteTask(ctx context.Context, batch *store.Batch, t *api.Task) {
+	err := batch.Update(func(tx store.Tx) error {
+		return store.DeleteTask(tx, t.ID)
+	})
+	if err != nil {
+		log.G(ctx).WithError(err).Errorf("global orchestrator: deleteTask failed to delete %s", t.ID)
 	}
 }
 
diff --git a/manager/orchestrator/global/global_test.go b/manager/orchestrator/global/global_test.go
index f2ba0c292c..886ed18157 100644
--- a/manager/orchestrator/global/global_test.go
+++ b/manager/orchestrator/global/global_test.go
@@ -155,7 +155,7 @@ func TestDeleteNode(t *testing.T) {
 	deleteNode(t, store, node1)
 
-	// task should be set to dead
-	observedTask := testutils.WatchShutdownTask(t, watch)
+	// task should be deleted
+	observedTask := testutils.WatchTaskDelete(t, watch)
 	assert.Equal(t, observedTask.ServiceAnnotations.Name, "name1")
 	assert.Equal(t, observedTask.NodeID, "nodeid1")
 }
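
A note on the shape of the refactor: removeTasksFromNode hard-coded one
per-task action, while foreachTaskFromNode takes the action as a callback,
so the node-DOWN path (shut down) and the node-deleted path (delete) share
the same find/filter/batch plumbing. The self-contained Go sketch below
illustrates only that dispatch pattern; Task, Batch, and Orchestrator here
are simplified hypothetical stand-ins, not swarmkit's real api and store
types.

package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for swarmkit's api.Task and store.Batch.
type Task struct {
	ID           string
	ServiceID    string
	DesiredState string
}

type Batch struct {
	tasks map[string]*Task // stands in for the task store
}

type Orchestrator struct {
	globalServices map[string]struct{}
}

// foreachTaskFromNode mirrors the patch's refactor: the iteration and the
// global-service filter live here once, and the per-task action (shut down
// vs. delete) is supplied by the caller as a callback.
func (o *Orchestrator) foreachTaskFromNode(ctx context.Context, b *Batch, nodeTasks []*Task, cb func(context.Context, *Batch, *Task)) {
	for _, t := range nodeTasks {
		if _, exists := o.globalServices[t.ServiceID]; exists {
			cb(ctx, b, t)
		}
	}
}

// shutdownTask plays the role of the old removeTask: the task record stays
// in the store, but its desired state becomes "shutdown".
func (o *Orchestrator) shutdownTask(ctx context.Context, b *Batch, t *Task) {
	t.DesiredState = "shutdown"
}

// deleteTask is the new behavior for node deletion: the record is removed
// outright, so no reaper ever needs to find it.
func (o *Orchestrator) deleteTask(ctx context.Context, b *Batch, t *Task) {
	delete(b.tasks, t.ID)
}

func main() {
	ctx := context.Background()
	t1 := &Task{ID: "t1", ServiceID: "svc", DesiredState: "running"}
	t2 := &Task{ID: "t2", ServiceID: "svc", DesiredState: "running"}
	b := &Batch{tasks: map[string]*Task{"t1": t1, "t2": t2}}
	o := &Orchestrator{globalServices: map[string]struct{}{"svc": {}}}

	// Node goes DOWN: its tasks are shut down but kept for history.
	o.foreachTaskFromNode(ctx, b, []*Task{t1}, o.shutdownTask)
	// Node is deleted: its tasks are dropped from the store.
	o.foreachTaskFromNode(ctx, b, []*Task{t2}, o.deleteTask)

	fmt.Println(b.tasks["t1"].DesiredState) // shutdown
	_, stillThere := b.tasks["t2"]
	fmt.Println(stillThere) // false
}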