Migrate all allocs when draining a node #10411

Merged 2 commits on Apr 21, 2021
8 changes: 4 additions & 4 deletions nomad/drainer/drainer.go
@@ -392,25 +392,25 @@ func (n *NodeDrainer) batchDrainAllocs(allocs []*structs.Allocation) (uint64, er
 // affected jobs.
 func (n *NodeDrainer) drainAllocs(future *structs.BatchFuture, allocs []*structs.Allocation) {
 	// Compute the affected jobs and make the transition map
-	jobs := make(map[string]*structs.Allocation, 4)
+	jobs := make(map[structs.NamespacedID]*structs.Allocation, 4)
 	transitions := make(map[string]*structs.DesiredTransition, len(allocs))
 	for _, alloc := range allocs {
 		transitions[alloc.ID] = &structs.DesiredTransition{
 			Migrate: helper.BoolToPtr(true),
 		}
-		jobs[alloc.JobID] = alloc
+		jobs[alloc.JobNamespacedID()] = alloc
 	}

 	evals := make([]*structs.Evaluation, 0, len(jobs))
 	now := time.Now().UTC().UnixNano()
-	for job, alloc := range jobs {
+	for _, alloc := range jobs {
 		evals = append(evals, &structs.Evaluation{
 			ID:          uuid.Generate(),
 			Namespace:   alloc.Namespace,
 			Priority:    alloc.Job.Priority,
 			Type:        alloc.Job.Type,
 			TriggeredBy: structs.EvalTriggerNodeDrain,
-			JobID:       job,
+			JobID:       alloc.JobID,
 			Status:      structs.EvalStatusPending,
 			CreateTime:  now,
 			ModifyTime:  now,
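The substance of this change is the map key: keying the jobs map by the bare JobID collapses jobs that share an ID across namespaces into a single entry, so a drain created an evaluation for only one of them. Keying by a namespace-qualified ID yields one entry, and therefore one evaluation, per (namespace, job ID) pair. Below is a minimal, self-contained sketch of the difference; the NamespacedID struct in it is a stand-in with assumed fields, not the definition from nomad/structs.

package main

import "fmt"

// NamespacedID is a stand-in with assumed fields (ID, Namespace); the real
// type lives in nomad/structs. Being a comparable struct, it can serve
// directly as a map key.
type NamespacedID struct {
	ID        string
	Namespace string
}

type alloc struct{ JobID, Namespace string }

func main() {
	allocs := []alloc{
		{JobID: "example", Namespace: "ns1"},
		{JobID: "example", Namespace: "ns2"},
	}

	// Old behavior: the second "example" job overwrites the first, so the
	// drainer would emit a single evaluation.
	byID := make(map[string]alloc)
	for _, a := range allocs {
		byID[a.JobID] = a
	}
	fmt.Println("keyed by JobID:", len(byID)) // 1

	// Fixed behavior: namespace-qualified keys keep both jobs distinct,
	// yielding one evaluation per (namespace, job ID) pair.
	byNSID := make(map[NamespacedID]alloc)
	for _, a := range allocs {
		byNSID[NamespacedID{ID: a.JobID, Namespace: a.Namespace}] = a
	}
	fmt.Println("keyed by NamespacedID:", len(byNSID)) // 2
}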
129 changes: 129 additions & 0 deletions nomad/drainer_int_test.go
@@ -880,6 +880,135 @@ func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) {
	require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined)
}

// TestDrainer_MultipleNSes_ServiceOnly asserts that a node drain migrates
// all allocs on the node, even when their jobs belong to different
// namespaces and share the same job ID.
func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	s1, cleanupS1 := TestServer(t, nil)
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	// Create two nodes
	n1, n2 := mock.Node(), mock.Node()
	nodeReg := &structs.NodeRegisterRequest{
		Node:         n1,
		WriteRequest: structs.WriteRequest{Region: "global"},
	}
	var nodeResp structs.NodeUpdateResponse
	require.NoError(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))

	ns1, ns2 := mock.Namespace(), mock.Namespace()
	nses := []*structs.Namespace{ns1, ns2}
	nsReg := &structs.NamespaceUpsertRequest{
		Namespaces:   nses,
		WriteRequest: structs.WriteRequest{Region: "global"},
	}
	var nsResp structs.GenericResponse
	require.NoError(msgpackrpc.CallWithCodec(codec, "Namespace.UpsertNamespaces", nsReg, &nsResp))

	for _, ns := range nses {
		// Create a job that runs on just the one registered node
		job := mock.Job()
		job.ID = "example"
		job.Name = "example"
		job.Namespace = ns.Name
		job.TaskGroups[0].Count = 1
		req := &structs.JobRegisterRequest{
			Job: job,
			WriteRequest: structs.WriteRequest{
				Region:    "global",
				Namespace: job.Namespace,
			},
		}

		// Fetch the response
		var resp structs.JobRegisterResponse
		require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
		require.NotZero(resp.Index)
	}

	// Wait for the two allocations to be placed
	state := s1.State()
	testutil.WaitForResult(func() (bool, error) {
		iter, err := state.Allocs(nil)
		if err != nil {
			return false, err
		}

		count := 0
		for iter.Next() != nil {
			count++
		}
		if count != 2 {
			return false, fmt.Errorf("expected %d allocs, found %d", 2, count)
		}
		return true, nil
	}, func(err error) {
		require.NoError(err)
	})

	// Create the second node
	nodeReg = &structs.NodeRegisterRequest{
		Node:         n2,
		WriteRequest: structs.WriteRequest{Region: "global"},
	}
	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))

	// Drain the first node
	drainReq := &structs.NodeUpdateDrainRequest{
		NodeID: n1.ID,
		DrainStrategy: &structs.DrainStrategy{
			DrainSpec: structs.DrainSpec{
				Deadline: 10 * time.Minute,
			},
		},
		WriteRequest: structs.WriteRequest{Region: "global"},
	}
	var drainResp structs.NodeDrainUpdateResponse
	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))

	// Wait for the allocs to be replaced
	errCh := make(chan error, 2)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger)
	go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger)

	testutil.WaitForResult(func() (bool, error) {
		allocs, err := state.AllocsByNode(nil, n2.ID)
		if err != nil {
			return false, err
		}
		return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs))
	}, func(err error) {
		require.NoError(err)
	})

	// Check that the node drain is removed
	testutil.WaitForResult(func() (bool, error) {
		if err := checkAllocPromoter(errCh); err != nil {
			return false, err
		}
		node, err := state.NodeByID(nil, n1.ID)
		if err != nil {
			return false, err
		}
		return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set")
	}, func(err error) {
		require.NoError(err)
	})

	// Check we got the right events
	node, err := state.NodeByID(nil, n1.ID)
	require.NoError(err)
	// sometimes the test gets a duplicate node drain complete event
	require.GreaterOrEqualf(len(node.Events), 3, "unexpected number of events: %v", node.Events)
	require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
}

// Test that transitions to force drain work.
func TestDrainer_Batch_TransitionToForce(t *testing.T) {
	t.Parallel()
4 changes: 4 additions & 0 deletions nomad/structs/structs.go
@@ -9184,6 +9184,10 @@ func (a *Allocation) ConsulNamespace() string {
	return a.Job.LookupTaskGroup(a.TaskGroup).Consul.GetNamespace()
}

// JobNamespacedID returns the alloc's job ID qualified by the alloc's
// namespace.
func (a *Allocation) JobNamespacedID() NamespacedID {
	return NewNamespacedID(a.JobID, a.Namespace)
}

// Index returns the index of the allocation. If the allocation is from a task
// group with count greater than 1, there will be multiple allocations for it.
func (a *Allocation) Index() uint {
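For context, JobNamespacedID leans on the NamespacedID helper already in nomad/structs. The sketch below shows the assumed shape of that helper — a plain comparable struct plus a NewNamespacedID constructor matching the call above; the actual definition may differ in detail.

// Assumed shape of the existing helper, shown for reference only.
// NamespacedID pairs an object's ID with its namespace; as a comparable
// struct it can be used directly as a map key, which is what the drainer
// change above relies on.
type NamespacedID struct {
	ID        string
	Namespace string
}

// NewNamespacedID returns a NamespacedID built from the given ID and namespace.
func NewNamespacedID(id, ns string) NamespacedID {
	return NamespacedID{ID: id, Namespace: ns}
}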