disconnected clients: Support operator manual interventions #12436

15 changes: 3 additions & 12 deletions client/allocrunner/alloc_runner.go
@@ -784,9 +784,8 @@ func (ar *allocRunner) NetworkStatus() *structs.AllocNetworkStatus {
return ar.state.NetworkStatus.Copy()
}

// setIndexes is a helper for forcing a set of server side indexes
// on the alloc runner. This is used during reconnect when the task
// has been marked unknown by the server.
// setIndexes is a helper for forcing alloc state on the alloc runner. This is
// used during reconnect when the task has been marked unknown by the server.
func (ar *allocRunner) setIndexes(update *structs.Allocation) {
ar.allocLock.Lock()
defer ar.allocLock.Unlock()
@@ -1253,15 +1252,13 @@ func (ar *allocRunner) Signal(taskName, signal string) error {

// Reconnect logs a reconnect event for each task in the allocation and syncs the current alloc state with the server.
func (ar *allocRunner) Reconnect(update *structs.Allocation) (err error) {
ar.logger.Trace("reconnecting alloc", "alloc_id", update.ID, "alloc_modify_index", update.AllocModifyIndex)

event := structs.NewTaskEvent(structs.TaskClientReconnected)
event.Time = time.Now().UnixNano()
for _, tr := range ar.tasks {
tr.AppendEvent(event)
}

// Update the client alloc with the server client side indexes.
// Update the client alloc with the server side indexes.
ar.setIndexes(update)

// Calculate alloc state to get the final state with the new events.
@@ -1274,12 +1271,6 @@ func (ar *allocRunner) Reconnect(update *structs.Allocation) (err error) {
// Build the client allocation
alloc := ar.clientAlloc(states)

// Don't destroy until after we've appended the reconnect event.
if update.DesiredStatus != structs.AllocDesiredStatusRun {
ar.Shutdown()
return
}

// Update the client state store.
err = ar.stateUpdater.PutAllocation(alloc)
if err != nil {
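The net effect of the alloc_runner.go change above: Reconnect no longer shuts the runner down when the server's desired status is not "run". The client only records the reconnect event on each task, adopts the server-side indexes, and reports its state; any stop decision now flows through the evaluations created server-side in node_endpoint.go below. A minimal sketch of that ordering, with simplified stand-in types (Allocation, taskRunner, and allocRunner here are illustrative, not the real Nomad structs):

package main

import (
    "fmt"
    "time"
)

// Simplified stand-ins for the Nomad types; only the fields this sketch needs.
type Allocation struct {
    ID               string
    AllocModifyIndex uint64
    DesiredStatus    string
}

type taskRunner struct{ events []string }

func (tr *taskRunner) AppendEvent(e string) { tr.events = append(tr.events, e) }

type allocRunner struct {
    alloc *Allocation
    tasks map[string]*taskRunner
}

// Reconnect mirrors the shape of the method after this diff: record the
// reconnect event on every task, force the server-side indexes onto the
// local alloc, and (in the real code) persist and report the client
// allocation. It deliberately no longer shuts the runner down.
func (ar *allocRunner) Reconnect(update *Allocation) error {
    event := fmt.Sprintf("TaskClientReconnected@%d", time.Now().UnixNano())
    for _, tr := range ar.tasks {
        tr.AppendEvent(event)
    }
    ar.alloc.AllocModifyIndex = update.AllocModifyIndex // setIndexes
    // Real code: calculate task states, build the client alloc, and
    // persist it via the state updater before syncing with the server.
    return nil
}

func main() {
    ar := &allocRunner{
        alloc: &Allocation{ID: "alloc-1"},
        tasks: map[string]*taskRunner{"web": {}},
    }
    update := &Allocation{ID: "alloc-1", AllocModifyIndex: 42, DesiredStatus: "stop"}
    _ = ar.Reconnect(update) // no client-side Shutdown, even though DesiredStatus is "stop"
    fmt.Println(ar.alloc.AllocModifyIndex, ar.tasks["web"].events)
}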
83 changes: 53 additions & 30 deletions nomad/node_endpoint.go
@@ -1151,6 +1151,7 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
var evals []*structs.Evaluation

for _, allocToUpdate := range args.Alloc {
evalTriggerBy := ""
allocToUpdate.ModifyTime = now.UTC().UnixNano()

alloc, _ := n.srv.State().AllocByID(nil, allocToUpdate.ID)
@@ -1162,51 +1163,73 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
continue
}

// if the job has been purged, this will always return error
job, err := n.srv.State().JobByID(nil, alloc.Namespace, alloc.JobID)
var job *structs.Job
var jobType string
var jobPriority int

job, err = n.srv.State().JobByID(nil, alloc.Namespace, alloc.JobID)
if err != nil {
n.logger.Debug("UpdateAlloc unable to find job", "job", alloc.JobID, "error", err)
continue
}

// If the job is nil it means it has been de-registered.
if job == nil {
n.logger.Debug("UpdateAlloc unable to find job", "job", alloc.JobID)
continue
jobType = alloc.Job.Type
jobPriority = alloc.Job.Priority
evalTriggerBy = structs.EvalTriggerJobDeregister
allocToUpdate.DesiredStatus = structs.AllocDesiredStatusStop
n.logger.Debug("UpdateAlloc unable to find job - shutting down alloc", "job", alloc.JobID)
}

taskGroup := job.LookupTaskGroup(alloc.TaskGroup)
if taskGroup == nil {
continue
var taskGroup *structs.TaskGroup
if job != nil {
jobType = job.Type
jobPriority = job.Priority
taskGroup = job.LookupTaskGroup(alloc.TaskGroup)
}

evalTriggerBy := ""
var eval *structs.Evaluation
// Add an evaluation if this is a failed alloc that is eligible for rescheduling
if allocToUpdate.ClientStatus == structs.AllocClientStatusFailed &&
alloc.FollowupEvalID == "" &&
alloc.RescheduleEligible(taskGroup.ReschedulePolicy, now) {
// If we cannot find the task group for a failed alloc we cannot continue, unless it is an orphan.
if evalTriggerBy != structs.EvalTriggerJobDeregister &&
allocToUpdate.ClientStatus == structs.AllocClientStatusFailed &&
alloc.FollowupEvalID == "" {

if taskGroup == nil {
n.logger.Debug("UpdateAlloc unable to find task group for job", "job", alloc.JobID, "alloc", alloc.ID, "task_group", alloc.TaskGroup)
continue
}

evalTriggerBy = structs.EvalTriggerRetryFailedAlloc
// Set trigger by failed if not an orphan.
if alloc.RescheduleEligible(taskGroup.ReschedulePolicy, now) {
evalTriggerBy = structs.EvalTriggerRetryFailedAlloc
}
}

//Add an evaluation if this is a reconnecting allocation.
if alloc.ClientStatus == structs.AllocClientStatusUnknown {
var eval *structs.Evaluation
// If unknown, and not an orphan, set the trigger by.
if evalTriggerBy != structs.EvalTriggerJobDeregister &&
alloc.ClientStatus == structs.AllocClientStatusUnknown {
evalTriggerBy = structs.EvalTriggerReconnect
}

if evalTriggerBy != "" {
eval = &structs.Evaluation{
ID: uuid.Generate(),
Namespace: alloc.Namespace,
TriggeredBy: evalTriggerBy,
JobID: alloc.JobID,
Type: job.Type,
Priority: job.Priority,
Status: structs.EvalStatusPending,
CreateTime: now.UTC().UnixNano(),
ModifyTime: now.UTC().UnixNano(),
}
evals = append(evals, eval)
// If we weren't able to determine one of our expected eval triggers,
// continue and don't create an eval.
if evalTriggerBy == "" {
continue
}

eval = &structs.Evaluation{
ID: uuid.Generate(),
Namespace: alloc.Namespace,
TriggeredBy: evalTriggerBy,
JobID: alloc.JobID,
Type: jobType,
Priority: jobPriority,
Status: structs.EvalStatusPending,
CreateTime: now.UTC().UnixNano(),
ModifyTime: now.UTC().UnixNano(),
}
evals = append(evals, eval)
}

// Add this to the batch
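Taken together, the UpdateAlloc changes in this hunk pick the eval trigger once per allocation update: an orphaned alloc (job purged or de-registered) always yields a job-deregister eval and is forced to stop; a failed, reschedule-eligible alloc with no follow-up eval yields a retry eval; a server-side "unknown" alloc yields a reconnect eval; anything else creates no eval at all. A hedged sketch of that decision order, using plain booleans in place of the job and allocation fields the real handler reads (pickTrigger and its parameters are illustrative names, and the string values stand in for the structs.EvalTrigger* constants):

package main

import "fmt"

// Illustrative stand-ins for the structs.EvalTrigger* constants in the diff.
const (
    triggerJobDeregister    = "job-deregister"
    triggerRetryFailedAlloc = "alloc-failure"
    triggerReconnect        = "reconnect"
)

// pickTrigger sketches the order of checks in the new UpdateAlloc loop:
// an orphaned alloc always produces a job-deregister eval; otherwise a
// failed, reschedule-eligible alloc without a follow-up eval produces a
// retry eval; an "unknown" alloc produces a reconnect eval; an empty
// trigger means no eval is created for this update.
func pickTrigger(jobMissing, failed, hasFollowupEval, rescheduleEligible, unknown bool) string {
    if jobMissing {
        return triggerJobDeregister
    }
    trigger := ""
    if failed && !hasFollowupEval && rescheduleEligible {
        trigger = triggerRetryFailedAlloc
    }
    if unknown {
        trigger = triggerReconnect
    }
    return trigger
}

func main() {
    fmt.Println(pickTrigger(true, false, false, false, true))   // job-deregister (orphan wins)
    fmt.Println(pickTrigger(false, true, false, true, false))   // alloc-failure
    fmt.Println(pickTrigger(false, false, false, false, true))  // reconnect
    fmt.Println(pickTrigger(false, false, false, false, false)) // "" -> no eval created
}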
@@ -1254,6 +1277,7 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene

// batchUpdate is used to update all the allocations
func (n *Node) batchUpdate(future *structs.BatchFuture, updates []*structs.Allocation, evals []*structs.Evaluation) {
var mErr multierror.Error
// Group pending evals by jobID to prevent creating unnecessary evals
evalsByJobId := make(map[structs.NamespacedID]struct{})
var trimmedEvals []*structs.Evaluation
@@ -1283,7 +1307,6 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
}

// Commit this update via Raft
var mErr multierror.Error
_, index, err := n.srv.raftApply(structs.AllocClientUpdateRequestType, batch)
if err != nil {
n.logger.Error("alloc update failed", "error", err)
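The batchUpdate hunks also group the pending evals by namespaced job ID before the Raft apply, so several allocation updates for the same job contribute only one evaluation to the batch. A small sketch of that grouping pattern (the trimmed-down Evaluation and NamespacedID types stand in for the real structs, and keeping the first eval seen per job is an assumption of the sketch):

package main

import "fmt"

// Trimmed-down stand-ins for the real types.
type NamespacedID struct{ ID, Namespace string }

type Evaluation struct {
    ID          string
    Namespace   string
    JobID       string
    TriggeredBy string
}

// trimEvalsByJob keeps one pending eval per (namespace, job) pair,
// mirroring the grouping batchUpdate performs before the Raft apply.
func trimEvalsByJob(evals []*Evaluation) []*Evaluation {
    seen := make(map[NamespacedID]struct{})
    trimmed := make([]*Evaluation, 0, len(evals))
    for _, eval := range evals {
        key := NamespacedID{ID: eval.JobID, Namespace: eval.Namespace}
        if _, ok := seen[key]; ok {
            continue // an eval for this job is already queued in the batch
        }
        seen[key] = struct{}{}
        trimmed = append(trimmed, eval)
    }
    return trimmed
}

func main() {
    evals := []*Evaluation{
        {ID: "e1", Namespace: "default", JobID: "web", TriggeredBy: "reconnect"},
        {ID: "e2", Namespace: "default", JobID: "web", TriggeredBy: "reconnect"},
        {ID: "e3", Namespace: "default", JobID: "batch", TriggeredBy: "alloc-failure"},
    }
    for _, e := range trimEvalsByJob(evals) {
        fmt.Println(e.ID, e.JobID) // e1 web, e3 batch: one eval per job reaches Raft
    }
}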
175 changes: 175 additions & 0 deletions nomad/node_endpoint_test.go
@@ -3765,3 +3765,178 @@ func TestClientEndpoint_ShouldCreateNodeEval(t *testing.T) {
})
}
}

func TestClientEndpoint_UpdateAlloc_Evals_ByTrigger(t *testing.T) {
t.Parallel()

type testCase struct {
name string
clientStatus string
serverClientStatus string
triggerBy string
missingJob bool
missingAlloc bool
invalidTaskGroup bool
}

testCases := []testCase{
{
name: "failed-alloc",
clientStatus: structs.AllocClientStatusFailed,
serverClientStatus: structs.AllocClientStatusRunning,
triggerBy: structs.EvalTriggerRetryFailedAlloc,
missingJob: false,
missingAlloc: false,
invalidTaskGroup: false,
},
{
name: "unknown-alloc",
clientStatus: structs.AllocClientStatusRunning,
serverClientStatus: structs.AllocClientStatusUnknown,
triggerBy: structs.EvalTriggerReconnect,
missingJob: false,
missingAlloc: false,
invalidTaskGroup: false,
},
{
name: "orphaned-unknown-alloc",
clientStatus: structs.AllocClientStatusRunning,
serverClientStatus: structs.AllocClientStatusUnknown,
triggerBy: structs.EvalTriggerJobDeregister,
missingJob: true,
missingAlloc: false,
invalidTaskGroup: false,
},
{
name: "running-job",
clientStatus: structs.AllocClientStatusRunning,
serverClientStatus: structs.AllocClientStatusRunning,
triggerBy: "",
missingJob: false,
missingAlloc: false,
invalidTaskGroup: false,
},
{
name: "complete-job",
clientStatus: structs.AllocClientStatusComplete,
serverClientStatus: structs.AllocClientStatusComplete,
triggerBy: "",
missingJob: false,
missingAlloc: false,
invalidTaskGroup: false,
},
{
name: "no-alloc-at-server",
clientStatus: structs.AllocClientStatusUnknown,
serverClientStatus: "",
triggerBy: "",
missingJob: false,
missingAlloc: true,
invalidTaskGroup: false,
},
{
name: "invalid-task-group",
clientStatus: structs.AllocClientStatusUnknown,
serverClientStatus: structs.AllocClientStatusRunning,
triggerBy: "",
missingJob: false,
missingAlloc: false,
invalidTaskGroup: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
s1, cleanupS1 := TestServer(t, func(c *Config) {
// Disabling scheduling in this test so that we can
// ensure that the state store doesn't accumulate more evals
// than what we expect the unit test to add
c.NumSchedulers = 0
})

defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the register request
node := mock.Node()
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}

// Fetch the response
var nodeResp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &nodeResp)
require.NoError(t, err)

fsmState := s1.fsm.State()

job := mock.Job()
job.ID = tc.name + "-test-job"

if !tc.missingJob {
err = fsmState.UpsertJob(structs.MsgTypeTestSetup, 101, job)
require.NoError(t, err)
}

serverAlloc := mock.Alloc()
serverAlloc.JobID = job.ID
serverAlloc.NodeID = node.ID
serverAlloc.ClientStatus = tc.serverClientStatus
serverAlloc.TaskGroup = job.TaskGroups[0].Name

// Create the incoming client alloc.
clientAlloc := serverAlloc.Copy()
clientAlloc.ClientStatus = tc.clientStatus

err = fsmState.UpsertJobSummary(99, mock.JobSummary(serverAlloc.JobID))
require.NoError(t, err)

if tc.invalidTaskGroup {
serverAlloc.TaskGroup = "invalid"
}

if !tc.missingAlloc {
err = fsmState.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{serverAlloc})
require.NoError(t, err)
}

updateReq := &structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{clientAlloc},
WriteRequest: structs.WriteRequest{Region: "global"},
}

var nodeAllocResp structs.NodeAllocsResponse
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", updateReq, &nodeAllocResp)
require.NoError(t, err)
require.NotEqual(t, uint64(0), nodeAllocResp.Index)

// If no eval should be created, validate that none were created and return.
if tc.triggerBy == "" {
evaluations, err := fsmState.EvalsByJob(nil, job.Namespace, job.ID)
require.NoError(t, err)
require.Len(t, evaluations, 0)
return
}

// Lookup the alloc
updatedAlloc, err := fsmState.AllocByID(nil, serverAlloc.ID)
require.NoError(t, err)
require.Equal(t, tc.clientStatus, updatedAlloc.ClientStatus)

// Assert that exactly one eval with test case TriggeredBy exists
evaluations, err := fsmState.EvalsByJob(nil, job.Namespace, job.ID)
require.NoError(t, err)
require.Equal(t, 1, len(evaluations))
foundCount := 0
for _, resultEval := range evaluations {
if resultEval.TriggeredBy == tc.triggerBy && resultEval.WaitUntil.IsZero() {
foundCount++
}
}
require.Equal(t, 1, foundCount, "Should create exactly one eval for trigger by", tc.triggerBy)
})
}

}