diff --git a/.changelog/20876.txt b/.changelog/20876.txt new file mode 100644 index 000000000000..70a2ad468e44 --- /dev/null +++ b/.changelog/20876.txt @@ -0,0 +1,3 @@ +```release-note:bug +streaming: Handle ACL errors consistently when the blocking query timeout is reached. +``` diff --git a/agent/consul/state/store_integration_test.go b/agent/consul/state/store_integration_test.go index 25a91c558646..bf51039e2508 100644 --- a/agent/consul/state/store_integration_test.go +++ b/agent/consul/state/store_integration_test.go @@ -72,7 +72,7 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) { // Ensure the reset event was sent. err = assertErr(t, eventCh) - require.Equal(t, stream.ErrSubForceClosed, err) + require.Equal(t, stream.ErrACLChanged, err) // Register another subscription. subscription2 := &stream.SubscribeRequest{ @@ -101,7 +101,7 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) { // Ensure the reset event was sent. err = assertErr(t, eventCh2) - require.Equal(t, stream.ErrSubForceClosed, err) + require.Equal(t, stream.ErrACLChanged, err) } func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) { @@ -191,7 +191,7 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) { // Ensure the reload event was sent. err = assertErr(t, eventCh) - require.Equal(t, stream.ErrSubForceClosed, err) + require.Equal(t, stream.ErrACLChanged, err) // Register another subscription. 
subscription3 := &stream.SubscribeRequest{ @@ -381,7 +381,7 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) { } } require.Error(t, next.Err) - require.Equal(t, stream.ErrSubForceClosed, next.Err) + require.Equal(t, stream.ErrACLChanged, next.Err) return case <-time.After(100 * time.Millisecond): t.Fatalf("no err after 100ms") diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go index 04aa08334b25..cf1454bce802 100644 --- a/agent/consul/stream/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -393,7 +393,7 @@ func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) { for _, secretID := range tokenSecretIDs { if subs, ok := s.byToken[secretID]; ok { for _, sub := range subs { - sub.forceClose() + sub.closeACLChanged() } } } diff --git a/agent/consul/stream/subscription.go b/agent/consul/stream/subscription.go index 23911eff2e65..187b2c92d471 100644 --- a/agent/consul/stream/subscription.go +++ b/agent/consul/stream/subscription.go @@ -25,9 +25,13 @@ const ( // will not return new events. subStateUnsub = 2 - // subStateShutting down indicates the subscription was closed due to + // subStateShuttingDown indicates the subscription was closed due to // the server being shut down. subStateShuttingDown = 3 + + // subStateACLChanged indicates the subscription was closed due to + // a change in ACLs. + subStateACLChanged = 4 ) // ErrSubForceClosed is a error signalling the subscription has been @@ -39,6 +43,12 @@ var ErrSubForceClosed = errors.New("subscription closed by server, client must r // subscribe to a different server to get streaming event updates. var ErrShuttingDown = errors.New("subscription closed by server, server is shutting down") +// ErrACLChanged is an error to signal that the subscription has +// been closed because a change in ACL token or its associated roles or policies has occurred. 
+// If the token or policy is no longer valid, the client should resubscribe using a valid token. Otherwise, +// the client should resubscribe using the same token. +var ErrACLChanged = errors.New("subscription closed by server, ACL change occurred") + // Subscription provides events on a Topic. Events may be filtered by Key. // Events are returned by Next(), and may start with a Snapshot of events. type Subscription struct { @@ -131,6 +141,8 @@ func (s *Subscription) requireStateOpen() error { return ErrSubForceClosed case subStateShuttingDown: return ErrShuttingDown + case subStateACLChanged: + return ErrACLChanged case subStateUnsub: return fmt.Errorf("subscription was closed by unsubscribe") default: @@ -166,6 +178,14 @@ func (s *Subscription) shutDown() { } } +// Close the subscription and indicate that an ACL change occurred. This change may require +// a client to subscribe with a new token or re-subscribe with an existing token. +func (s *Subscription) closeACLChanged() { + if atomic.CompareAndSwapUint32(&s.state, subStateOpen, subStateACLChanged) { + close(s.closed) + } +} + // Unsubscribe the subscription, freeing resources. 
func (s *Subscription) Unsubscribe() { if atomic.CompareAndSwapUint32(&s.state, subStateOpen, subStateUnsub) { diff --git a/agent/grpc-external/services/connectca/watch_roots.go b/agent/grpc-external/services/connectca/watch_roots.go index ddd02ca56e0f..058fdd41349f 100644 --- a/agent/grpc-external/services/connectca/watch_roots.go +++ b/agent/grpc-external/services/connectca/watch_roots.go @@ -47,7 +47,7 @@ func (s *Server) WatchRoots(_ *pbconnectca.WatchRootsRequest, serverStream pbcon for { var err error idx, err = s.serveRoots(options.Token, idx, serverStream, logger) - if errors.Is(err, stream.ErrSubForceClosed) { + if errors.Is(err, stream.ErrSubForceClosed) || errors.Is(err, stream.ErrACLChanged) { logger.Trace("subscription force-closed due to an ACL change or snapshot restore, will attempt to re-auth and resume") } else { return err @@ -90,7 +90,7 @@ func (s *Server) serveRoots( for { event, err := sub.Next(serverStream.Context()) switch { - case errors.Is(err, stream.ErrSubForceClosed): + case errors.Is(err, stream.ErrSubForceClosed) || errors.Is(err, stream.ErrACLChanged): // If the subscription was closed because the state store was abandoned (e.g. // following a snapshot restore) reset idx to ensure we don't skip over the // new store's events. 
diff --git a/agent/grpc-external/services/serverdiscovery/watch_servers.go b/agent/grpc-external/services/serverdiscovery/watch_servers.go index 24960336c815..15586111f5b7 100644 --- a/agent/grpc-external/services/serverdiscovery/watch_servers.go +++ b/agent/grpc-external/services/serverdiscovery/watch_servers.go @@ -40,7 +40,7 @@ func (s *Server) WatchServers(req *pbserverdiscovery.WatchServersRequest, server for { var err error idx, err = s.serveReadyServers(options.Token, idx, req, serverStream, logger) - if errors.Is(err, stream.ErrSubForceClosed) { + if errors.Is(err, stream.ErrSubForceClosed) || errors.Is(err, stream.ErrACLChanged) { logger.Trace("subscription force-closed due to an ACL change or snapshot restore, will attempt to re-auth and resume") } else { return err @@ -69,7 +69,7 @@ func (s *Server) serveReadyServers(token string, index uint64, req *pbserverdisc for { event, err := sub.Next(serverStream.Context()) switch { - case errors.Is(err, stream.ErrSubForceClosed): + case errors.Is(err, stream.ErrSubForceClosed) || errors.Is(err, stream.ErrACLChanged): return index, err case errors.Is(err, context.Canceled): return 0, nil diff --git a/agent/grpc-internal/services/subscribe/subscribe.go b/agent/grpc-internal/services/subscribe/subscribe.go index a728b0164c97..5075d9e3dd51 100644 --- a/agent/grpc-internal/services/subscribe/subscribe.go +++ b/agent/grpc-internal/services/subscribe/subscribe.go @@ -77,6 +77,14 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub case errors.Is(err, stream.ErrSubForceClosed): logger.Trace("subscription reset by server") return status.Error(codes.Aborted, err.Error()) + case errors.Is(err, stream.ErrACLChanged): + logger.Trace("ACL change occurred; re-authenticating") + _, authzErr := h.Backend.ResolveTokenAndDefaultMeta(req.Token, &entMeta, nil) + if authzErr != nil { + return authzErr + } + // Otherwise, abort the stream so the client re-subscribes. 
+ return status.Error(codes.Aborted, err.Error()) case err != nil: return err } diff --git a/agent/grpc-internal/services/subscribe/subscribe_test.go b/agent/grpc-internal/services/subscribe/subscribe_test.go index 9f6b550cc9a7..a574da3fa825 100644 --- a/agent/grpc-internal/services/subscribe/subscribe_test.go +++ b/agent/grpc-internal/services/subscribe/subscribe_test.go @@ -6,6 +6,7 @@ package subscribe import ( "context" "errors" + "fmt" "io" "net" "testing" @@ -323,17 +324,21 @@ func getEvent(t *testing.T, ch chan eventOrError) *pbsubscribe.Event { } type testBackend struct { - publisher *stream.EventPublisher - store *state.Store - authorizer func(token string, entMeta *acl.EnterpriseMeta) acl.Authorizer - forwardConn *gogrpc.ClientConn + publisher *stream.EventPublisher + store *state.Store + authorizer func(token string, entMeta *acl.EnterpriseMeta) acl.Authorizer + resolveTokenAndDefaultMeta func(token string, entMeta *acl.EnterpriseMeta, _ *acl.AuthorizerContext) (acl.Authorizer, error) + forwardConn *gogrpc.ClientConn } func (b testBackend) ResolveTokenAndDefaultMeta( token string, entMeta *acl.EnterpriseMeta, - _ *acl.AuthorizerContext, + authCtx *acl.AuthorizerContext, ) (acl.Authorizer, error) { + if b.resolveTokenAndDefaultMeta != nil { + return b.resolveTokenAndDefaultMeta(token, entMeta, authCtx) + } return b.authorizer(token, entMeta), nil } @@ -986,6 +991,47 @@ node "node1" { t.Fatalf("timeout waiting for aborted error") } }) + + // Re-subscribe because the previous test step terminated the stream. 
+ chEvents = make(chan eventOrError, 0) + streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn) + streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ + Topic: pbsubscribe.Topic_ServiceHealth, + Subject: &pbsubscribe.SubscribeRequest_NamedSubject{ + NamedSubject: &pbsubscribe.NamedSubject{ + Key: "foo", + }, + }, + Token: token, + }) + require.NoError(t, err) + + go recvEvents(chEvents, streamHandle) + + // Stub out token authn function so that the token is no longer considered valid. + backend.resolveTokenAndDefaultMeta = func(t string, entMeta *acl.EnterpriseMeta, _ *acl.AuthorizerContext) (acl.Authorizer, error) { + return nil, fmt.Errorf("ACL not found") + } + + testutil.RunStep(t, "invalid token should return an error", func(t *testing.T) { + // Force another ACL update. + tokenID, err := uuid.GenerateUUID() + require.NoError(t, err) + + aclToken := &structs.ACLToken{ + AccessorID: tokenID, + SecretID: token, + } + require.NoError(t, backend.store.ACLTokenSet(ids.Next("update"), aclToken)) + + select { + case item := <-chEvents: + require.Error(t, item.err, "got event instead of an error: %v", item.event) + require.EqualError(t, item.err, "rpc error: code = Unknown desc = ACL not found") + case <-time.After(2 * time.Second): + t.Fatalf("timeout waiting for ACL not found error") + } + }) } func assertNoEvents(t *testing.T, chEvents chan eventOrError) {