Skip to content

Commit

Permalink
Merge pull request #2301 from aaronlehmann/global-service-removed-node
Browse files Browse the repository at this point in the history
orchestrator/global: Delete tasks when their node is deleted
  • Loading branch information
aluzzardi committed Jul 11, 2017
2 parents fd73175 + 56463e4 commit a3d96fe
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 17 deletions.
41 changes: 25 additions & 16 deletions manager/orchestrator/global/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,13 @@ func (g *Orchestrator) Run(ctx context.Context) error {
switch v.Node.Status.State {
// NodeStatus_DISCONNECTED is a transient state, no need to make any change
case api.NodeStatus_DOWN:
g.removeTasksFromNode(ctx, v.Node)
g.foreachTaskFromNode(ctx, v.Node, g.shutdownTask)
case api.NodeStatus_READY:
// node could come back to READY from DOWN or DISCONNECT
g.reconcileOneNode(ctx, v.Node)
}
case api.EventDeleteNode:
g.removeTasksFromNode(ctx, v.Node)
g.foreachTaskFromNode(ctx, v.Node, g.deleteTask)
delete(g.nodes, v.Node.ID)
case api.EventUpdateTask:
g.handleTaskChange(ctx, v.Task)
Expand Down Expand Up @@ -201,7 +201,7 @@ func (g *Orchestrator) FixTask(ctx context.Context, batch *store.Batch, t *api.T
}
// if the node no longer valid, remove the task
if t.NodeID == "" || orchestrator.InvalidNode(node) {
g.removeTask(ctx, batch, t)
g.shutdownTask(ctx, batch, t)
return
}

Expand Down Expand Up @@ -236,7 +236,7 @@ func (g *Orchestrator) Stop() {
g.restarts.CancelAll()
}

func (g *Orchestrator) removeTasksFromNode(ctx context.Context, node *api.Node) {
func (g *Orchestrator) foreachTaskFromNode(ctx context.Context, node *api.Node, cb func(context.Context, *store.Batch, *api.Task)) {
var (
tasks []*api.Task
err error
Expand All @@ -245,21 +245,21 @@ func (g *Orchestrator) removeTasksFromNode(ctx context.Context, node *api.Node)
tasks, err = store.FindTasks(tx, store.ByNodeID(node.ID))
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: removeTasksFromNode failed finding tasks")
log.G(ctx).WithError(err).Errorf("global orchestrator: foreachTaskFromNode failed finding tasks")
return
}

err = g.store.Batch(func(batch *store.Batch) error {
for _, t := range tasks {
// Global orchestrator only removes tasks from globalServices
if _, exists := g.globalServices[t.ServiceID]; exists {
g.removeTask(ctx, batch, t)
cb(ctx, batch, t)
}
}
return nil
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: removeTasksFromNode failed batching tasks")
log.G(ctx).WithError(err).Errorf("global orchestrator: foreachTaskFromNode failed batching tasks")
}
}

Expand Down Expand Up @@ -314,7 +314,7 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
// if restart policy considers this node has finished its task
// it should remove all running tasks
if _, exists := nodeCompleted[serviceID][nodeID]; exists || !meetsConstraints {
g.removeTasks(ctx, batch, ntasks)
g.shutdownTasks(ctx, batch, ntasks)
continue
}

Expand All @@ -340,7 +340,7 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
// These must be associated with nodes that are drained, or
// nodes that no longer exist.
for _, ntasks := range nodeTasks[serviceID] {
g.removeTasks(ctx, batch, ntasks)
g.shutdownTasks(ctx, batch, ntasks)
}
}
return nil
Expand Down Expand Up @@ -382,7 +382,7 @@ func (g *Orchestrator) updateService(service *api.Service) {
func (g *Orchestrator) reconcileOneNode(ctx context.Context, node *api.Node) {
if node.Spec.Availability == api.NodeAvailabilityDrain {
log.G(ctx).Debugf("global orchestrator: node %s in drain state, removing tasks from it", node.ID)
g.removeTasksFromNode(ctx, node)
g.foreachTaskFromNode(ctx, node, g.shutdownTask)
return
}

Expand Down Expand Up @@ -447,7 +447,7 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs
// if restart policy considers this node has finished its task
// it should remove all running tasks
if completed[serviceID] {
g.removeTasks(ctx, batch, tasks[serviceID])
g.shutdownTasks(ctx, batch, tasks[serviceID])
continue
}

Expand Down Expand Up @@ -491,7 +491,7 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs
} else {
dirtyTasks = append(dirtyTasks, cleanTasks[1:]...)
}
g.removeTasks(ctx, batch, dirtyTasks)
g.shutdownTasks(ctx, batch, dirtyTasks)
}
}
return nil
Expand Down Expand Up @@ -542,7 +542,7 @@ func (g *Orchestrator) tickTasks(ctx context.Context) {
g.restartTasks = make(map[string]struct{})
}

func (g *Orchestrator) removeTask(ctx context.Context, batch *store.Batch, t *api.Task) {
func (g *Orchestrator) shutdownTask(ctx context.Context, batch *store.Batch, t *api.Task) {
// set existing task DesiredState to TaskStateShutdown
// TODO(aaronl): optimistic update?
err := batch.Update(func(tx store.Tx) error {
Expand All @@ -554,7 +554,7 @@ func (g *Orchestrator) removeTask(ctx context.Context, batch *store.Batch, t *ap
return nil
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: removeTask failed to remove %s", t.ID)
log.G(ctx).WithError(err).Errorf("global orchestrator: shutdownTask failed to shut down %s", t.ID)
}
}

Expand All @@ -572,9 +572,18 @@ func (g *Orchestrator) addTask(ctx context.Context, batch *store.Batch, service
}
}

func (g *Orchestrator) removeTasks(ctx context.Context, batch *store.Batch, tasks []*api.Task) {
func (g *Orchestrator) shutdownTasks(ctx context.Context, batch *store.Batch, tasks []*api.Task) {
for _, t := range tasks {
g.removeTask(ctx, batch, t)
g.shutdownTask(ctx, batch, t)
}
}

func (g *Orchestrator) deleteTask(ctx context.Context, batch *store.Batch, t *api.Task) {
err := batch.Update(func(tx store.Tx) error {
return store.DeleteTask(tx, t.ID)
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: deleteTask failed to delete %s", t.ID)
}
}

Expand Down
2 changes: 1 addition & 1 deletion manager/orchestrator/global/global_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func TestDeleteNode(t *testing.T) {

deleteNode(t, store, node1)
// task should be set to dead
observedTask := testutils.WatchShutdownTask(t, watch)
observedTask := testutils.WatchTaskDelete(t, watch)
assert.Equal(t, observedTask.ServiceAnnotations.Name, "name1")
assert.Equal(t, observedTask.NodeID, "nodeid1")
}
Expand Down

0 comments on commit a3d96fe

Please sign in to comment.