Skip to content

Commit

Permalink
acl test
Browse files Browse the repository at this point in the history
  • Loading branch information
drewbailey committed Oct 8, 2020
1 parent bde5316 commit 17ebff0
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 3 deletions.
1 change: 0 additions & 1 deletion nomad/event_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
return
}

// TODO(drew) handle streams without ACLS
// authToken is passed to the subscribe request so the event stream
// can handle closing a subscription if the authToken expires.
// If ACLs are disabled, a random token is generated and it will
Expand Down
114 changes: 112 additions & 2 deletions nomad/event_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -47,7 +48,7 @@ func TestEventStream(t *testing.T) {
// invoke handler
go handler(p2)

// send request
// decode request responses
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
Expand All @@ -70,6 +71,7 @@ func TestEventStream(t *testing.T) {
node := mock.Node()
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}})

// Send request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(t, encoder.Encode(req))

Expand Down Expand Up @@ -292,6 +294,114 @@ OUTER:
}
}

// TODO(drew) acl test
func TestEventStream_ACL(t *testing.T) {
t.Parallel()
require := require.New(t)

// start server
s, root, cleanupS := TestACLServer(t, nil)
defer cleanupS()
testutil.WaitForLeader(t, s.RPC)

policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS})
tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)

cases := []struct {
Name string
Token string
ExpectedErr string
}{
{
Name: "no token",
Token: "",
ExpectedErr: structs.ErrPermissionDenied.Error(),
},
{
Name: "bad token",
Token: tokenBad.SecretID,
ExpectedErr: structs.ErrPermissionDenied.Error(),
},
{
Name: "root token",
Token: root.SecretID,
ExpectedErr: "subscription closed by server",
},
}

for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
// Create request for all topics and keys
req := structs.EventStreamRequest{
Topics: map[structs.Topic][]string{"*": {"*"}},
QueryOptions: structs.QueryOptions{
Region: s.Region(),
AuthToken: tc.Token,
},
}

handler, err := s.StreamingRpcHandler("Event.Stream")
require.Nil(err)

// create pipe
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()

errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)

go handler(p2)

// Start decoder
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}

streamMsg <- &msg
}
}()

// send request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(encoder.Encode(req))

publisher, err := s.State().EventPublisher()
require.NoError(err)

// publish some events
node := mock.Node()
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}})
publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: "test", Payload: node}}})

timeout := time.After(5 * time.Second)
OUTER:
for {
select {
case <-timeout:
require.Fail("timeout waiting for response")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
// force error by closing all subscriptions
publisher.CloseAll()
if msg.Error == nil {
continue
}

if strings.Contains(msg.Error.Error(), tc.ExpectedErr) {
break OUTER
} else {
require.Fail("Unexpected error", msg.Error)
}
}
}
})
}
}

0 comments on commit 17ebff0

Please sign in to comment.