Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

events: fix wildcard namespace handling #10935

Merged
merged 2 commits into from
Sep 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/10935.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
events: Fixed wildcard namespace handling
```
2 changes: 2 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3378,6 +3378,7 @@ func TestFSM_ACLEvents(t *testing.T) {
Topics: map[structs.Topic][]string{
tc.reqTopic: {"*"},
},
Namespace: "default",
}

sub, err := broker.Subscribe(subReq)
Expand Down Expand Up @@ -3431,6 +3432,7 @@ func TestFSM_EventBroker_JobRegisterFSMEvents(t *testing.T) {
Topics: map[structs.Topic][]string{
structs.TopicJob: {"*"},
},
Namespace: "default",
}

sub, err := broker.Subscribe(subReq)
Expand Down
1 change: 1 addition & 0 deletions nomad/state/deployment_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func EventsForIndex(t *testing.T, s *StateStore, index uint64) []structs.Event {
Topics: map[structs.Topic][]string{
"*": {"*"},
},
Namespace: "default",
Index: index,
StartExactlyAtIndex: true,
})
Expand Down
30 changes: 29 additions & 1 deletion nomad/stream/event_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,27 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "subscribed to evals in all namespaces and removed access",
policyBeforeRules: mock.NamespacePolicy("*", "", []string{acl.NamespaceCapabilityReadJob}),
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicEvaluation,
Type: structs.TypeEvalUpdated,
Namespace: "foo",
Payload: structs.EvaluationEvent{
Evaluation: &structs.Evaluation{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "subscribed to deployments and no access change",
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
Expand Down Expand Up @@ -467,11 +488,18 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{})
require.NoError(t, err)

var ns string
if tc.event.Namespace != "" {
ns = tc.event.Namespace
} else {
ns = structs.DefaultNamespace
}

sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{
Topics: map[structs.Topic][]string{
tc.event.Topic: {"*"},
},
Namespace: structs.DefaultNamespace,
Namespace: ns,
Token: secretID,
})

Expand Down
5 changes: 3 additions & 2 deletions nomad/stream/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,15 @@ func filter(req *SubscribeRequest, events []structs.Event) []structs.Event {

allTopicKeys := req.Topics[structs.TopicAll]

if req.Namespace == "" && len(allTopicKeys) == 1 && allTopicKeys[0] == string(structs.TopicAll) {
// Return all events if subscribed to all namespaces and all topics
if req.Namespace == "*" && len(allTopicKeys) == 1 && allTopicKeys[0] == string(structs.TopicAll) {
return events
}

var result []structs.Event

for _, event := range events {
if req.Namespace != "" && event.Namespace != "" && event.Namespace != req.Namespace {
if req.Namespace != "*" && event.Namespace != "" && event.Namespace != req.Namespace {
continue
}

Expand Down
23 changes: 23 additions & 0 deletions nomad/stream/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,29 @@ func TestFilter_Namespace(t *testing.T) {
require.Equal(t, 2, cap(actual))
}

func TestFilter_NamespaceAll(t *testing.T) {
events := make([]structs.Event, 0, 5)
events = append(events,
structs.Event{Topic: "Test", Key: "One", Namespace: "foo"},
structs.Event{Topic: "Test", Key: "Two", Namespace: "bar"},
structs.Event{Topic: "Test", Key: "Three", Namespace: "default"},
)

req := &SubscribeRequest{
Topics: map[structs.Topic][]string{
"*": {"*"},
},
Namespace: "*",
}
actual := filter(req, events)
expected := []structs.Event{
{Topic: "Test", Key: "One", Namespace: "foo"},
{Topic: "Test", Key: "Two", Namespace: "bar"},
{Topic: "Test", Key: "Three", Namespace: "default"},
}
require.Equal(t, expected, actual)
}

func TestFilter_FilterKeys(t *testing.T) {
events := make([]structs.Event, 0, 5)
events = append(events, structs.Event{Topic: "Test", Key: "One", FilterKeys: []string{"extra-key"}}, structs.Event{Topic: "Test", Key: "Two"}, structs.Event{Topic: "Test", Key: "Two"})
Expand Down
10 changes: 8 additions & 2 deletions website/content/api-docs/events.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ by default, requiring a management token.

- `namespace` `(string: "default")` - Specifies the target namespace to filter
on. Specifying `*` includes all namespaces for event types that support
namespaces.
namespaces. If you specify all namespaces (`*`) you'll either need a management
token, or an ACL Policy that explicitly applies to all namespaces (`*`).

- `topic` `(topic:filter_key: "*:*")` - Specifies a topic to subscribe to and
filter on. The default is to subscribe to all topics. Multiple topics may be
Expand Down Expand Up @@ -96,10 +97,15 @@ by default, requiring a management token.
### Sample Request

```shell-session
# Subscribe to all events and topics
# Subscribe to all events and topics in the default namespace
$ curl -s -v -N http://127.0.0.1:4646/v1/event/stream
```

```shell-session
# Subscribe to all events and topics in all namespaces
$ curl -s -v -N http://127.0.0.1:4646/v1/event/stream?namespace=*
```

```shell-session
# Start at index 100 and subscribe to all Evaluation events
$ curl -s -v -N http://127.0.0.1:4646/v1/event/stream?index=100&topic=Evaluation
Expand Down