Skip to content

Commit

Permalink
Node Drain events and Node Events (#8980)
Browse files Browse the repository at this point in the history
Deployment status updates

handle deployment status updates (paused, failed, resume)

deployment alloc health
  • Loading branch information
drewbailey committed Oct 1, 2020
1 parent 54b4581 commit 2624098
Show file tree
Hide file tree
Showing 12 changed files with 631 additions and 51 deletions.
5 changes: 3 additions & 2 deletions nomad/deploymentwatcher/testutil_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package deploymentwatcher

import (
"context"
"reflect"
"strings"
"sync"
Expand Down Expand Up @@ -95,7 +96,7 @@ func (m *mockBackend) UpsertJob(job *structs.Job) (uint64, error) {
// UpdateDeploymentStatus records the call on the mock, takes the next index
// from the mock backend, and applies the status update to the backing state.
// It returns that index together with the state update's error.
//
// NOTE(review): this is a rendered diff hunk, so both versions of the changed
// line appear below: the first return is the pre-change call, the second
// (passing context.Background()) is its replacement.
func (m *mockBackend) UpdateDeploymentStatus(u *structs.DeploymentStatusUpdateRequest) (uint64, error) {
m.Called(u)
i := m.nextIndex()
return i, m.state.UpdateDeploymentStatus(i, u)
return i, m.state.UpdateDeploymentStatus(context.Background(), i, u)
}

// matchDeploymentStatusUpdateConfig is used to configure the matching
Expand Down Expand Up @@ -179,7 +180,7 @@ func matchDeploymentPromoteRequest(c *matchDeploymentPromoteRequestConfig) func(
// UpdateDeploymentAllocHealth records the call on the mock, takes the next
// index from the mock backend, and applies the alloc health request to the
// backing state. It returns that index together with the state update's error.
//
// NOTE(review): this is a rendered diff hunk, so both versions of the changed
// line appear below: the first return is the pre-change call, the second
// (passing context.Background()) is its replacement.
func (m *mockBackend) UpdateDeploymentAllocHealth(req *structs.ApplyDeploymentAllocHealthRequest) (uint64, error) {
m.Called(req)
i := m.nextIndex()
return i, m.state.UpdateDeploymentAllocHealth(i, req)
return i, m.state.UpdateDeploymentAllocHealth(context.Background(), i, req)
}

// matchDeploymentAllocHealthRequestConfig is used to configure the matching
Expand Down
40 changes: 25 additions & 15 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
case structs.NodeUpdateStatusRequestType:
return n.applyStatusUpdate(buf[1:], log.Index)
case structs.NodeUpdateDrainRequestType:
return n.applyDrainUpdate(buf[1:], log.Index)
return n.applyDrainUpdate(msgType, buf[1:], log.Index)
case structs.JobRegisterRequestType:
return n.applyUpsertJob(buf[1:], log.Index)
case structs.JobDeregisterRequestType:
Expand All @@ -228,11 +228,11 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
case structs.ApplyPlanResultsRequestType:
return n.applyPlanResults(buf[1:], log.Index)
case structs.DeploymentStatusUpdateRequestType:
return n.applyDeploymentStatusUpdate(buf[1:], log.Index)
return n.applyDeploymentStatusUpdate(msgType, buf[1:], log.Index)
case structs.DeploymentPromoteRequestType:
return n.applyDeploymentPromotion(buf[1:], log.Index)
return n.applyDeploymentPromotion(msgType, buf[1:], log.Index)
case structs.DeploymentAllocHealthRequestType:
return n.applyDeploymentAllocHealth(buf[1:], log.Index)
return n.applyDeploymentAllocHealth(msgType, buf[1:], log.Index)
case structs.DeploymentDeleteRequestType:
return n.applyDeploymentDelete(buf[1:], log.Index)
case structs.JobStabilityRequestType:
Expand All @@ -250,7 +250,7 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
case structs.AutopilotRequestType:
return n.applyAutopilotUpdate(buf[1:], log.Index)
case structs.UpsertNodeEventsType:
return n.applyUpsertNodeEvent(buf[1:], log.Index)
return n.applyUpsertNodeEvent(msgType, buf[1:], log.Index)
case structs.JobBatchDeregisterRequestType:
return n.applyBatchDeregisterJob(buf[1:], log.Index)
case structs.AllocUpdateDesiredTransitionRequestType:
Expand Down Expand Up @@ -402,13 +402,15 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} {
return nil
}

func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyDrainUpdate(reqType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "node_drain_update"}, time.Now())
var req structs.NodeUpdateDrainRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

ctx := context.WithValue(context.Background(), state.CtxMsgType, reqType)

// COMPAT Remove in version 0.10
// As part of Nomad 0.8 we have deprecated the drain boolean in favor of a
// drain strategy but we need to handle the upgrade path where the Raft log
Expand All @@ -423,7 +425,7 @@ func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} {
}
}

if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.UpdatedAt, req.NodeEvent); err != nil {
if err := n.state.UpdateNodeDrainCtx(ctx, index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.UpdatedAt, req.NodeEvent); err != nil {
n.logger.Error("UpdateNodeDrain failed", "error", err)
return err
}
Expand Down Expand Up @@ -874,14 +876,16 @@ func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{}
}

// applyUpsertNodeEvent tracks the given node events.
func (n *nomadFSM) applyUpsertNodeEvent(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyUpsertNodeEvent(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "upsert_node_events"}, time.Now())
var req structs.EmitNodeEventsRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode EmitNodeEventsRequest: %v", err))
}

if err := n.state.UpsertNodeEvents(index, req.NodeEvents); err != nil {
ctx := context.WithValue(context.Background(), state.CtxMsgType, msgType)

if err := n.state.UpsertNodeEventsCtx(ctx, index, req.NodeEvents); err != nil {
n.logger.Error("failed to add node events", "error", err)
return err
}
Expand Down Expand Up @@ -972,14 +976,16 @@ func (n *nomadFSM) applyPlanResults(buf []byte, index uint64) interface{} {

// applyDeploymentStatusUpdate is used to update the status of an existing
// deployment
func (n *nomadFSM) applyDeploymentStatusUpdate(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyDeploymentStatusUpdate(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_deployment_status_update"}, time.Now())
var req structs.DeploymentStatusUpdateRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.UpdateDeploymentStatus(index, &req); err != nil {
ctx := context.WithValue(context.Background(), state.CtxMsgType, msgType)

if err := n.state.UpdateDeploymentStatus(ctx, index, &req); err != nil {
n.logger.Error("UpsertDeploymentStatusUpdate failed", "error", err)
return err
}
Expand All @@ -989,14 +995,16 @@ func (n *nomadFSM) applyDeploymentStatusUpdate(buf []byte, index uint64) interfa
}

// applyDeploymentPromotion is used to promote canaries in a deployment
func (n *nomadFSM) applyDeploymentPromotion(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyDeploymentPromotion(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_deployment_promotion"}, time.Now())
var req structs.ApplyDeploymentPromoteRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.UpdateDeploymentPromotion(index, &req); err != nil {
ctx := context.WithValue(context.Background(), state.CtxMsgType, msgType)

if err := n.state.UpdateDeploymentPromotion(ctx, index, &req); err != nil {
n.logger.Error("UpsertDeploymentPromotion failed", "error", err)
return err
}
Expand All @@ -1007,14 +1015,16 @@ func (n *nomadFSM) applyDeploymentPromotion(buf []byte, index uint64) interface{

// applyDeploymentAllocHealth is used to set the health of allocations as part
// of a deployment
func (n *nomadFSM) applyDeploymentAllocHealth(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyDeploymentAllocHealth(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_deployment_alloc_health"}, time.Now())
var req structs.ApplyDeploymentAllocHealthRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.UpdateDeploymentAllocHealth(index, &req); err != nil {
ctx := context.WithValue(context.Background(), state.CtxMsgType, msgType)

if err := n.state.UpdateDeploymentAllocHealth(ctx, index, &req); err != nil {
n.logger.Error("UpsertDeploymentAllocHealth failed", "error", err)
return err
}
Expand Down
191 changes: 191 additions & 0 deletions nomad/state/deployment_event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package state

import (
"context"
"testing"
"time"

"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)

// TestDeploymentEventFromChanges pauses an existing deployment through
// UpdateDeploymentStatus and asserts that exactly one event is published at
// the write index, keyed by the deployment ID, whose payload carries the
// paused deployment and the job it belongs to.
func TestDeploymentEventFromChanges(t *testing.T) {
	t.Parallel()
	s := TestStateStoreCfg(t, TestStateStorePublisher(t))
	defer s.StopEventPublisher()

	// Setup: seed a job and a deployment for it at index 10.
	setupTx := s.db.WriteTxn(10)

	j := mock.Job()
	e := mock.Eval()
	e.JobID = j.ID

	d := mock.Deployment()
	d.JobID = j.ID

	require.NoError(t, s.upsertJobImpl(10, j, false, setupTx))
	require.NoError(t, s.upsertDeploymentImpl(10, d, setupTx))

	// NOTE(review): committing through the embedded Txn presumably keeps the
	// setup writes from publishing events of their own — confirm.
	setupTx.Txn.Commit()

	ctx := context.WithValue(context.Background(), CtxMsgType, structs.DeploymentStatusUpdateRequestType)

	req := &structs.DeploymentStatusUpdateRequest{
		DeploymentUpdate: &structs.DeploymentStatusUpdate{
			DeploymentID:      d.ID,
			Status:            structs.DeploymentStatusPaused,
			StatusDescription: structs.DeploymentStatusDescriptionPaused,
		},
		Eval: e,
		// Exclude Job from the request and assert below that the event still
		// includes it.
	}

	require.NoError(t, s.UpdateDeploymentStatus(ctx, 100, req))

	events := WaitForEvents(t, s, 100, 1, 1*time.Second)
	require.Len(t, events, 1)

	got := events[0]
	require.Equal(t, uint64(100), got.Index)
	require.Equal(t, d.ID, got.Key)

	// Use the checked form of the assertion so an unexpected payload type
	// fails this test instead of panicking the whole (parallel) test binary.
	de, ok := got.Payload.(*DeploymentEvent)
	require.True(t, ok, "expected *DeploymentEvent payload, got %T", got.Payload)
	require.Equal(t, structs.DeploymentStatusPaused, de.Deployment.Status)
	require.Equal(t, j, de.Job)
}

// TestDeploymentEventFromChanges_Promotion promotes all canaries of a
// two-group deployment through UpdateDeploymentPromotion and asserts that
// exactly one event is published at the write index, keyed by the deployment
// ID, whose payload carries the running deployment and both canary allocs.
func TestDeploymentEventFromChanges_Promotion(t *testing.T) {
	t.Parallel()
	s := TestStateStoreCfg(t, TestStateStorePublisher(t))
	defer s.StopEventPublisher()

	// Setup: everything below is written at index 10 in one transaction.
	setupTx := s.db.WriteTxn(10)

	// Job with two task groups: the mock's default group and a copy named "foo".
	j := mock.Job()
	tg1 := j.TaskGroups[0]
	tg2 := tg1.Copy()
	tg2.Name = "foo"
	j.TaskGroups = append(j.TaskGroups, tg2)
	require.NoError(t, s.upsertJobImpl(10, j, false, setupTx))

	// Deployment that wants one canary in each group and awaits promotion.
	d := mock.Deployment()
	d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion
	d.JobID = j.ID
	d.TaskGroups = map[string]*structs.DeploymentState{
		"web": {
			DesiredTotal:    10,
			DesiredCanaries: 1,
		},
		"foo": {
			DesiredTotal:    10,
			DesiredCanaries: 1,
		},
	}
	require.NoError(t, s.upsertDeploymentImpl(10, d, setupTx))

	// Create one healthy canary alloc per task group.
	c1 := mock.Alloc()
	c1.JobID = j.ID
	c1.DeploymentID = d.ID
	d.TaskGroups[c1.TaskGroup].PlacedCanaries = append(d.TaskGroups[c1.TaskGroup].PlacedCanaries, c1.ID)
	c1.DeploymentStatus = &structs.AllocDeploymentStatus{
		Healthy: helper.BoolToPtr(true),
	}

	c2 := mock.Alloc()
	c2.JobID = j.ID
	c2.DeploymentID = d.ID
	// Assign the task group before recording the canary so that c2 is listed
	// under the second group rather than under the mock alloc's default group.
	c2.TaskGroup = tg2.Name
	d.TaskGroups[c2.TaskGroup].PlacedCanaries = append(d.TaskGroups[c2.TaskGroup].PlacedCanaries, c2.ID)
	c2.DeploymentStatus = &structs.AllocDeploymentStatus{
		Healthy: helper.BoolToPtr(true),
	}

	require.NoError(t, s.upsertAllocsImpl(10, []*structs.Allocation{c1, c2}, setupTx))

	// Commit the setup transaction.
	// NOTE(review): committing through the embedded Txn presumably keeps the
	// setup writes from publishing events of their own — confirm.
	setupTx.Txn.Commit()

	e := mock.Eval()
	// Request to promote all canaries.
	ctx := context.WithValue(context.Background(), CtxMsgType, structs.DeploymentPromoteRequestType)
	req := &structs.ApplyDeploymentPromoteRequest{
		DeploymentPromoteRequest: structs.DeploymentPromoteRequest{
			DeploymentID: d.ID,
			All:          true,
		},
		Eval: e,
	}

	require.NoError(t, s.UpdateDeploymentPromotion(ctx, 100, req))

	events := WaitForEvents(t, s, 100, 1, 1*time.Second)
	require.Len(t, events, 1)

	got := events[0]
	require.Equal(t, uint64(100), got.Index)
	require.Equal(t, d.ID, got.Key)

	// Use the checked form of the assertion so an unexpected payload type
	// fails this test instead of panicking the whole (parallel) test binary.
	de, ok := got.Payload.(*DeploymentEvent)
	require.True(t, ok, "expected *DeploymentEvent payload, got %T", got.Payload)
	require.Equal(t, structs.DeploymentStatusRunning, de.Deployment.Status)
	require.Len(t, de.Allocs, 2)
}

// WaitForEvents polls EventsForIndex for the given index until it returns
// exactly want events, and returns those events.
//
// It polls every 10ms and fails the test once 10 polls have come back with a
// different count, or once timeout has elapsed, whichever happens first. Both
// limits are checked on the calling goroutine: require.Fail ends the test via
// t.FailNow, which the testing package only permits from the goroutine
// running the test.
func WaitForEvents(t *testing.T, s *StateStore, index uint64, want int, timeout time.Duration) []stream.Event {
	t.Helper()

	const (
		maxAttempts  = 10
		pollInterval = 10 * time.Millisecond
	)

	deadline := time.Now().Add(timeout)
	for attempt := 1; ; attempt++ {
		got := EventsForIndex(t, s, index)
		if len(got) == want {
			return got
		}
		if attempt == maxAttempts {
			require.Fail(t, "reached max attempts waiting for desired event count")
		}
		if time.Now().After(deadline) {
			require.Fail(t, "timeout waiting for events")
		}
		time.Sleep(pollInterval)
	}
}

// EventsForIndex subscribes to every topic and key ("*") on the state store's
// event publisher, starting from index, and drains whatever is available
// without blocking. It returns the collected events, or nil if there were
// none, and fails the test on any publisher or subscription error.
func EventsForIndex(t *testing.T, s *StateStore, index uint64) []stream.Event {
	t.Helper()

	pub, err := s.EventPublisher()
	require.NoError(t, err)

	sub, err := pub.Subscribe(&stream.SubscribeRequest{
		Topics: map[stream.Topic][]string{
			"*": {"*"},
		},
		Index: index,
	})
	// Check the error before registering the deferred Unsubscribe, so a failed
	// Subscribe never leads to Unsubscribe being called on an unusable
	// subscription while the test is unwinding.
	require.NoError(t, err)
	defer sub.Unsubscribe()

	var events []stream.Event
	for {
		batch, err := sub.NextNoBlock()
		require.NoError(t, err)
		// A nil batch means nothing further is available right now.
		if batch == nil {
			break
		}
		events = append(events, batch...)
	}
	return events
}
Loading

0 comments on commit 2624098

Please sign in to comment.