Skip to content

Commit

Permalink
Merge pull request #12454 from hashicorp/f-rename-service-event-stream
Browse files Browse the repository at this point in the history
events: add service API logic and rename topic to service from serviceregistration
  • Loading branch information
jrasell committed Apr 5, 2022
2 parents a285905 + cebe704 commit b7d19a6
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 19 deletions.
22 changes: 17 additions & 5 deletions api/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
TopicAllocation Topic = "Allocation"
TopicJob Topic = "Job"
TopicNode Topic = "Node"
TopicService Topic = "Service"
TopicAll Topic = "*"
)

Expand Down Expand Up @@ -91,12 +92,23 @@ func (e *Event) Node() (*Node, error) {
return out.Node, nil
}

// Service returns a ServiceRegistration struct from a given event payload. If
// the Event Topic is Service this will return a valid ServiceRegistration.
func (e *Event) Service() (*ServiceRegistration, error) {
out, err := e.decodePayload()
if err != nil {
return nil, err
}
return out.Service, nil
}

type eventPayload struct {
Allocation *Allocation `mapstructure:"Allocation"`
Deployment *Deployment `mapstructure:"Deployment"`
Evaluation *Evaluation `mapstructure:"Evaluation"`
Job *Job `mapstructure:"Job"`
Node *Node `mapstructure:"Node"`
Allocation *Allocation `mapstructure:"Allocation"`
Deployment *Deployment `mapstructure:"Deployment"`
Evaluation *Evaluation `mapstructure:"Evaluation"`
Job *Job `mapstructure:"Job"`
Node *Node `mapstructure:"Node"`
Service *ServiceRegistration `mapstructure:"Service"`
}

func (e *Event) decodePayload() (*eventPayload, error) {
Expand Down
12 changes: 12 additions & 0 deletions api/event_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,18 @@ func TestEventStream_PayloadValueHelpers(t *testing.T) {
}, n)
},
},
{
desc: "service",
input: []byte(`{"Topic": "Service", "Payload": {"Service":{"ID":"some-service-id","Namespace":"some-service-namespace-id","Datacenter":"us-east-1a"}}}`),
expectFn: func(t *testing.T, event Event) {
require.Equal(t, TopicService, event.Topic)
a, err := event.Service()
require.NoError(t, err)
require.Equal(t, "us-east-1a", a.Datacenter)
require.Equal(t, "some-service-id", a.ID)
require.Equal(t, "some-service-namespace-id", a.Namespace)
},
},
}

for _, tc := range testCases {
Expand Down
4 changes: 2 additions & 2 deletions nomad/state/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicServiceRegistration,
Topic: structs.TopicService,
Key: before.ID,
FilterKeys: []string{
before.JobID,
Expand Down Expand Up @@ -224,7 +224,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicServiceRegistration,
Topic: structs.TopicService,
Key: after.ID,
FilterKeys: []string{
after.JobID,
Expand Down
4 changes: 2 additions & 2 deletions nomad/state/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,7 @@ func Test_eventsFromChanges_ServiceRegistration(t *testing.T) {

// Check the event, and it's payload are what we are expecting.
require.Len(t, receivedChange.Events, 1)
require.Equal(t, structs.TopicServiceRegistration, receivedChange.Events[0].Topic)
require.Equal(t, structs.TopicService, receivedChange.Events[0].Topic)
require.Equal(t, structs.TypeServiceRegistration, receivedChange.Events[0].Type)
require.Equal(t, uint64(10), receivedChange.Events[0].Index)

Expand All @@ -994,7 +994,7 @@ func Test_eventsFromChanges_ServiceRegistration(t *testing.T) {

// Check the event, and it's payload are what we are expecting.
require.Len(t, receivedDeleteChange.Events, 1)
require.Equal(t, structs.TopicServiceRegistration, receivedDeleteChange.Events[0].Topic)
require.Equal(t, structs.TopicService, receivedDeleteChange.Events[0].Topic)
require.Equal(t, structs.TypeServiceDeregistration, receivedDeleteChange.Events[0].Type)
require.Equal(t, uint64(20), receivedDeleteChange.Events[0].Index)

Expand Down
3 changes: 2 additions & 1 deletion nomad/stream/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,8 @@ func aclAllowsSubscription(aclObj *acl.ACL, subReq *SubscribeRequest) bool {
case structs.TopicDeployment,
structs.TopicEvaluation,
structs.TopicAllocation,
structs.TopicJob:
structs.TopicJob,
structs.TopicService:
if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityReadJob); !ok {
return false
}
Expand Down
18 changes: 9 additions & 9 deletions nomad/structs/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ type EventStreamWrapper struct {
type Topic string

const (
TopicDeployment Topic = "Deployment"
TopicEvaluation Topic = "Evaluation"
TopicAllocation Topic = "Allocation"
TopicJob Topic = "Job"
TopicNode Topic = "Node"
TopicACLPolicy Topic = "ACLPolicy"
TopicACLToken Topic = "ACLToken"
TopicServiceRegistration Topic = "ServiceRegistration"
TopicAll Topic = "*"
TopicDeployment Topic = "Deployment"
TopicEvaluation Topic = "Evaluation"
TopicAllocation Topic = "Allocation"
TopicJob Topic = "Job"
TopicNode Topic = "Node"
TopicACLPolicy Topic = "ACLPolicy"
TopicACLToken Topic = "ACLToken"
TopicService Topic = "Service"
TopicAll Topic = "*"

TypeNodeRegistration = "NodeRegistration"
TypeNodeDeregistration = "NodeDeregistration"
Expand Down

0 comments on commit b7d19a6

Please sign in to comment.