Skip to content

Commit

Permalink
allow way to access secretID without exposing it to stream
Browse files Browse the repository at this point in the history
test that values are omitted

test event creation

test acl events

payloads are pointers

comments
  • Loading branch information
drewbailey committed Dec 9, 2020
1 parent a76d54b commit 3a4804a
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 25 deletions.
155 changes: 155 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nomad

import (
"bytes"
"context"
"fmt"
"reflect"
"strings"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/hashicorp/raft"
Expand Down Expand Up @@ -3276,3 +3278,156 @@ func TestFSM_SnapshotRestore_Namespaces(t *testing.T) {
t.Fatalf("bad: \n%#v\n%#v", out2, ns2)
}
}

func TestFSM_ACLEvents_ACLToken(t *testing.T) {
t.Parallel()

cases := []struct {
desc string
setupfn func(t *testing.T, fsm *nomadFSM)
raftReq func(t *testing.T) []byte
reqTopic structs.Topic
eventfn func(t *testing.T, e []structs.Event)
}{
{
desc: "ACLToken upserted",
raftReq: func(t *testing.T) []byte {
req := structs.ACLTokenUpsertRequest{
Tokens: []*structs.ACLToken{mock.ACLToken()},
}
buf, err := structs.Encode(structs.ACLTokenUpsertRequestType, req)
require.NoError(t, err)
return buf
},
reqTopic: structs.TopicACLToken,
eventfn: func(t *testing.T, e []structs.Event) {
require.Len(t, e, 1)
require.Equal(t, e[0].Topic, structs.TopicACLToken)
require.Empty(t, e[0].Payload.(*structs.ACLTokenEvent).ACLToken.SecretID)
require.Equal(t, e[0].Type, structs.TypeACLTokenUpserted)
},
},
{
desc: "ACLToken deleted",
setupfn: func(t *testing.T, fsm *nomadFSM) {
token := mock.ACLToken()
token.SecretID = "26be01d3-df3a-45e9-9f49-4487a3dc3496"
token.AccessorID = "b971acba-bbe5-4274-bdfa-8bb1f542a8c1"

require.NoError(t,
fsm.State().UpsertACLTokens(
structs.MsgTypeTestSetup, 10, []*structs.ACLToken{token}))
},
raftReq: func(t *testing.T) []byte {
req := structs.ACLTokenDeleteRequest{
AccessorIDs: []string{"b971acba-bbe5-4274-bdfa-8bb1f542a8c1"},
}
buf, err := structs.Encode(structs.ACLTokenDeleteRequestType, req)
require.NoError(t, err)
return buf
},
reqTopic: structs.TopicACLToken,
eventfn: func(t *testing.T, e []structs.Event) {
require.Len(t, e, 1)
require.Equal(t, e[0].Topic, structs.TopicACLToken)
require.Empty(t, e[0].Payload.(*structs.ACLTokenEvent).ACLToken.SecretID)
require.Equal(t, e[0].Type, structs.TypeACLTokenDeleted)
},
},
{
desc: "ACLPolicy upserted",
raftReq: func(t *testing.T) []byte {
req := structs.ACLPolicyUpsertRequest{
Policies: []*structs.ACLPolicy{mock.ACLPolicy()},
}
buf, err := structs.Encode(structs.ACLPolicyUpsertRequestType, req)
require.NoError(t, err)
return buf
},
reqTopic: structs.TopicACLPolicy,
eventfn: func(t *testing.T, e []structs.Event) {
require.Len(t, e, 1)
require.Equal(t, e[0].Topic, structs.TopicACLPolicy)
require.Equal(t, e[0].Type, structs.TypeACLPolicyUpserted)
},
},
{
desc: "ACLPolicy deleted",
setupfn: func(t *testing.T, fsm *nomadFSM) {
policy := mock.ACLPolicy()
policy.Name = "some-policy"

require.NoError(t,
fsm.State().UpsertACLPolicies(
structs.MsgTypeTestSetup, 10, []*structs.ACLPolicy{policy}))
},
raftReq: func(t *testing.T) []byte {
req := structs.ACLPolicyDeleteRequest{
Names: []string{"some-policy"},
}
buf, err := structs.Encode(structs.ACLPolicyDeleteRequestType, req)
require.NoError(t, err)
return buf
},
reqTopic: structs.TopicACLPolicy,
eventfn: func(t *testing.T, e []structs.Event) {
require.Len(t, e, 1)
require.Equal(t, e[0].Topic, structs.TopicACLPolicy)
require.Equal(t, e[0].Type, structs.TypeACLPolicyDeleted)
},
},
}

for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
fsm := testFSM(t)

// Setup any state necessary
if tc.setupfn != nil {
tc.setupfn(t, fsm)
}

// Apply the log
resp := fsm.Apply(makeLog(tc.raftReq(t)))
require.Nil(t, resp)

broker, err := fsm.State().EventBroker()
require.NoError(t, err)

subReq := &stream.SubscribeRequest{
Topics: map[structs.Topic][]string{
tc.reqTopic: {"*"},
},
}

sub, err := broker.Subscribe(subReq)
require.NoError(t, err)

ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(100*time.Millisecond))
defer cancel()

var events []structs.Event
for {
out, err := sub.Next(ctx)
if len(out.Events) == 0 {
break
}

// consume the queue until the deadline has exceeded or until we've
// received more events than expected
if err == context.DeadlineExceeded {
break
}
require.NoError(t, err)

events = append(events, out.Events...)

if len(events) >= 1 {
break
}

}
tc.eventfn(t, events)
})
}
}
30 changes: 18 additions & 12 deletions nomad/state/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,15 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
if !ok {
return structs.Event{}, false
}

// Copy token and empty out secret ID
token := before.Copy()
token.SecretID = ""

return structs.Event{
Topic: structs.TopicACLToken,
Key: before.AccessorID,
Payload: structs.ACLTokenEvent{
ACLToken: before,
},
Topic: structs.TopicACLToken,
Key: token.AccessorID,
Payload: structs.NewACLTokenEvent(before.SecretID, token),
}, true
case "acl_policy":
before, ok := change.Before.(*structs.ACLPolicy)
Expand All @@ -71,7 +74,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
return structs.Event{
Topic: structs.TopicACLPolicy,
Key: before.Name,
Payload: structs.ACLPolicyEvent{
Payload: &structs.ACLPolicyEvent{
ACLPolicy: before,
},
}, true
Expand Down Expand Up @@ -102,12 +105,15 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
if !ok {
return structs.Event{}, false
}

// Copy token and empty out secret ID
token := after.Copy()
token.SecretID = ""

return structs.Event{
Topic: structs.TopicACLToken,
Key: after.AccessorID,
Payload: structs.ACLTokenEvent{
ACLToken: after,
},
Topic: structs.TopicACLToken,
Key: token.AccessorID,
Payload: structs.NewACLTokenEvent(after.SecretID, token),
}, true
case "acl_policy":
after, ok := change.After.(*structs.ACLPolicy)
Expand All @@ -117,7 +123,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
return structs.Event{
Topic: structs.TopicACLPolicy,
Key: after.Name,
Payload: structs.ACLPolicyEvent{
Payload: &structs.ACLPolicyEvent{
ACLPolicy: after,
},
}, true
Expand Down
53 changes: 53 additions & 0 deletions nomad/state/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,59 @@ func TestEventFromChange_SingleEventPerTable(t *testing.T) {
require.Equal(t, out.Events[0].Type, structs.TypeJobRegistered)
}

func TestEventFromChange_ACLTokenSecretID(t *testing.T) {
t.Parallel()
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventBroker()

token := mock.ACLToken()
require.NotEmpty(t, token.SecretID)

// Create
changes := Changes{
Index: 100,
MsgType: structs.NodeRegisterRequestType,
Changes: memdb.Changes{
{
Table: "acl_token",
Before: nil,
After: token,
},
},
}

out := eventsFromChanges(s.db.ReadTxn(), changes)
require.Len(t, out.Events, 1)
// Ensure original value not altered
require.NotEmpty(t, token.SecretID)

aclTokenEvent, ok := out.Events[0].Payload.(*structs.ACLTokenEvent)
require.True(t, ok)
require.Empty(t, aclTokenEvent.ACLToken.SecretID)

require.Equal(t, token.SecretID, aclTokenEvent.SecretID())

// Delete
changes = Changes{
Index: 100,
MsgType: structs.NodeDeregisterRequestType,
Changes: memdb.Changes{
{
Table: "acl_token",
Before: token,
After: nil,
},
},
}

out2 := eventsFromChanges(s.db.ReadTxn(), changes)
require.Len(t, out2.Events, 1)

tokenEvent2, ok := out2.Events[0].Payload.(*structs.ACLTokenEvent)
require.True(t, ok)
require.Empty(t, tokenEvent2.ACLToken.SecretID)
}

// TestEventFromChange_NodeSecretID ensures that a node's secret ID is not
// included in a node event
func TestEventFromChange_NodeSecretID(t *testing.T) {
Expand Down
9 changes: 4 additions & 5 deletions nomad/stream/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ type EventBrokerCfg struct {

type EventBroker struct {
// mu protects subscriptions
mu sync.Mutex

mu sync.Mutex
subscriptions *subscriptions

// eventBuf stores a configurable amount of events in memory
Expand Down Expand Up @@ -189,8 +188,8 @@ func (e *EventBroker) handleACLUpdates(ctx context.Context) {
return
case update := <-e.aclCh:
switch payload := update.Payload.(type) {
case structs.ACLTokenEvent:
tokenSecretID := payload.ACLToken.SecretID
case *structs.ACLTokenEvent:
tokenSecretID := payload.SecretID()

// Token was deleted
if update.Type == structs.TypeACLTokenDeleted {
Expand All @@ -214,7 +213,7 @@ func (e *EventBroker) handleACLUpdates(ctx context.Context) {
return !aclAllowsSubscription(aclObj, sub.req)
})

case structs.ACLPolicyEvent:
case *structs.ACLPolicyEvent:
// Re-evaluate each subscriptions permissions since a policy
// change may or may not affect the subscription
e.checkSubscriptionsAgainstPolicyChange()
Expand Down
12 changes: 4 additions & 8 deletions nomad/stream/event_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestEventBroker_EmptyReqToken_DistinctSubscriptions(t *testing.T) {
require.Equal(t, subscriptionStateOpen, atomic.LoadUint32(&sub2.state))
}

func TestEventBroker_handleACLUpdates_tokendeleted(t *testing.T) {
func TestEventBroker_handleACLUpdates_TokenDeleted(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

Expand All @@ -133,13 +133,9 @@ func TestEventBroker_handleACLUpdates_tokendeleted(t *testing.T) {
defer sub1.Unsubscribe()

aclEvent := structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenDeleted,
Payload: structs.ACLTokenEvent{
ACLToken: &structs.ACLToken{
SecretID: "foo",
},
},
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenDeleted,
Payload: structs.NewACLTokenEvent("foo", &structs.ACLToken{}),
}

publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{aclEvent}})
Expand Down
13 changes: 13 additions & 0 deletions nomad/structs/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,19 @@ type NodeStreamEvent struct {

type ACLTokenEvent struct {
ACLToken *ACLToken
secretID string
}

// NewACLTokenEvent takes a secretID and token and creates a new ACLTokenEvent.
func NewACLTokenEvent(secretID string, token *ACLToken) *ACLTokenEvent {
return &ACLTokenEvent{
ACLToken: token,
secretID: secretID,
}
}

func (a *ACLTokenEvent) SecretID() string {
return a.secretID
}

type ACLPolicyEvent struct {
Expand Down
12 changes: 12 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10813,6 +10813,18 @@ type ACLToken struct {
ModifyIndex uint64
}

func (a *ACLToken) Copy() *ACLToken {
c := new(ACLToken)
*c = *a

c.Policies = make([]string, len(a.Policies))
copy(c.Policies, a.Policies)
c.Hash = make([]byte, len(a.Hash))
copy(c.Hash, a.Hash)

return c
}

var (
// AnonymousACLToken is used no SecretID is provided, and the
// request is made anonymously.
Expand Down

0 comments on commit 3a4804a

Please sign in to comment.