Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle ACL errors consistently when blocking query timeout is reached. #20876

Merged
merged 5 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/20876.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
streaming: Handle ACL errors consistently when blocking query timeout is reached.
```
2 changes: 1 addition & 1 deletion agent/consul/stream/event_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) {
for _, secretID := range tokenSecretIDs {
if subs, ok := s.byToken[secretID]; ok {
for _, sub := range subs {
sub.forceClose()
sub.closeACLChanged()
}
}
}
Expand Down
22 changes: 21 additions & 1 deletion agent/consul/stream/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@ const (
// will not return new events.
subStateUnsub = 2

// subStateShutting down indicates the subscription was closed due to
// subStateShuttingDown indicates the subscription was closed due to
// the server being shut down.
subStateShuttingDown = 3

// subStateACLChanged indicates the subscription was closed due to
// a change in ACLs.
subStateACLChanged = 4
)

// ErrSubForceClosed is a error signalling the subscription has been
Expand All @@ -39,6 +43,12 @@ var ErrSubForceClosed = errors.New("subscription closed by server, client must r
// subscribe to a different server to get streaming event updates.
var ErrShuttingDown = errors.New("subscription closed by server, server is shutting down")

// ErrACLChanged is an error to signal that the subscription has
// been closed because a change in ACL token or its associated roles or policies has occurred.
// If the token or policy is no longer valid, the client should resubscribe using a valid token. Otherwise,
// the client should resubscribe using the same token.
var ErrACLChanged = errors.New("subscription closed by server, ACL change occurred")

// Subscription provides events on a Topic. Events may be filtered by Key.
// Events are returned by Next(), and may start with a Snapshot of events.
type Subscription struct {
Expand Down Expand Up @@ -131,6 +141,8 @@ func (s *Subscription) requireStateOpen() error {
return ErrSubForceClosed
case subStateShuttingDown:
return ErrShuttingDown
case subStateACLChanged:
return ErrACLChanged
case subStateUnsub:
return fmt.Errorf("subscription was closed by unsubscribe")
default:
Expand Down Expand Up @@ -166,6 +178,14 @@ func (s *Subscription) shutDown() {
}
}

// Close the subscription and indicate that an ACL change occurred. This change may require
// a client to subscribe with a new token or re-subscribe with an existing token.
func (s *Subscription) closeACLChanged() {
if atomic.CompareAndSwapUint32(&s.state, subStateOpen, subStateACLChanged) {
close(s.closed)
}
}

// Unsubscribe the subscription, freeing resources.
func (s *Subscription) Unsubscribe() {
if atomic.CompareAndSwapUint32(&s.state, subStateOpen, subStateUnsub) {
Expand Down
8 changes: 8 additions & 0 deletions agent/grpc-internal/services/subscribe/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub
case errors.Is(err, stream.ErrSubForceClosed):
logger.Trace("subscription reset by server")
return status.Error(codes.Aborted, err.Error())
case errors.Is(err, stream.ErrACLChanged):
logger.Trace("ACL change occurred; re-authenticating")
_, authzErr := h.Backend.ResolveTokenAndDefaultMeta(req.Token, &entMeta, nil)
if authzErr != nil {
return authzErr
}
// Otherwise, abort the stream so the client re-subscribes.
return status.Error(codes.Aborted, err.Error())
case err != nil:
return err
}
Expand Down
56 changes: 51 additions & 5 deletions agent/grpc-internal/services/subscribe/subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package subscribe
import (
"context"
"errors"
"fmt"
"io"
"net"
"testing"
Expand Down Expand Up @@ -323,17 +324,21 @@ func getEvent(t *testing.T, ch chan eventOrError) *pbsubscribe.Event {
}

type testBackend struct {
publisher *stream.EventPublisher
store *state.Store
authorizer func(token string, entMeta *acl.EnterpriseMeta) acl.Authorizer
forwardConn *gogrpc.ClientConn
publisher *stream.EventPublisher
store *state.Store
authorizer func(token string, entMeta *acl.EnterpriseMeta) acl.Authorizer
resolveTokenAndDefaultMeta func(token string, entMeta *acl.EnterpriseMeta, _ *acl.AuthorizerContext) (acl.Authorizer, error)
forwardConn *gogrpc.ClientConn
}

func (b testBackend) ResolveTokenAndDefaultMeta(
token string,
entMeta *acl.EnterpriseMeta,
_ *acl.AuthorizerContext,
authCtx *acl.AuthorizerContext,
) (acl.Authorizer, error) {
if b.resolveTokenAndDefaultMeta != nil {
return b.resolveTokenAndDefaultMeta(token, entMeta, authCtx)
}
return b.authorizer(token, entMeta), nil
}

Expand Down Expand Up @@ -986,6 +991,47 @@ node "node1" {
t.Fatalf("timeout waiting for aborted error")
}
})

testutil.RunStep(t, "invalid token should return an error", func(t *testing.T) {
// Re-subscribe because the previous test step terminated the stream.
chEvents = make(chan eventOrError, 0)
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth,
Subject: &pbsubscribe.SubscribeRequest_NamedSubject{
NamedSubject: &pbsubscribe.NamedSubject{
Key: "foo",
},
},
Token: token,
})
require.NoError(t, err)

go recvEvents(chEvents, streamHandle)

// Stub out token authn function so that the token is no longer considered valid.
backend.resolveTokenAndDefaultMeta = func(t string, entMeta *acl.EnterpriseMeta, _ *acl.AuthorizerContext) (acl.Authorizer, error) {
return nil, fmt.Errorf("ACL not found")
}

// Force another ACL update.
tokenID, err := uuid.GenerateUUID()
require.NoError(t, err)

aclToken := &structs.ACLToken{
AccessorID: tokenID,
SecretID: token,
}
require.NoError(t, backend.store.ACLTokenSet(ids.Next("update"), aclToken))

select {
case item := <-chEvents:
require.Error(t, item.err, "got event instead of an error: %v", item.event)
require.EqualError(t, item.err, "rpc error: code = Unknown desc = ACL not found")
case <-time.After(2 * time.Second):
t.Fatalf("timeout waiting for ACL not found error")
}
})
}

func assertNoEvents(t *testing.T, chEvents chan eventOrError) {
Expand Down
Loading