Skip to content

Commit

Permalink
Events switch on memdb change table instead of type to prevent duplic…
Browse files Browse the repository at this point in the history
…ates (#9486)

* prevent duplicate job events

when a job is updated, the job_version table is updated with a structs.Job, this caused there to be multiple job events since we are switching off the change type and not the table

* test length

* add table value to tests
  • Loading branch information
drewbailey authored Dec 1, 2020
1 parent 5ddaa1b commit db3bfb7
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 15 deletions.
69 changes: 54 additions & 15 deletions nomad/state/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,24 +50,36 @@ func eventsFromChanges(tx ReadTxn, changes Changes) *structs.Events {

func eventFromChange(change memdb.Change) (structs.Event, bool) {
if change.Deleted() {
switch before := change.Before.(type) {
case *structs.ACLToken:
switch change.Table {
case "acl_token":
before, ok := change.Before.(*structs.ACLToken)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicACLToken,
Key: before.AccessorID,
Payload: structs.ACLTokenEvent{
ACLToken: before,
},
}, true
case *structs.ACLPolicy:
case "acl_policy":
before, ok := change.Before.(*structs.ACLPolicy)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicACLPolicy,
Key: before.Name,
Payload: structs.ACLPolicyEvent{
ACLPolicy: before,
},
}, true
case *structs.Node:
case "nodes":
before, ok := change.Before.(*structs.Node)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicNode,
Key: before.ID,
Expand All @@ -76,28 +88,39 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
},
}, true
}

return structs.Event{}, false
}

switch after := change.After.(type) {
case *structs.ACLToken:
switch change.Table {
case "acl_token":
after, ok := change.After.(*structs.ACLToken)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicACLToken,
Key: after.AccessorID,
Payload: &structs.ACLTokenEvent{
Payload: structs.ACLTokenEvent{
ACLToken: after,
},
}, true
case *structs.ACLPolicy:
case "acl_policy":
after, ok := change.After.(*structs.ACLPolicy)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicACLPolicy,
Key: after.Name,
Payload: &structs.ACLPolicyEvent{
Payload: structs.ACLPolicyEvent{
ACLPolicy: after,
},
}, true
case *structs.Evaluation:
case "evals":
after, ok := change.After.(*structs.Evaluation)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicEval,
Key: after.ID,
Expand All @@ -110,7 +133,11 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
Eval: after,
},
}, true
case *structs.Allocation:
case "allocs":
after, ok := change.After.(*structs.Allocation)
if !ok {
return structs.Event{}, false
}
alloc := after.Copy()

filterKeys := []string{
Expand All @@ -130,7 +157,11 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
Alloc: alloc,
},
}, true
case *structs.Job:
case "jobs":
after, ok := change.After.(*structs.Job)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicJob,
Key: after.ID,
Expand All @@ -139,15 +170,23 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
Job: after,
},
}, true
case *structs.Node:
case "nodes":
after, ok := change.After.(*structs.Node)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicNode,
Key: after.ID,
Payload: &structs.NodeStreamEvent{
Node: after,
},
}, true
case *structs.Deployment:
case "deployment":
after, ok := change.After.(*structs.Deployment)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicDeployment,
Key: after.ID,
Expand Down
33 changes: 33 additions & 0 deletions nomad/state/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,36 @@ import (
"github.com/stretchr/testify/require"
)

// TestEventFromChange_SingleEventPerTable ensures that only a single event is
// created per table per memdb.Change
func TestEventFromChange_SingleEventPerTable(t *testing.T) {
t.Parallel()
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventBroker()

changes := Changes{
Index: 100,
MsgType: structs.JobRegisterRequestType,
Changes: memdb.Changes{
{
Table: "job_version",
Before: mock.Job(),
After: mock.Job(),
},
{
Table: "jobs",
Before: mock.Job(),
After: mock.Job(),
},
},
}

out := eventsFromChanges(s.db.ReadTxn(), changes)
require.Len(t, out.Events, 1)
require.Equal(t, out.Events[0].Type, structs.TypeJobRegistered)

}

func TestEventsFromChanges_DeploymentUpdate(t *testing.T) {
t.Parallel()
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
Expand Down Expand Up @@ -571,10 +601,12 @@ func TestEventsFromChanges_WithDeletion(t *testing.T) {
Index: uint64(1),
Changes: memdb.Changes{
{
Table: "jobs",
Before: &structs.Job{},
After: &structs.Job{},
},
{
Table: "jobs",
Before: &structs.Job{},
After: nil, // deleted
},
Expand All @@ -600,6 +632,7 @@ func TestEventsFromChanges_WithNodeDeregistration(t *testing.T) {
Index: uint64(1),
Changes: memdb.Changes{
{
Table: "nodes",
Before: before,
After: nil, // deleted
},
Expand Down

0 comments on commit db3bfb7

Please sign in to comment.