Skip to content

Commit

Permalink
system_scheduler: support disconnected clients (#12555)
Browse files Browse the repository at this point in the history
* structs: Add helper method for checking if alloc is configured to disconnect
* system_scheduler: Add support for disconnected clients
  • Loading branch information
DerekStrickland committed Apr 15, 2022
1 parent fd21ceb commit f5de802
Show file tree
Hide file tree
Showing 7 changed files with 991 additions and 31 deletions.
18 changes: 18 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9935,6 +9935,24 @@ func (a *Allocation) DisconnectTimeout(now time.Time) time.Time {
return now.Add(*timeout)
}

// SupportsDisconnectedClients determines whether both the server and the task group
// are configured to allow the allocation to reconnect after network connectivity
// has been lost and then restored.
func (a *Allocation) SupportsDisconnectedClients(serverSupportsDisconnectedClients bool) bool {
if !serverSupportsDisconnectedClients {
return false
}

if a.Job != nil {
tg := a.Job.LookupTaskGroup(a.TaskGroup)
if tg != nil {
return tg.MaxClientDisconnect != nil
}
}

return false
}

// NextDelay returns a duration after which the allocation can be rescheduled.
// It is calculated according to the delay function and previous reschedule attempts.
func (a *Allocation) NextDelay() time.Duration {
Expand Down
5 changes: 0 additions & 5 deletions scheduler/generic_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,11 +429,6 @@ func (s *GenericScheduler) computeJobAllocs() error {
s.ctx.Plan().AppendAlloc(update, nil)
}

// Log reconnect updates. They will be pulled by the client when it reconnects.
for _, update := range results.reconnectUpdates {
s.logger.Trace("reconnecting alloc", "alloc_id", update.ID, "alloc_modify_index", update.AllocModifyIndex)
}

// Nothing remaining to do if placement is not required
if len(results.place)+len(results.destructiveUpdate) == 0 {
// If the job has been purged we don't have access to the job. Otherwise
Expand Down
10 changes: 1 addition & 9 deletions scheduler/reconcile_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,19 +224,11 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, serverS
reconnecting = make(map[string]*structs.Allocation)
ignore = make(map[string]*structs.Allocation)

supportsDisconnectedClients := serverSupportsDisconnectedClients

for _, alloc := range a {

// make sure we don't apply any reconnect logic to task groups
// without max_client_disconnect
if alloc.Job != nil {
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg != nil {
supportsDisconnectedClients = serverSupportsDisconnectedClients &&
tg.MaxClientDisconnect != nil
}
}
supportsDisconnectedClients := alloc.SupportsDisconnectedClients(serverSupportsDisconnectedClients)

reconnected := false
expired := false
Expand Down
9 changes: 8 additions & 1 deletion scheduler/scheduler_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,9 @@ func (s *SystemScheduler) computeJobAllocs() error {
live, term := structs.SplitTerminalAllocs(allocs)

// Diff the required and existing allocations
diff := diffSystemAllocs(s.job, s.nodes, s.notReadyNodes, tainted, live, term)
diff := diffSystemAllocs(s.job, s.nodes, s.notReadyNodes, tainted, live, term,
s.planner.ServersMeetMinimumVersion(minVersionMaxClientDisconnect, true))

s.logger.Debug("reconciled current state with desired state",
"place", len(diff.place), "update", len(diff.update),
"migrate", len(diff.migrate), "stop", len(diff.stop),
Expand All @@ -251,6 +253,10 @@ func (s *SystemScheduler) computeJobAllocs() error {
s.plan.AppendStoppedAlloc(e.Alloc, allocLost, structs.AllocClientStatusLost, "")
}

for _, e := range diff.disconnecting {
s.plan.AppendUnknownAlloc(e.Alloc)
}

// Attempt to do the upgrades in place
destructiveUpdates, inplaceUpdates := inplaceUpdate(s.ctx, s.eval, s.job, s.stack, diff.update)
diff.update = destructiveUpdates
Expand Down Expand Up @@ -508,6 +514,7 @@ func (s *SystemScheduler) canHandle(trigger string) bool {
case structs.EvalTriggerAllocStop:
case structs.EvalTriggerQueuedAllocs:
case structs.EvalTriggerScaling:
case structs.EvalTriggerReconnect:
default:
switch s.sysbatch {
case true:
Expand Down
Loading

0 comments on commit f5de802

Please sign in to comment.