Skip to content

Commit

Permalink
Events/deployment events (#9004)
Browse files Browse the repository at this point in the history
* Node Drain events and Node Events (#8980)

Deployment status updates

handle deployment status updates (paused, failed, resume)

deployment alloc health

generate events from apply plan result

txn err check, slim down deployment event

one ndjson line per index

* consolidate down to node event + type

* fix UpdateDeploymentAllocHealth test invocations

* fix test
  • Loading branch information
drewbailey committed Oct 8, 2020
1 parent 19ba709 commit f0f1495
Show file tree
Hide file tree
Showing 23 changed files with 842 additions and 156 deletions.
12 changes: 6 additions & 6 deletions command/agent/event_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ func TestEventStream_QueryParse(t *testing.T) {
desc: "all topics and keys specified",
query: "?topic=*:*",
want: map[stream.Topic][]string{
"*": []string{"*"},
"*": {"*"},
},
},
{
desc: "all topics and keys inferred",
query: "",
want: map[stream.Topic][]string{
"*": []string{"*"},
"*": {"*"},
},
},
{
Expand All @@ -103,14 +103,14 @@ func TestEventStream_QueryParse(t *testing.T) {
desc: "single topic and key",
query: "?topic=NodeDrain:*",
want: map[stream.Topic][]string{
"NodeDrain": []string{"*"},
"NodeDrain": {"*"},
},
},
{
desc: "single topic multiple keys",
query: "?topic=NodeDrain:*&topic=NodeDrain:3caace09-f1f4-4d23-b37a-9ab5eb75069d",
want: map[stream.Topic][]string{
"NodeDrain": []string{
"NodeDrain": {
"*",
"3caace09-f1f4-4d23-b37a-9ab5eb75069d",
},
Expand All @@ -120,10 +120,10 @@ func TestEventStream_QueryParse(t *testing.T) {
desc: "multiple topics",
query: "?topic=NodeRegister:*&topic=NodeDrain:3caace09-f1f4-4d23-b37a-9ab5eb75069d",
want: map[stream.Topic][]string{
"NodeDrain": []string{
"NodeDrain": {
"3caace09-f1f4-4d23-b37a-9ab5eb75069d",
},
"NodeRegister": []string{
"NodeRegister": {
"*",
},
},
Expand Down
13 changes: 7 additions & 6 deletions nomad/deploymentwatcher/deployments_watcher_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package deploymentwatcher

import (
"context"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -917,7 +918,7 @@ func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) {
HealthyAllocationIDs: []string{a.ID},
},
}
require.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req), "UpsertDeploymentAllocHealth")
require.Nil(m.state.UpdateDeploymentAllocHealth(context.Background(), m.nextIndex(), req), "UpsertDeploymentAllocHealth")
}

// Wait for there to be one eval
Expand Down Expand Up @@ -945,7 +946,7 @@ func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) {
UnhealthyAllocationIDs: []string{a.ID},
},
}
require.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req2), "UpsertDeploymentAllocHealth")
require.Nil(m.state.UpdateDeploymentAllocHealth(context.Background(), m.nextIndex(), req2), "UpsertDeploymentAllocHealth")

// Wait for there to be one eval
testutil.WaitForResult(func() (bool, error) {
Expand Down Expand Up @@ -1453,7 +1454,7 @@ func TestDeploymentWatcher_RollbackFailed(t *testing.T) {
HealthyAllocationIDs: []string{a.ID},
},
}
require.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req), "UpsertDeploymentAllocHealth")
require.Nil(m.state.UpdateDeploymentAllocHealth(context.Background(), m.nextIndex(), req), "UpsertDeploymentAllocHealth")
}

// Wait for there to be one eval
Expand Down Expand Up @@ -1481,7 +1482,7 @@ func TestDeploymentWatcher_RollbackFailed(t *testing.T) {
UnhealthyAllocationIDs: []string{a.ID},
},
}
require.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req2), "UpsertDeploymentAllocHealth")
require.Nil(m.state.UpdateDeploymentAllocHealth(context.Background(), m.nextIndex(), req2), "UpsertDeploymentAllocHealth")

// Wait for there to be one eval
testutil.WaitForResult(func() (bool, error) {
Expand Down Expand Up @@ -1562,15 +1563,15 @@ func TestWatcher_BatchAllocUpdates(t *testing.T) {
HealthyAllocationIDs: []string{a1.ID},
},
}
require.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req), "UpsertDeploymentAllocHealth")
require.Nil(m.state.UpdateDeploymentAllocHealth(context.Background(), m.nextIndex(), req), "UpsertDeploymentAllocHealth")

req2 := &structs.ApplyDeploymentAllocHealthRequest{
DeploymentAllocHealthRequest: structs.DeploymentAllocHealthRequest{
DeploymentID: d2.ID,
HealthyAllocationIDs: []string{a2.ID},
},
}
require.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req2), "UpsertDeploymentAllocHealth")
require.Nil(m.state.UpdateDeploymentAllocHealth(context.Background(), m.nextIndex(), req2), "UpsertDeploymentAllocHealth")

// Wait for there to be one eval for each job
testutil.WaitForResult(func() (bool, error) {
Expand Down
7 changes: 4 additions & 3 deletions nomad/deploymentwatcher/testutil_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package deploymentwatcher

import (
"context"
"reflect"
"strings"
"sync"
Expand Down Expand Up @@ -95,7 +96,7 @@ func (m *mockBackend) UpsertJob(job *structs.Job) (uint64, error) {
func (m *mockBackend) UpdateDeploymentStatus(u *structs.DeploymentStatusUpdateRequest) (uint64, error) {
m.Called(u)
i := m.nextIndex()
return i, m.state.UpdateDeploymentStatus(i, u)
return i, m.state.UpdateDeploymentStatus(context.Background(), i, u)
}

// matchDeploymentStatusUpdateConfig is used to configure the matching
Expand Down Expand Up @@ -149,7 +150,7 @@ func matchDeploymentStatusUpdateRequest(c *matchDeploymentStatusUpdateConfig) fu
func (m *mockBackend) UpdateDeploymentPromotion(req *structs.ApplyDeploymentPromoteRequest) (uint64, error) {
m.Called(req)
i := m.nextIndex()
return i, m.state.UpdateDeploymentPromotion(i, req)
return i, m.state.UpdateDeploymentPromotion(context.Background(), i, req)
}

// matchDeploymentPromoteRequestConfig is used to configure the matching
Expand Down Expand Up @@ -179,7 +180,7 @@ func matchDeploymentPromoteRequest(c *matchDeploymentPromoteRequestConfig) func(
func (m *mockBackend) UpdateDeploymentAllocHealth(req *structs.ApplyDeploymentAllocHealthRequest) (uint64, error) {
m.Called(req)
i := m.nextIndex()
return i, m.state.UpdateDeploymentAllocHealth(i, req)
return i, m.state.UpdateDeploymentAllocHealth(context.Background(), i, req)
}

// matchDeploymentAllocHealthRequestConfig is used to configure the matching
Expand Down
3 changes: 2 additions & 1 deletion nomad/eval_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nomad

import (
"context"
"fmt"
"reflect"
"strings"
Expand Down Expand Up @@ -374,7 +375,7 @@ func TestEvalEndpoint_Dequeue_UpdateWaitIndex(t *testing.T) {
EvalID: eval.ID,
}
assert := assert.New(t)
err := state.UpsertPlanResults(1000, &res)
err := state.UpsertPlanResults(context.Background(), 1000, &res)
assert.Nil(err)

// Dequeue the eval
Expand Down
15 changes: 6 additions & 9 deletions nomad/event_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,19 +117,16 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
}

// Continue if there are no events
if events == nil {
if len(events.Events) == 0 {
continue
}

// Send each event as its own frame
for _, e := range events {
if err := jsonStream.Send(e); err != nil {
select {
case errCh <- err:
case <-ctx.Done():
}
break LOOP
if err := jsonStream.Send(events); err != nil {
select {
case errCh <- err:
case <-ctx.Done():
}
break LOOP
}
}
}()
Expand Down
12 changes: 6 additions & 6 deletions nomad/event_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ OUTER:
continue
}

var event stream.Event
var event stream.Events
err = json.Unmarshal(msg.Event.Data, &event)
require.NoError(t, err)

Expand All @@ -102,7 +102,7 @@ OUTER:
Result: &out,
}
dec, err := mapstructure.NewDecoder(cfg)
dec.Decode(event.Payload)
dec.Decode(event.Events[0].Payload)
require.NoError(t, err)
require.Equal(t, node.ID, out.ID)
break OUTER
Expand All @@ -123,7 +123,7 @@ func TestEventStream_StreamErr(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC)

req := structs.EventStreamRequest{
Topics: map[stream.Topic][]string{"*": []string{"*"}},
Topics: map[stream.Topic][]string{"*": {"*"}},
QueryOptions: structs.QueryOptions{
Region: s1.Region(),
},
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestEventStream_RegionForward(t *testing.T) {

// Create request targed for region foo
req := structs.EventStreamRequest{
Topics: map[stream.Topic][]string{"*": []string{"*"}},
Topics: map[stream.Topic][]string{"*": {"*"}},
QueryOptions: structs.QueryOptions{
Region: "foo",
},
Expand Down Expand Up @@ -272,7 +272,7 @@ OUTER:
continue
}

var event stream.Event
var event stream.Events
err = json.Unmarshal(msg.Event.Data, &event)
require.NoError(t, err)

Expand All @@ -282,7 +282,7 @@ OUTER:
Result: &out,
}
dec, err := mapstructure.NewDecoder(cfg)
dec.Decode(event.Payload)
dec.Decode(event.Events[0].Payload)
require.NoError(t, err)
require.Equal(t, node.ID, out.ID)
break OUTER
Expand Down
48 changes: 30 additions & 18 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
case structs.NodeUpdateStatusRequestType:
return n.applyStatusUpdate(buf[1:], log.Index)
case structs.NodeUpdateDrainRequestType:
return n.applyDrainUpdate(buf[1:], log.Index)
return n.applyDrainUpdate(msgType, buf[1:], log.Index)
case structs.JobRegisterRequestType:
return n.applyUpsertJob(buf[1:], log.Index)
case structs.JobDeregisterRequestType:
Expand All @@ -226,13 +226,13 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
case structs.VaultAccessorDeregisterRequestType:
return n.applyDeregisterVaultAccessor(buf[1:], log.Index)
case structs.ApplyPlanResultsRequestType:
return n.applyPlanResults(buf[1:], log.Index)
return n.applyPlanResults(msgType, buf[1:], log.Index)
case structs.DeploymentStatusUpdateRequestType:
return n.applyDeploymentStatusUpdate(buf[1:], log.Index)
return n.applyDeploymentStatusUpdate(msgType, buf[1:], log.Index)
case structs.DeploymentPromoteRequestType:
return n.applyDeploymentPromotion(buf[1:], log.Index)
return n.applyDeploymentPromotion(msgType, buf[1:], log.Index)
case structs.DeploymentAllocHealthRequestType:
return n.applyDeploymentAllocHealth(buf[1:], log.Index)
return n.applyDeploymentAllocHealth(msgType, buf[1:], log.Index)
case structs.DeploymentDeleteRequestType:
return n.applyDeploymentDelete(buf[1:], log.Index)
case structs.JobStabilityRequestType:
Expand All @@ -250,7 +250,7 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
case structs.AutopilotRequestType:
return n.applyAutopilotUpdate(buf[1:], log.Index)
case structs.UpsertNodeEventsType:
return n.applyUpsertNodeEvent(buf[1:], log.Index)
return n.applyUpsertNodeEvent(msgType, buf[1:], log.Index)
case structs.JobBatchDeregisterRequestType:
return n.applyBatchDeregisterJob(buf[1:], log.Index)
case structs.AllocUpdateDesiredTransitionRequestType:
Expand Down Expand Up @@ -402,13 +402,15 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} {
return nil
}

func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyDrainUpdate(reqType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "node_drain_update"}, time.Now())
var req structs.NodeUpdateDrainRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

ctx := context.WithValue(context.Background(), state.CtxMsgType, reqType)

// COMPAT Remove in version 0.10
// As part of Nomad 0.8 we have deprecated the drain boolean in favor of a
// drain strategy but we need to handle the upgrade path where the Raft log
Expand All @@ -423,7 +425,7 @@ func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} {
}
}

if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.UpdatedAt, req.NodeEvent); err != nil {
if err := n.state.UpdateNodeDrainCtx(ctx, index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.UpdatedAt, req.NodeEvent); err != nil {
n.logger.Error("UpdateNodeDrain failed", "error", err)
return err
}
Expand Down Expand Up @@ -874,14 +876,16 @@ func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{}
}

// applyUpsertNodeEvent tracks the given node events.
func (n *nomadFSM) applyUpsertNodeEvent(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyUpsertNodeEvent(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "upsert_node_events"}, time.Now())
var req structs.EmitNodeEventsRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode EmitNodeEventsRequest: %v", err))
}

if err := n.state.UpsertNodeEvents(index, req.NodeEvents); err != nil {
ctx := context.WithValue(context.Background(), state.CtxMsgType, msgType)

if err := n.state.UpsertNodeEventsCtx(ctx, index, req.NodeEvents); err != nil {
n.logger.Error("failed to add node events", "error", err)
return err
}
Expand Down Expand Up @@ -953,14 +957,16 @@ func (n *nomadFSM) applyDeregisterSIAccessor(buf []byte, index uint64) interface
}

// applyPlanApply applies the results of a plan application
func (n *nomadFSM) applyPlanResults(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyPlanResults(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_plan_results"}, time.Now())
var req structs.ApplyPlanResultsRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.UpsertPlanResults(index, &req); err != nil {
ctx := context.WithValue(context.Background(), state.CtxMsgType, msgType)

if err := n.state.UpsertPlanResults(ctx, index, &req); err != nil {
n.logger.Error("ApplyPlan failed", "error", err)
return err
}
Expand All @@ -972,14 +978,16 @@ func (n *nomadFSM) applyPlanResults(buf []byte, index uint64) interface{} {

// applyDeploymentStatusUpdate is used to update the status of an existing
// deployment
func (n *nomadFSM) applyDeploymentStatusUpdate(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyDeploymentStatusUpdate(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_deployment_status_update"}, time.Now())
var req structs.DeploymentStatusUpdateRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.UpdateDeploymentStatus(index, &req); err != nil {
ctx := context.WithValue(context.Background(), state.CtxMsgType, msgType)

if err := n.state.UpdateDeploymentStatus(ctx, index, &req); err != nil {
n.logger.Error("UpsertDeploymentStatusUpdate failed", "error", err)
return err
}
Expand All @@ -989,14 +997,16 @@ func (n *nomadFSM) applyDeploymentStatusUpdate(buf []byte, index uint64) interfa
}

// applyDeploymentPromotion is used to promote canaries in a deployment
func (n *nomadFSM) applyDeploymentPromotion(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyDeploymentPromotion(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_deployment_promotion"}, time.Now())
var req structs.ApplyDeploymentPromoteRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.UpdateDeploymentPromotion(index, &req); err != nil {
ctx := context.WithValue(context.Background(), state.CtxMsgType, msgType)

if err := n.state.UpdateDeploymentPromotion(ctx, index, &req); err != nil {
n.logger.Error("UpsertDeploymentPromotion failed", "error", err)
return err
}
Expand All @@ -1007,14 +1017,16 @@ func (n *nomadFSM) applyDeploymentPromotion(buf []byte, index uint64) interface{

// applyDeploymentAllocHealth is used to set the health of allocations as part
// of a deployment
func (n *nomadFSM) applyDeploymentAllocHealth(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyDeploymentAllocHealth(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_deployment_alloc_health"}, time.Now())
var req structs.ApplyDeploymentAllocHealthRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.UpdateDeploymentAllocHealth(index, &req); err != nil {
ctx := context.WithValue(context.Background(), state.CtxMsgType, msgType)

if err := n.state.UpdateDeploymentAllocHealth(ctx, index, &req); err != nil {
n.logger.Error("UpsertDeploymentAllocHealth failed", "error", err)
return err
}
Expand Down
Loading

0 comments on commit f0f1495

Please sign in to comment.