server: stop after client disconnect #7939

Merged · 13 commits · May 13, 2020
35 changes: 18 additions & 17 deletions api/tasks.go
@@ -411,23 +411,24 @@ func (vm *VolumeMount) Canonicalize() {

// TaskGroup is the unit of scheduling.
type TaskGroup struct {
Name             *string
Count            *int
Constraints      []*Constraint
Affinities       []*Affinity
Tasks            []*Task
Spreads          []*Spread
Volumes          map[string]*VolumeRequest
RestartPolicy    *RestartPolicy
ReschedulePolicy *ReschedulePolicy
EphemeralDisk    *EphemeralDisk
Update           *UpdateStrategy
Migrate          *MigrateStrategy
Networks         []*NetworkResource
Meta             map[string]string
Services         []*Service
ShutdownDelay    *time.Duration `mapstructure:"shutdown_delay"`
Scaling          *ScalingPolicy
Name                      *string
Count                     *int
Constraints               []*Constraint
Affinities                []*Affinity
Tasks                     []*Task
Spreads                   []*Spread
Volumes                   map[string]*VolumeRequest
RestartPolicy             *RestartPolicy
ReschedulePolicy          *ReschedulePolicy
EphemeralDisk             *EphemeralDisk
Update                    *UpdateStrategy
Migrate                   *MigrateStrategy
Networks                  []*NetworkResource
Meta                      map[string]string
Services                  []*Service
ShutdownDelay             *time.Duration `mapstructure:"shutdown_delay"`
StopAfterClientDisconnect *time.Duration `mapstructure:"stop_after_client_disconnect"`
Scaling                   *ScalingPolicy
}

Review comment (Contributor, author): This is the only new field; the rest are whitespace differences.

// NewTaskGroup creates a new TaskGroup.
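For reference, the new field rides on api.TaskGroup as a *time.Duration, mirroring shutdown_delay. A minimal sketch of setting it through the Go API (the timePtr helper and the group name are illustrative, not part of this PR):

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/api"
)

// timePtr is a small local helper; the api package stores durations as pointers.
func timePtr(d time.Duration) *time.Duration { return &d }

func main() {
	// Ask the server to stop this group's allocations once their client
	// has been disconnected for 120s.
	tg := api.NewTaskGroup("cache", 1)
	tg.StopAfterClientDisconnect = timePtr(120 * time.Second)
	fmt.Println(*tg.StopAfterClientDisconnect) // 2m0s
}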
1 change: 1 addition & 0 deletions jobspec/parse_group.go
@@ -56,6 +56,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
"service",
"volume",
"scaling",
"stop_after_client_disconnect",
}
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
return multierror.Prefix(err, fmt.Sprintf("'%s' ->", n))
1 change: 1 addition & 0 deletions jobspec/parse_test.go
@@ -184,6 +184,7 @@ func TestParse(t *testing.T) {
},
},
},
StopAfterClientDisconnect: helper.TimeToPtr(120 * time.Second),
ReschedulePolicy: &api.ReschedulePolicy{
Interval: helper.TimeToPtr(12 * time.Hour),
Attempts: helper.IntToPtr(5),
2 changes: 2 additions & 0 deletions jobspec/test-fixtures/basic.hcl
@@ -152,6 +152,8 @@ job "binstore-storagelocker" {
}
}

stop_after_client_disconnect = "120s"

task "binstore" {
driver = "docker"
user = "bob"
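The fixture above is what the parser test asserts against. A hedged sketch of the same round trip outside the test suite, assuming the jobspec.Parse entry point (the job body here is illustrative):

package main

import (
	"fmt"
	"strings"

	"github.com/hashicorp/nomad/jobspec"
)

func main() {
	src := `
job "example" {
  group "cache" {
    stop_after_client_disconnect = "120s"

    task "redis" {
      driver = "docker"
    }
  }
}`
	job, err := jobspec.Parse(strings.NewReader(src))
	if err != nil {
		panic(err)
	}
	// The HCL string "120s" arrives in the API struct as a *time.Duration.
	fmt.Println(*job.TaskGroups[0].StopAfterClientDisconnect) // 2m0s
}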
2 changes: 1 addition & 1 deletion nomad/state/state_store.go
@@ -4623,7 +4623,7 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
}
case structs.AllocClientStatusFailed, structs.AllocClientStatusComplete:
default:
s.logger.Error("invalid old client status for allocatio",
s.logger.Error("invalid old client status for allocation",
"alloc_id", existingAlloc.ID, "client_status", existingAlloc.ClientStatus)
}
summaryChanged = true
79 changes: 79 additions & 0 deletions nomad/structs/structs.go
@@ -6516,6 +6516,19 @@ func (t *Template) Warnings() error {
return mErr.ErrorOrNil()
}

// AllocStateField records which allocation-level field an AllocState event changed
type AllocStateField uint8

const (
	AllocStateFieldClientStatus AllocStateField = iota
)

// AllocState records a single event that changes the state of the whole allocation
type AllocState struct {
Field AllocStateField
Value string
Time time.Time
}

// Set of possible states for a task.
const (
TaskStatePending = "pending" // The task is waiting to be run.
@@ -8152,6 +8165,9 @@ type Allocation struct {
// TaskStates stores the state of each task,
TaskStates map[string]*TaskState

// AllocStates tracks metadata associated with changes to the state of the whole allocation, like becoming lost
AllocStates []*AllocState

// PreviousAllocation is the allocation that this allocation is replacing
PreviousAllocation string

@@ -8420,6 +8436,49 @@ func (a *Allocation) NextRescheduleTime() (time.Time, bool) {
return nextRescheduleTime, rescheduleEligible
}

// ShouldClientStop tests an alloc for StopAfterClientDisconnect configuration
func (a *Allocation) ShouldClientStop() bool {
tg := a.Job.LookupTaskGroup(a.TaskGroup)
if tg == nil ||
tg.StopAfterClientDisconnect == nil ||
*tg.StopAfterClientDisconnect == 0*time.Nanosecond {
return false
}
return true
}

Review comment (Member): I forget whether this was in the RFC or not, but either way this is a good catch. We would have had some really wild behavior without this. Also, I like how this API gives us a time.Time rather than ticking the clock over; not having to wait in tests is 👍

// WaitClientStop uses the reschedule delay mechanism to block rescheduling until
// StopAfterClientDisconnect's block interval passes
func (a *Allocation) WaitClientStop() time.Time {
tg := a.Job.LookupTaskGroup(a.TaskGroup)

// An alloc can only be marked lost once, so use the first lost transition
var t time.Time
for _, s := range a.AllocStates {
if s.Field == AllocStateFieldClientStatus &&
s.Value == AllocClientStatusLost {
t = s.Time
break
}
}

// On the first pass, the alloc hasn't been marked lost yet, and so we start
// counting from now
if t.IsZero() {
t = time.Now().UTC()
}

// Find the max kill timeout
kill := DefaultKillTimeout
for _, t := range tg.Tasks {
if t.KillTimeout > kill {
kill = t.KillTimeout
}
}

return t.Add(*tg.StopAfterClientDisconnect + kill)
}

// NextDelay returns a duration after which the allocation can be rescheduled.
// It is calculated according to the delay function and previous reschedule attempts.
func (a *Allocation) NextDelay() time.Duration {
@@ -8476,6 +8535,24 @@ func (a *Allocation) Terminated() bool {
return false
}

// SetStop updates the allocation in place to a DesiredStatus of stop, with the given ClientStatus
func (a *Allocation) SetStop(clientStatus, clientDesc string) {
a.DesiredStatus = AllocDesiredStatusStop
a.ClientStatus = clientStatus
a.ClientDescription = clientDesc
a.AppendState(AllocStateFieldClientStatus, clientStatus)
}

// AppendState creates and appends an AllocState entry recording the time of the state
// transition. Used to mark the transition to lost
func (a *Allocation) AppendState(field AllocStateField, value string) {
a.AllocStates = append(a.AllocStates, &AllocState{
Field: field,
Value: value,
Time: time.Now().UTC(),
})
}

// RanSuccessfully returns whether the client has run the allocation and all
// tasks finished successfully. Critically this function returns whether the
// allocation has run to completion and not just that the alloc has converged to
@@ -9384,6 +9461,8 @@ func (p *Plan) AppendStoppedAlloc(alloc *Allocation, desiredDesc, clientStatus s
newAlloc.ClientStatus = clientStatus
}

newAlloc.AppendState(AllocStateFieldClientStatus, clientStatus)

node := alloc.NodeID
existing := p.NodeUpdate[node]
p.NodeUpdate[node] = append(existing, newAlloc)
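To see how the new helpers compose: SetStop records the lost transition through AppendState, ShouldClientStop gates on the group's configuration, and WaitClientStop returns the first lost-transition time plus StopAfterClientDisconnect plus the largest task kill timeout. A minimal sketch (group and task names invented; assumes structs.DefaultKillTimeout is 5s):

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	stop := 120 * time.Second
	job := &structs.Job{
		TaskGroups: []*structs.TaskGroup{{
			Name:                      "web",
			StopAfterClientDisconnect: &stop,
			Tasks: []*structs.Task{{
				Name:        "main",
				KillTimeout: 5 * time.Second, // matches DefaultKillTimeout
			}},
		}},
	}
	alloc := &structs.Allocation{Job: job, TaskGroup: "web"}

	// Mark the alloc lost; the transition time lands in AllocStates.
	alloc.SetStop(structs.AllocClientStatusLost, "client disconnected")

	fmt.Println(alloc.ShouldClientStop()) // true: the group opted in

	// Deadline = lost time + stop_after_client_disconnect + max kill timeout,
	// so roughly 125s from now.
	wait := alloc.WaitClientStop().Sub(time.Now().UTC())
	fmt.Println(wait.Round(time.Second)) // ~2m5s
}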
69 changes: 68 additions & 1 deletion nomad/structs/structs_test.go
@@ -3595,13 +3595,21 @@ func TestPlan_AppendStoppedAllocAppendsAllocWithUpdatedAttrs(t *testing.T) {

plan.AppendStoppedAlloc(alloc, desiredDesc, AllocClientStatusLost)

appendedAlloc := plan.NodeUpdate[alloc.NodeID][0]
expectedAlloc := new(Allocation)
*expectedAlloc = *alloc
expectedAlloc.DesiredDescription = desiredDesc
expectedAlloc.DesiredStatus = AllocDesiredStatusStop
expectedAlloc.ClientStatus = AllocClientStatusLost
expectedAlloc.Job = nil
expectedAlloc.AllocStates = []*AllocState{{
Field: AllocStateFieldClientStatus,
Value: "lost",
}}

// This value is set to time.Now() in AppendStoppedAlloc, so clear it
appendedAlloc := plan.NodeUpdate[alloc.NodeID][0]
appendedAlloc.AllocStates[0].Time = time.Time{}

assert.Equal(t, expectedAlloc, appendedAlloc)
assert.Equal(t, alloc.Job, plan.Job)
}
@@ -4372,6 +4380,65 @@ func TestAllocation_NextDelay(t *testing.T) {

}

func TestAllocation_WaitClientStop(t *testing.T) {
type testCase struct {
desc string
stop time.Duration
status string
expectedShould bool
expectedRescheduleTime time.Time
}
now := time.Now().UTC()
testCases := []testCase{
{
desc: "running",
stop: 2 * time.Second,
status: AllocClientStatusRunning,
expectedShould: true,
},
{
desc: "no stop_after_client_disconnect",
status: AllocClientStatusLost,
expectedShould: false,
},
{
desc: "stop",
status: AllocClientStatusLost,
stop: 2 * time.Second,
expectedShould: true,
expectedRescheduleTime: now.Add((2 + 5) * time.Second),
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
j := testJob()
a := &Allocation{
ClientStatus: tc.status,
Job: j,
TaskStates: map[string]*TaskState{},
}

if tc.status == AllocClientStatusLost {
a.AppendState(AllocStateFieldClientStatus, AllocClientStatusLost)
}

j.TaskGroups[0].StopAfterClientDisconnect = &tc.stop
a.TaskGroup = j.TaskGroups[0].Name

require.Equal(t, tc.expectedShould, a.ShouldClientStop())

if !tc.expectedShould || tc.status != AllocClientStatusLost {
return
}

// the reschedTime is close to the expectedRescheduleTime
reschedTime := a.WaitClientStop()
e := reschedTime.Unix() - tc.expectedRescheduleTime.Unix()
require.Less(t, e, int64(2))
})
}
}

func TestAllocation_Canonicalize_Old(t *testing.T) {
alloc := MockAlloc()
alloc.AllocatedResources = nil
8 changes: 5 additions & 3 deletions scheduler/generic_sched.go
@@ -259,8 +259,10 @@ func (s *GenericScheduler) process() (bool, error) {
// If there are failed allocations, we need to create a blocked evaluation
// to place the failed allocations when resources become available. If the
// current evaluation is already a blocked eval, we reuse it by submitting
// a new eval to the planner in createBlockedEval
if s.eval.Status != structs.EvalStatusBlocked && len(s.failedTGAllocs) != 0 && s.blocked == nil {
// a new eval to the planner in createBlockedEval. If the current eval is
// pending with WaitUntil set, it's delayed rather than blocked.
if s.eval.Status != structs.EvalStatusBlocked && len(s.failedTGAllocs) != 0 && s.blocked == nil &&
s.eval.WaitUntil.IsZero() {
if err := s.createBlockedEval(false); err != nil {
s.logger.Error("failed to make blocked eval", "error", err)
return false, err
@@ -338,7 +340,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
}

// Update the allocations which are in pending/running state on tainted
// nodes to lost
// nodes to lost, but only if the scheduler has already marked them
updateNonTerminalAllocsToLost(s.plan, tainted, allocs)

reconciler := NewAllocReconciler(s.logger,
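The guard keys off Evaluation.WaitUntil: a pending eval with WaitUntil set is held back by the eval broker until that time, so the scheduler must not also turn its failed placements into a blocked eval. A rough, illustrative sketch of the distinction (not the PR's server-side code):

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	// A follow-up eval for a lost alloc: pending, with WaitUntil set to the
	// time computed by Allocation.WaitClientStop. Because WaitUntil is
	// non-zero, the new check in process() skips createBlockedEval and the
	// eval is simply delayed instead.
	eval := &structs.Evaluation{
		Status:    structs.EvalStatusPending,
		WaitUntil: time.Now().UTC().Add(125 * time.Second),
	}
	fmt.Println(eval.WaitUntil.IsZero()) // false: delayed, not blocked
}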