From d2211514f820cf166ad89eafd24203ab90ab8849 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 30 Aug 2024 21:39:23 +0000 Subject: [PATCH] xds: keep ads flow control local to xdsclient/transport package --- xds/csds/csds_e2e_test.go | 66 +++++++++---------- .../balancer/cdsbalancer/cluster_watcher.go | 18 ++--- .../clusterresolver/clusterresolver.go | 2 +- .../clusterresolver/resource_resolver.go | 18 ++--- .../clusterresolver/resource_resolver_dns.go | 9 ++- .../clusterresolver/resource_resolver_eds.go | 14 ++-- xds/internal/resolver/watch_service.go | 36 +++++----- xds/internal/server/listener_wrapper.go | 12 ++-- xds/internal/server/rds_handler.go | 12 ++-- xds/internal/testutils/resource_watcher.go | 12 ++-- xds/internal/xdsclient/authority.go | 48 +++++++++----- xds/internal/xdsclient/clientimpl_watchers.go | 4 +- .../tests/ads_stream_flow_control_test.go | 50 +++++++------- .../xdsclient/tests/cds_watchers_test.go | 24 +++---- .../xdsclient/tests/eds_watchers_test.go | 24 +++---- .../xdsclient/tests/lds_watchers_test.go | 24 +++---- .../xdsclient/tests/misc_watchers_test.go | 12 ++-- .../xdsclient/tests/rds_watchers_test.go | 24 +++---- .../xdsclient/transport/loadreport_test.go | 8 +-- xds/internal/xdsclient/transport/transport.go | 55 +++++++--------- .../transport/transport_ack_nack_test.go | 3 +- .../transport/transport_backoff_test.go | 17 +++-- .../xdsclient/transport/transport_new_test.go | 6 +- .../transport/transport_resource_test.go | 7 +- .../xdsclient/transport/transport_test.go | 4 +- .../xdsresource/cluster_resource_type.go | 27 +++----- .../xdsresource/endpoints_resource_type.go | 27 +++----- .../xdsresource/listener_resource_type.go | 27 +++----- .../xdsclient/xdsresource/resource_type.go | 26 +++----- .../xdsresource/route_config_resource_type.go | 27 +++----- 30 files changed, 306 insertions(+), 337 deletions(-) diff --git a/xds/csds/csds_e2e_test.go b/xds/csds/csds_e2e_test.go index 90dd265a6ce1..33e10ba0f53f 100644 --- a/xds/csds/csds_e2e_test.go +++ b/xds/csds/csds_e2e_test.go @@ -70,50 +70,50 @@ func Test(t *testing.T) { type nopListenerWatcher struct{} -func (nopListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) { - onDone.OnDone() +func (nopListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { + onDone() } -func (nopListenerWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { - onDone.OnDone() +func (nopListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) { + onDone() } -func (nopListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { - onDone.OnDone() +func (nopListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { + onDone() } type nopRouteConfigWatcher struct{} -func (nopRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.DoneNotifier) { - onDone.OnDone() +func (nopRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) { + onDone() } -func (nopRouteConfigWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { - onDone.OnDone() +func (nopRouteConfigWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) { + onDone() } -func (nopRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { - onDone.OnDone() +func (nopRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { + onDone() } type nopClusterWatcher struct{} -func (nopClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.DoneNotifier) { - onDone.OnDone() +func (nopClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) { + onDone() } -func (nopClusterWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { - onDone.OnDone() +func (nopClusterWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) { + onDone() } -func (nopClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { - onDone.OnDone() +func (nopClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { + onDone() } type nopEndpointsWatcher struct{} -func (nopEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.DoneNotifier) { - onDone.OnDone() +func (nopEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) { + onDone() } -func (nopEndpointsWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { - onDone.OnDone() +func (nopEndpointsWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) { + onDone() } -func (nopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { - onDone.OnDone() +func (nopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { + onDone() } // This watcher writes the onDone callback on to a channel for the test to @@ -126,31 +126,31 @@ func (nopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifie // for ADS stream level flow control), and was causing CSDS to not receive any // updates from the xDS client. type blockingListenerWatcher struct { - testCtxDone <-chan struct{} // Closed when the test is done. - onDoneCh chan xdsresource.DoneNotifier // Channel to write the onDone callback to. + testCtxDone <-chan struct{} // Closed when the test is done. + onDoneCh chan xdsresource.OnDoneFunc // Channel to write the onDone callback to. } func newBlockingListenerWatcher(testCtxDone <-chan struct{}) *blockingListenerWatcher { return &blockingListenerWatcher{ testCtxDone: testCtxDone, - onDoneCh: make(chan xdsresource.DoneNotifier, 1), + onDoneCh: make(chan xdsresource.OnDoneFunc, 1), } } -func (w *blockingListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) { +func (w *blockingListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { writeOnDone(w.testCtxDone, w.onDoneCh, onDone) } -func (w *blockingListenerWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { +func (w *blockingListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) { writeOnDone(w.testCtxDone, w.onDoneCh, onDone) } -func (w *blockingListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { +func (w *blockingListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { writeOnDone(w.testCtxDone, w.onDoneCh, onDone) } // writeOnDone attempts to writes the onDone callback on the onDone channel. It // returns when it can successfully write to the channel or when the test is // done, which is signalled by testCtxDone being closed. -func writeOnDone(testCtxDone <-chan struct{}, onDoneCh chan xdsresource.DoneNotifier, onDone xdsresource.DoneNotifier) { +func writeOnDone(testCtxDone <-chan struct{}, onDoneCh chan xdsresource.OnDoneFunc, onDone xdsresource.OnDoneFunc) { select { case <-testCtxDone: case onDoneCh <- onDone: @@ -545,7 +545,7 @@ func (s) TestCSDS_NACK(t *testing.T) { case <-ctx.Done(): t.Fatal("Timed out waiting for watch callback") case onDone := <-watcher2.onDoneCh: - onDone.OnDone() + onDone() } // Update the second resource with an empty ApiListener field which is @@ -564,7 +564,7 @@ func (s) TestCSDS_NACK(t *testing.T) { case <-ctx.Done(): t.Fatal("Timed out waiting for watch callback") case onDone := <-watcher2.onDoneCh: - onDone.OnDone() + onDone() } // Verify that the xDS client reports the first listener resource as being diff --git a/xds/internal/balancer/cdsbalancer/cluster_watcher.go b/xds/internal/balancer/cdsbalancer/cluster_watcher.go index 39dce3d56a2a..835461d0997b 100644 --- a/xds/internal/balancer/cdsbalancer/cluster_watcher.go +++ b/xds/internal/balancer/cdsbalancer/cluster_watcher.go @@ -32,19 +32,19 @@ type clusterWatcher struct { parent *cdsBalancer } -func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData, onDone xdsresource.DoneNotifier) { - handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone.OnDone() } - cw.parent.serializer.ScheduleOr(handleUpdate, onDone.OnDone) +func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) { + handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone() } + cw.parent.serializer.ScheduleOr(handleUpdate, onDone) } -func (cw *clusterWatcher) OnError(err error, onDone xdsresource.DoneNotifier) { - handleError := func(context.Context) { cw.parent.onClusterError(cw.name, err); onDone.OnDone() } - cw.parent.serializer.ScheduleOr(handleError, onDone.OnDone) +func (cw *clusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { + handleError := func(context.Context) { cw.parent.onClusterError(cw.name, err); onDone() } + cw.parent.serializer.ScheduleOr(handleError, onDone) } -func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { - handleNotFound := func(context.Context) { cw.parent.onClusterResourceNotFound(cw.name); onDone.OnDone() } - cw.parent.serializer.ScheduleOr(handleNotFound, onDone.OnDone) +func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { + handleNotFound := func(context.Context) { cw.parent.onClusterResourceNotFound(cw.name); onDone() } + cw.parent.serializer.ScheduleOr(handleNotFound, onDone) } // watcherState groups the state associated with a clusterWatcher. diff --git a/xds/internal/balancer/clusterresolver/clusterresolver.go b/xds/internal/balancer/clusterresolver/clusterresolver.go index 75bb847fb18d..749945059b88 100644 --- a/xds/internal/balancer/clusterresolver/clusterresolver.go +++ b/xds/internal/balancer/clusterresolver/clusterresolver.go @@ -216,7 +216,7 @@ func (b *clusterResolverBalancer) handleResourceUpdate(update *resourceUpdate) { b.updateChildConfig() if update.onDone != nil { - update.onDone.OnDone() + update.onDone() } } diff --git a/xds/internal/balancer/clusterresolver/resource_resolver.go b/xds/internal/balancer/clusterresolver/resource_resolver.go index 37287913c087..5bc64b86305c 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver.go @@ -37,7 +37,7 @@ type resourceUpdate struct { priorities []priorityConfig // To be invoked once the update is completely processed, or is dropped in // favor of a newer update. - onDone xdsresource.DoneNotifier + onDone xdsresource.OnDoneFunc } // topLevelResolver is used by concrete endpointsResolver implementations for @@ -49,7 +49,7 @@ type topLevelResolver interface { // endpointsResolver implementation. The onDone callback is to be invoked // once the update is completely processed, or is dropped in favor of a // newer update. - onUpdate(onDone xdsresource.DoneNotifier) + onUpdate(onDone xdsresource.OnDoneFunc) } // endpointsResolver wraps the functionality to resolve a given resource name to @@ -215,7 +215,7 @@ func (rr *resourceResolver) updateMechanisms(mechanisms []DiscoveryMechanism) { } // Regenerate even if there's no change in discovery mechanism, in case // priority order changed. - rr.generateLocked(xdsresource.NopDoneNotifier{}) + rr.generateLocked(func() {}) } // resolveNow is typically called to trigger re-resolve of DNS. The EDS @@ -264,7 +264,7 @@ func (rr *resourceResolver) stop(closing bool) { select { case ru := <-rr.updateChannel: if ru.onDone != nil { - ru.onDone.OnDone() + ru.onDone() } default: } @@ -281,14 +281,14 @@ func (rr *resourceResolver) stop(closing bool) { // clusterresolver LB policy. // // Caller must hold rr.mu. -func (rr *resourceResolver) generateLocked(onDone xdsresource.DoneNotifier) { +func (rr *resourceResolver) generateLocked(onDone xdsresource.OnDoneFunc) { var ret []priorityConfig for _, rDM := range rr.children { u, ok := rDM.r.lastUpdate() if !ok { // Don't send updates to parent until all resolvers have update to // send. - onDone.OnDone() + onDone() return } switch uu := u.(type) { @@ -304,18 +304,18 @@ func (rr *resourceResolver) generateLocked(onDone xdsresource.DoneNotifier) { // receive path. case ru := <-rr.updateChannel: if ru.onDone != nil { - ru.onDone.OnDone() + ru.onDone() } default: } rr.updateChannel <- &resourceUpdate{priorities: ret, onDone: onDone} } -func (rr *resourceResolver) onUpdate(onDone xdsresource.DoneNotifier) { +func (rr *resourceResolver) onUpdate(onDone xdsresource.OnDoneFunc) { handleUpdate := func(context.Context) { rr.mu.Lock() rr.generateLocked(onDone) rr.mu.Unlock() } - rr.serializer.ScheduleOr(handleUpdate, func() { onDone.OnDone() }) + rr.serializer.ScheduleOr(handleUpdate, func() { onDone() }) } diff --git a/xds/internal/balancer/clusterresolver/resource_resolver_dns.go b/xds/internal/balancer/clusterresolver/resource_resolver_dns.go index b22810e22080..cfc871d3b59d 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver_dns.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver_dns.go @@ -27,7 +27,6 @@ import ( "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" - "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) var ( @@ -80,7 +79,7 @@ func newDNSResolver(target string, topLevelResolver topLevelResolver, logger *gr ret.logger.Infof("Failed to parse dns hostname %q in clusterresolver LB policy", target) } ret.updateReceived = true - ret.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{}) + ret.topLevelResolver.onUpdate(func() {}) return ret } @@ -90,7 +89,7 @@ func newDNSResolver(target string, topLevelResolver topLevelResolver, logger *gr ret.logger.Infof("Failed to build DNS resolver for target %q: %v", target, err) } ret.updateReceived = true - ret.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{}) + ret.topLevelResolver.onUpdate(func() {}) return ret } ret.dnsR = r @@ -154,7 +153,7 @@ func (dr *dnsDiscoveryMechanism) UpdateState(state resolver.State) error { dr.updateReceived = true dr.mu.Unlock() - dr.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{}) + dr.topLevelResolver.onUpdate(func() {}) return nil } @@ -177,7 +176,7 @@ func (dr *dnsDiscoveryMechanism) ReportError(err error) { dr.updateReceived = true dr.mu.Unlock() - dr.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{}) + dr.topLevelResolver.onUpdate(func() {}) } func (dr *dnsDiscoveryMechanism) NewAddress(addresses []resolver.Address) { diff --git a/xds/internal/balancer/clusterresolver/resource_resolver_eds.go b/xds/internal/balancer/clusterresolver/resource_resolver_eds.go index 16192045815c..ddb949019ee5 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver_eds.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver_eds.go @@ -76,9 +76,9 @@ func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelR } // OnUpdate is invoked to report an update for the resource being watched. -func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.DoneNotifier) { +func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) { if er.stopped.HasFired() { - onDone.OnDone() + onDone() return } @@ -89,9 +89,9 @@ func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceD er.topLevelResolver.onUpdate(onDone) } -func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.DoneNotifier) { +func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.OnDoneFunc) { if er.stopped.HasFired() { - onDone.OnDone() + onDone() return } @@ -104,7 +104,7 @@ func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.DoneNotif // Continue using a previously received good configuration if one // exists. er.mu.Unlock() - onDone.OnDone() + onDone() return } @@ -120,9 +120,9 @@ func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.DoneNotif er.topLevelResolver.onUpdate(onDone) } -func (er *edsDiscoveryMechanism) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { +func (er *edsDiscoveryMechanism) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { if er.stopped.HasFired() { - onDone.OnDone() + onDone() return } diff --git a/xds/internal/resolver/watch_service.go b/xds/internal/resolver/watch_service.go index b64f40c03939..0de6604484b1 100644 --- a/xds/internal/resolver/watch_service.go +++ b/xds/internal/resolver/watch_service.go @@ -36,19 +36,19 @@ func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatch return lw } -func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) { - handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone.OnDone() } - l.parent.serializer.ScheduleOr(handleUpdate, onDone.OnDone) +func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { + handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone() } + l.parent.serializer.ScheduleOr(handleUpdate, onDone) } -func (l *listenerWatcher) OnError(err error, onDone xdsresource.DoneNotifier) { - handleError := func(context.Context) { l.parent.onListenerResourceError(err); onDone.OnDone() } - l.parent.serializer.ScheduleOr(handleError, onDone.OnDone) +func (l *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { + handleError := func(context.Context) { l.parent.onListenerResourceError(err); onDone() } + l.parent.serializer.ScheduleOr(handleError, onDone) } -func (l *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { - handleNotFound := func(context.Context) { l.parent.onListenerResourceNotFound(); onDone.OnDone() } - l.parent.serializer.ScheduleOr(handleNotFound, onDone.OnDone) +func (l *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { + handleNotFound := func(context.Context) { l.parent.onListenerResourceNotFound(); onDone() } + l.parent.serializer.ScheduleOr(handleNotFound, onDone) } func (l *listenerWatcher) stop() { @@ -68,22 +68,22 @@ func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfi return rw } -func (r *routeConfigWatcher) OnUpdate(u *xdsresource.RouteConfigResourceData, onDone xdsresource.DoneNotifier) { +func (r *routeConfigWatcher) OnUpdate(u *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) { handleUpdate := func(context.Context) { r.parent.onRouteConfigResourceUpdate(r.resourceName, u.Resource) - onDone.OnDone() + onDone() } - r.parent.serializer.ScheduleOr(handleUpdate, onDone.OnDone) + r.parent.serializer.ScheduleOr(handleUpdate, onDone) } -func (r *routeConfigWatcher) OnError(err error, onDone xdsresource.DoneNotifier) { - handleError := func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone.OnDone() } - r.parent.serializer.ScheduleOr(handleError, onDone.OnDone) +func (r *routeConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { + handleError := func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone() } + r.parent.serializer.ScheduleOr(handleError, onDone) } -func (r *routeConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { - handleNotFound := func(context.Context) { r.parent.onRouteConfigResourceNotFound(r.resourceName); onDone.OnDone() } - r.parent.serializer.ScheduleOr(handleNotFound, onDone.OnDone) +func (r *routeConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { + handleNotFound := func(context.Context) { r.parent.onRouteConfigResourceNotFound(r.resourceName); onDone() } + r.parent.serializer.ScheduleOr(handleNotFound, onDone) } func (r *routeConfigWatcher) stop() { diff --git a/xds/internal/server/listener_wrapper.go b/xds/internal/server/listener_wrapper.go index cdbc897f1bc7..e2246f41afec 100644 --- a/xds/internal/server/listener_wrapper.go +++ b/xds/internal/server/listener_wrapper.go @@ -410,8 +410,8 @@ type ldsWatcher struct { name string } -func (lw *ldsWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) { - defer onDone.OnDone() +func (lw *ldsWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { + defer onDone() if lw.parent.closed.HasFired() { lw.logger.Warningf("Resource %q received update: %#v after listener was closed", lw.name, update) return @@ -422,8 +422,8 @@ func (lw *ldsWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone lw.parent.handleLDSUpdate(update.Resource) } -func (lw *ldsWatcher) OnError(err error, onDone xdsresource.DoneNotifier) { - defer onDone.OnDone() +func (lw *ldsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { + defer onDone() if lw.parent.closed.HasFired() { lw.logger.Warningf("Resource %q received error: %v after listener was closed", lw.name, err) return @@ -435,8 +435,8 @@ func (lw *ldsWatcher) OnError(err error, onDone xdsresource.DoneNotifier) { // continue to use the old configuration. } -func (lw *ldsWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { - defer onDone.OnDone() +func (lw *ldsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { + defer onDone() if lw.parent.closed.HasFired() { lw.logger.Warningf("Resource %q received resource-does-not-exist error after listener was closed", lw.name) return diff --git a/xds/internal/server/rds_handler.go b/xds/internal/server/rds_handler.go index 069db8e5d3d2..bcd3938e6f1a 100644 --- a/xds/internal/server/rds_handler.go +++ b/xds/internal/server/rds_handler.go @@ -147,8 +147,8 @@ type rdsWatcher struct { canceled bool // eats callbacks if true } -func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, onDone xdsresource.DoneNotifier) { - defer onDone.OnDone() +func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) { + defer onDone() rw.mu.Lock() if rw.canceled { rw.mu.Unlock() @@ -161,8 +161,8 @@ func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, onDo rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{data: &update.Resource}) } -func (rw *rdsWatcher) OnError(err error, onDone xdsresource.DoneNotifier) { - defer onDone.OnDone() +func (rw *rdsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { + defer onDone() rw.mu.Lock() if rw.canceled { rw.mu.Unlock() @@ -175,8 +175,8 @@ func (rw *rdsWatcher) OnError(err error, onDone xdsresource.DoneNotifier) { rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err}) } -func (rw *rdsWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { - defer onDone.OnDone() +func (rw *rdsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { + defer onDone() rw.mu.Lock() if rw.canceled { rw.mu.Unlock() diff --git a/xds/internal/testutils/resource_watcher.go b/xds/internal/testutils/resource_watcher.go index 0bb3e6586adf..dae72e2a7733 100644 --- a/xds/internal/testutils/resource_watcher.go +++ b/xds/internal/testutils/resource_watcher.go @@ -37,8 +37,8 @@ type TestResourceWatcher struct { // OnUpdate is invoked by the xDS client to report the latest update on the resource // being watched. -func (w *TestResourceWatcher) OnUpdate(data xdsresource.ResourceData, onDone xdsresource.DoneNotifier) { - defer onDone.OnDone() +func (w *TestResourceWatcher) OnUpdate(data xdsresource.ResourceData, onDone xdsresource.OnDoneFunc) { + defer onDone() select { case <-w.UpdateCh: default: @@ -47,8 +47,8 @@ func (w *TestResourceWatcher) OnUpdate(data xdsresource.ResourceData, onDone xds } // OnError is invoked by the xDS client to report the latest error. -func (w *TestResourceWatcher) OnError(err error, onDone xdsresource.DoneNotifier) { - defer onDone.OnDone() +func (w *TestResourceWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { + defer onDone() select { case <-w.ErrorCh: default: @@ -58,8 +58,8 @@ func (w *TestResourceWatcher) OnError(err error, onDone xdsresource.DoneNotifier // OnResourceDoesNotExist is used by the xDS client to report that the resource // being watched no longer exists. -func (w *TestResourceWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { - defer onDone.OnDone() +func (w *TestResourceWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { + defer onDone() select { case <-w.ResourceDoesNotExistCh: default: diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index 0e0a4d901316..3251737f181e 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -23,6 +23,7 @@ import ( "fmt" "strings" "sync" + "sync/atomic" "time" "google.golang.org/grpc/internal/grpclog" @@ -148,7 +149,7 @@ func (a *authority) transportOnSendHandler(u *transport.ResourceSendInfo) { a.startWatchTimersLocked(rType, u.ResourceNames) } -func (a *authority) handleResourceUpdate(resourceUpdate transport.ResourceUpdate, fc *transport.ADSFlowControl) error { +func (a *authority) handleResourceUpdate(resourceUpdate transport.ResourceUpdate, onDone func()) error { rType := a.resourceTypeGetter(resourceUpdate.URL) if rType == nil { return xdsresource.NewErrorf(xdsresource.ErrorTypeResourceTypeUnsupported, "Resource URL %v unknown in response from server", resourceUpdate.URL) @@ -159,24 +160,37 @@ func (a *authority) handleResourceUpdate(resourceUpdate transport.ResourceUpdate ServerConfig: a.serverCfg, } updates, md, err := decodeAllResources(opts, rType, resourceUpdate) - a.updateResourceStateAndScheduleCallbacks(rType, updates, md, fc) + a.updateResourceStateAndScheduleCallbacks(rType, updates, md, onDone) return err } -func (a *authority) updateResourceStateAndScheduleCallbacks(rType xdsresource.Type, updates map[string]resourceDataErrTuple, md xdsresource.UpdateMetadata, fc *transport.ADSFlowControl) { +func (a *authority) updateResourceStateAndScheduleCallbacks(rType xdsresource.Type, updates map[string]resourceDataErrTuple, md xdsresource.UpdateMetadata, onDone func()) { a.resourcesMu.Lock() defer a.resourcesMu.Unlock() // We build a list of callback funcs to invoke, and invoke them at the end // of this method instead of inline (when handling the update for a // particular resource), because we want to make sure that all calls to - // `fc.Add` happen before any callbacks are invoked. This will ensure that - // the next read is never attempted before all callbacks are invoked, and - // the watchers have processed the update. + // increment watcherCnt happen before any callbacks are invoked. This will + // ensure that the onDone callback is never invoked before all watcher + // callbacks are invoked, and the watchers have processed the update. + watcherCnt := new(atomic.Int64) + done := func() { + watcherCnt.Add(-1) + if watcherCnt.Load() == 0 { + onDone() + } + } funcsToSchedule := []func(context.Context){} defer func() { + if len(funcsToSchedule) == 0 { + // When there are no watchers for the resources received as part of + // this update, invoke onDone explicitly to unblock the next read on + // the ADS stream. + onDone() + } for _, f := range funcsToSchedule { - a.serializer.ScheduleOr(f, fc.OnDone) + a.serializer.ScheduleOr(f, onDone) } }() @@ -223,8 +237,8 @@ func (a *authority) updateResourceStateAndScheduleCallbacks(rType xdsresource.Ty for watcher := range state.watchers { watcher := watcher err := uErr.err - fc.Add() - funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnError(err, fc) }) + watcherCnt.Add(1) + funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnError(err, done) }) } continue } @@ -239,8 +253,8 @@ func (a *authority) updateResourceStateAndScheduleCallbacks(rType xdsresource.Ty for watcher := range state.watchers { watcher := watcher resource := uErr.resource - fc.Add() - funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnUpdate(resource, fc) }) + watcherCnt.Add(1) + funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnUpdate(resource, done) }) } } // Sync cache. @@ -315,8 +329,8 @@ func (a *authority) updateResourceStateAndScheduleCallbacks(rType xdsresource.Ty state.md = xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusNotExist} for watcher := range state.watchers { watcher := watcher - fc.Add() - funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnResourceDoesNotExist(fc) }) + watcherCnt.Add(1) + funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnResourceDoesNotExist(done) }) } } } @@ -445,7 +459,7 @@ func (a *authority) newConnectionError(err error) { for watcher := range state.watchers { watcher := watcher a.serializer.TrySchedule(func(context.Context) { - watcher.OnError(xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "xds: error received from xDS stream: %v", err), xdsresource.NopDoneNotifier{}) + watcher.OnError(xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "xds: error received from xDS stream: %v", err), func() {}) }) } } @@ -511,7 +525,7 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w a.logger.Infof("Resource type %q with resource name %q found in cache: %s", rType.TypeName(), resourceName, state.cache.ToJSON()) } resource := state.cache - a.serializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource, xdsresource.NopDoneNotifier{}) }) + a.serializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource, func() {}) }) } return func() { @@ -564,7 +578,7 @@ func (a *authority) handleWatchTimerExpiryLocked(rType xdsresource.Type, resourc state.md = xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusNotExist} for watcher := range state.watchers { watcher := watcher - a.serializer.TrySchedule(func(context.Context) { watcher.OnResourceDoesNotExist(xdsresource.NopDoneNotifier{}) }) + a.serializer.TrySchedule(func(context.Context) { watcher.OnResourceDoesNotExist(func() {}) }) } } @@ -590,7 +604,7 @@ func (a *authority) triggerResourceNotFoundForTesting(rType xdsresource.Type, re state.md = xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusNotExist} for watcher := range state.watchers { watcher := watcher - a.serializer.TrySchedule(func(context.Context) { watcher.OnResourceDoesNotExist(xdsresource.NopDoneNotifier{}) }) + a.serializer.TrySchedule(func(context.Context) { watcher.OnResourceDoesNotExist(func() {}) }) } } diff --git a/xds/internal/xdsclient/clientimpl_watchers.go b/xds/internal/xdsclient/clientimpl_watchers.go index 7a5dddfd2b8b..b9af85db63a8 100644 --- a/xds/internal/xdsclient/clientimpl_watchers.go +++ b/xds/internal/xdsclient/clientimpl_watchers.go @@ -44,7 +44,7 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string, if err := c.resourceTypes.maybeRegister(rType); err != nil { logger.Warningf("Watch registered for name %q of type %q which is already registered", rType.TypeName(), resourceName) - c.serializer.TrySchedule(func(context.Context) { watcher.OnError(err, xdsresource.NopDoneNotifier{}) }) + c.serializer.TrySchedule(func(context.Context) { watcher.OnError(err, func() {}) }) return func() {} } @@ -54,7 +54,7 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string, a, unref, err := c.findAuthority(n) if err != nil { logger.Warningf("Watch registered for name %q of type %q, authority %q is not found", rType.TypeName(), resourceName, n.Authority) - c.serializer.TrySchedule(func(context.Context) { watcher.OnError(err, xdsresource.NopDoneNotifier{}) }) + c.serializer.TrySchedule(func(context.Context) { watcher.OnError(err, func() {}) }) return func() {} } cancelF := a.watchResource(rType, n.String(), watcher) diff --git a/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go b/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go index 02aec63c514f..d6cc154d0b5c 100644 --- a/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go +++ b/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go @@ -45,22 +45,22 @@ import ( // DoneNotifier passed to the callback available to the test, thereby enabling // the test to block this watcher for as long as required. type blockingListenerWatcher struct { - doneNotifierCh chan xdsresource.DoneNotifier // DoneNotifier passed to the callback. - updateCh chan struct{} // Written to when an update is received. - errorCh chan struct{} // Written to when an error is received. - notFoundCh chan struct{} // Written to when the resource is not found. + doneNotifierCh chan xdsresource.OnDoneFunc // DoneNotifier passed to the callback. + updateCh chan struct{} // Written to when an update is received. + errorCh chan struct{} // Written to when an error is received. + notFoundCh chan struct{} // Written to when the resource is not found. } func newBLockingListenerWatcher() *blockingListenerWatcher { return &blockingListenerWatcher{ - doneNotifierCh: make(chan xdsresource.DoneNotifier, 1), + doneNotifierCh: make(chan xdsresource.OnDoneFunc, 1), updateCh: make(chan struct{}, 1), errorCh: make(chan struct{}, 1), notFoundCh: make(chan struct{}, 1), } } -func (lw *blockingListenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, done xdsresource.DoneNotifier) { +func (lw *blockingListenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, done xdsresource.OnDoneFunc) { // Notify receipt of the update. select { case lw.updateCh <- struct{}{}: @@ -73,7 +73,7 @@ func (lw *blockingListenerWatcher) OnUpdate(update *xdsresource.ListenerResource } } -func (lw *blockingListenerWatcher) OnError(err error, done xdsresource.DoneNotifier) { +func (lw *blockingListenerWatcher) OnError(err error, done xdsresource.OnDoneFunc) { // Notify receipt of an error. select { case lw.errorCh <- struct{}{}: @@ -86,7 +86,7 @@ func (lw *blockingListenerWatcher) OnError(err error, done xdsresource.DoneNotif } } -func (lw *blockingListenerWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) { +func (lw *blockingListenerWatcher) OnResourceDoesNotExist(done xdsresource.OnDoneFunc) { // Notify receipt of resource not found. select { case lw.notFoundCh <- struct{}{}: @@ -250,8 +250,8 @@ func (s) TestADSFlowControl_ResourceUpdates_SingleResource(t *testing.T) { } // Unblock one watcher. - done := <-watcher1.doneNotifierCh - done.OnDone() + onDone := <-watcher1.doneNotifierCh + onDone() // Wait for a short duration and ensure that there is no read on the stream. select { @@ -261,8 +261,8 @@ func (s) TestADSFlowControl_ResourceUpdates_SingleResource(t *testing.T) { } // Unblock the second watcher. - done = <-watcher2.doneNotifierCh - done.OnDone() + onDone = <-watcher2.doneNotifierCh + onDone() // Ensure that there is a read on the stream, now that the previous update // has been consumed by all watchers. @@ -394,16 +394,16 @@ func (s) TestADSFlowControl_ResourceUpdates_MultipleResources(t *testing.T) { // guaranteed. So, we select on both of them and unblock the first watcher // whose callback is invoked. var otherWatcherUpdateCh chan struct{} - var otherWatcherDoneCh chan xdsresource.DoneNotifier + var otherWatcherDoneCh chan xdsresource.OnDoneFunc select { case <-watcher1.updateCh: - done := <-watcher1.doneNotifierCh - done.OnDone() + onDone := <-watcher1.doneNotifierCh + onDone() otherWatcherUpdateCh = watcher2.updateCh otherWatcherDoneCh = watcher2.doneNotifierCh case <-watcher2.updateCh: - done := <-watcher2.doneNotifierCh - done.OnDone() + onDone := <-watcher2.doneNotifierCh + onDone() otherWatcherUpdateCh = watcher1.updateCh otherWatcherDoneCh = watcher1.doneNotifierCh case <-ctx.Done(): @@ -420,8 +420,8 @@ func (s) TestADSFlowControl_ResourceUpdates_MultipleResources(t *testing.T) { // Wait for the update on the second watcher and unblock it. select { case <-otherWatcherUpdateCh: - done := <-otherWatcherDoneCh - done.OnDone() + onDone := <-otherWatcherDoneCh + onDone() case <-ctx.Done(): t.Fatal("Timed out waiting for update to reach second watcher") } @@ -504,8 +504,8 @@ func (s) TestADSFlowControl_ResourceErrors(t *testing.T) { } // Unblock one watcher. - done := <-watcher.doneNotifierCh - done.OnDone() + onDone := <-watcher.doneNotifierCh + onDone() // Ensure that there is a read on the stream, now that the previous error // has been consumed by the watcher. @@ -573,8 +573,8 @@ func (s) TestADSFlowControl_ResourceDoesNotExist(t *testing.T) { // Wait for the update to reach the watcher and unblock it. select { case <-watcher.updateCh: - done := <-watcher.doneNotifierCh - done.OnDone() + onDone := <-watcher.doneNotifierCh + onDone() case <-ctx.Done(): t.Fatalf("Timed out waiting for update to reach watcher 1") } @@ -611,8 +611,8 @@ func (s) TestADSFlowControl_ResourceDoesNotExist(t *testing.T) { } // Unblock the watcher. - done := <-watcher.doneNotifierCh - done.OnDone() + onDone := <-watcher.doneNotifierCh + onDone() // Ensure that there is a read on the stream. select { diff --git a/xds/internal/xdsclient/tests/cds_watchers_test.go b/xds/internal/xdsclient/tests/cds_watchers_test.go index d1fa42b95ad7..9b5c00df570a 100644 --- a/xds/internal/xdsclient/tests/cds_watchers_test.go +++ b/xds/internal/xdsclient/tests/cds_watchers_test.go @@ -43,14 +43,14 @@ import ( type noopClusterWatcher struct{} -func (noopClusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData, done xdsresource.DoneNotifier) { - done.OnDone() +func (noopClusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) { + onDone() } -func (noopClusterWatcher) OnError(err error, done xdsresource.DoneNotifier) { - done.OnDone() +func (noopClusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { + onDone() } -func (noopClusterWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) { - done.OnDone() +func (noopClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { + onDone() } type clusterUpdateErrTuple struct { @@ -66,23 +66,23 @@ func newClusterWatcher() *clusterWatcher { return &clusterWatcher{updateCh: testutils.NewChannel()} } -func (cw *clusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData, done xdsresource.DoneNotifier) { +func (cw *clusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) { cw.updateCh.Send(clusterUpdateErrTuple{update: update.Resource}) - done.OnDone() + onDone() } -func (cw *clusterWatcher) OnError(err error, done xdsresource.DoneNotifier) { +func (cw *clusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` // here and in OnResourceDoesNotExist() simplifies tests which will have // access to the most recently received error. cw.updateCh.Replace(clusterUpdateErrTuple{err: err}) - done.OnDone() + onDone() } -func (cw *clusterWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) { +func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { cw.updateCh.Replace(clusterUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response")}) - done.OnDone() + onDone() } // badClusterResource returns a cluster resource for the given name which diff --git a/xds/internal/xdsclient/tests/eds_watchers_test.go b/xds/internal/xdsclient/tests/eds_watchers_test.go index 6ed7d962f31f..ef27178fac22 100644 --- a/xds/internal/xdsclient/tests/eds_watchers_test.go +++ b/xds/internal/xdsclient/tests/eds_watchers_test.go @@ -53,14 +53,14 @@ const ( type noopEndpointsWatcher struct{} -func (noopEndpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData, done xdsresource.DoneNotifier) { - done.OnDone() +func (noopEndpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) { + onDone() } -func (noopEndpointsWatcher) OnError(err error, done xdsresource.DoneNotifier) { - done.OnDone() +func (noopEndpointsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { + onDone() } -func (noopEndpointsWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) { - done.OnDone() +func (noopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { + onDone() } type endpointsUpdateErrTuple struct { @@ -76,23 +76,23 @@ func newEndpointsWatcher() *endpointsWatcher { return &endpointsWatcher{updateCh: testutils.NewChannel()} } -func (ew *endpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData, done xdsresource.DoneNotifier) { +func (ew *endpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) { ew.updateCh.Send(endpointsUpdateErrTuple{update: update.Resource}) - done.OnDone() + onDone() } -func (ew *endpointsWatcher) OnError(err error, done xdsresource.DoneNotifier) { +func (ew *endpointsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` // here and in OnResourceDoesNotExist() simplifies tests which will have // access to the most recently received error. ew.updateCh.Replace(endpointsUpdateErrTuple{err: err}) - done.OnDone() + onDone() } -func (ew *endpointsWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) { +func (ew *endpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { ew.updateCh.Replace(endpointsUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Endpoints not found in received response")}) - done.OnDone() + onDone() } // badEndpointsResource returns a endpoints resource for the given diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go index f3f412a2cb51..2ea2c50ce18b 100644 --- a/xds/internal/xdsclient/tests/lds_watchers_test.go +++ b/xds/internal/xdsclient/tests/lds_watchers_test.go @@ -48,14 +48,14 @@ import ( type noopListenerWatcher struct{} -func (noopListenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, done xdsresource.DoneNotifier) { - done.OnDone() +func (noopListenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { + onDone() } -func (noopListenerWatcher) OnError(err error, done xdsresource.DoneNotifier) { - done.OnDone() +func (noopListenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { + onDone() } -func (noopListenerWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) { - done.OnDone() +func (noopListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { + onDone() } type listenerUpdateErrTuple struct { @@ -71,23 +71,23 @@ func newListenerWatcher() *listenerWatcher { return &listenerWatcher{updateCh: testutils.NewChannel()} } -func (cw *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, done xdsresource.DoneNotifier) { +func (cw *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { cw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) - done.OnDone() + onDone() } -func (cw *listenerWatcher) OnError(err error, done xdsresource.DoneNotifier) { +func (cw *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` // here and in OnResourceDoesNotExist() simplifies tests which will have // access to the most recently received error. cw.updateCh.Replace(listenerUpdateErrTuple{err: err}) - done.OnDone() + onDone() } -func (cw *listenerWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) { +func (cw *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { cw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) - done.OnDone() + onDone() } // badListenerResource returns a listener resource for the given name which does diff --git a/xds/internal/xdsclient/tests/misc_watchers_test.go b/xds/internal/xdsclient/tests/misc_watchers_test.go index 4e29e488b59d..6b8152620231 100644 --- a/xds/internal/xdsclient/tests/misc_watchers_test.go +++ b/xds/internal/xdsclient/tests/misc_watchers_test.go @@ -69,26 +69,26 @@ func newTestRouteConfigWatcher(client xdsclient.XDSClient, name1, name2 string) } } -func (rw *testRouteConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, done xdsresource.DoneNotifier) { +func (rw *testRouteConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) { rw.updateCh.Send(routeConfigUpdateErrTuple{update: update.Resource}) rw.cancel1 = xdsresource.WatchRouteConfig(rw.client, rw.name1, rw.rcw1) rw.cancel2 = xdsresource.WatchRouteConfig(rw.client, rw.name2, rw.rcw2) - done.OnDone() + onDone() } -func (rw *testRouteConfigWatcher) OnError(err error, done xdsresource.DoneNotifier) { +func (rw *testRouteConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` // here and in OnResourceDoesNotExist() simplifies tests which will have // access to the most recently received error. rw.updateCh.Replace(routeConfigUpdateErrTuple{err: err}) - done.OnDone() + onDone() } -func (rw *testRouteConfigWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) { +func (rw *testRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")}) - done.OnDone() + onDone() } func (rw *testRouteConfigWatcher) cancel() { diff --git a/xds/internal/xdsclient/tests/rds_watchers_test.go b/xds/internal/xdsclient/tests/rds_watchers_test.go index c6042d5cfb05..b8dd1c72f465 100644 --- a/xds/internal/xdsclient/tests/rds_watchers_test.go +++ b/xds/internal/xdsclient/tests/rds_watchers_test.go @@ -43,14 +43,14 @@ import ( type noopRouteConfigWatcher struct{} -func (noopRouteConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, done xdsresource.DoneNotifier) { - done.OnDone() +func (noopRouteConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) { + onDone() } -func (noopRouteConfigWatcher) OnError(err error, done xdsresource.DoneNotifier) { - done.OnDone() +func (noopRouteConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { + onDone() } -func (noopRouteConfigWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) { - done.OnDone() +func (noopRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { + onDone() } type routeConfigUpdateErrTuple struct { @@ -66,23 +66,23 @@ func newRouteConfigWatcher() *routeConfigWatcher { return &routeConfigWatcher{updateCh: testutils.NewChannel()} } -func (rw *routeConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, done xdsresource.DoneNotifier) { +func (rw *routeConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) { rw.updateCh.Send(routeConfigUpdateErrTuple{update: update.Resource}) - done.OnDone() + onDone() } -func (rw *routeConfigWatcher) OnError(err error, done xdsresource.DoneNotifier) { +func (rw *routeConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` // here and in OnResourceDoesNotExist() simplifies tests which will have // access to the most recently received error. rw.updateCh.Replace(routeConfigUpdateErrTuple{err: err}) - done.OnDone() + onDone() } -func (rw *routeConfigWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) { +func (rw *routeConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")}) - done.OnDone() + onDone() } // badRouteConfigResource returns a RouteConfiguration resource for the given diff --git a/xds/internal/xdsclient/transport/loadreport_test.go b/xds/internal/xdsclient/transport/loadreport_test.go index 0f61c50d5b2a..c359025c0b1c 100644 --- a/xds/internal/xdsclient/transport/loadreport_test.go +++ b/xds/internal/xdsclient/transport/loadreport_test.go @@ -68,10 +68,10 @@ func (s) TestReportLoad(t *testing.T) { tr, err := transport.New(transport.Options{ ServerCfg: serverCfg, NodeProto: nodeProto, - OnRecvHandler: func(transport.ResourceUpdate, *transport.ADSFlowControl) error { return nil }, // No ADS validation. - OnErrorHandler: func(error) {}, // No ADS stream error handling. - OnSendHandler: func(*transport.ResourceSendInfo) {}, // No ADS stream update handling. - Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff. + OnRecvHandler: noopRecvHandler, // No ADS validation. + OnErrorHandler: func(error) {}, // No ADS stream error handling. + OnSendHandler: func(*transport.ResourceSendInfo) {}, // No ADS stream update handling. + Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff. }) if err != nil { t.Fatalf("Failed to create xDS transport: %v", err) diff --git a/xds/internal/xdsclient/transport/transport.go b/xds/internal/xdsclient/transport/transport.go index 6f156398b9bd..0bc0d386802d 100644 --- a/xds/internal/xdsclient/transport/transport.go +++ b/xds/internal/xdsclient/transport/transport.go @@ -122,10 +122,9 @@ type Transport struct { // error is returned from this function when the data model layer believes // otherwise, and this will cause the transport layer to send a NACK. // -// The implementation is expected to use the ADS flow control object passed to -// it, and increment the number of watchers to whom the update is sent to, and -// eventually decrement the number once the update is consumed by the watchers. -type OnRecvHandlerFunc func(update ResourceUpdate, fc *ADSFlowControl) error +// The implementation is expected to invoke onDone when local processing of the +// update is complete, i.e. it is consumed by all watchers. +type OnRecvHandlerFunc func(update ResourceUpdate, onDone func()) error // OnSendHandlerFunc is the implementation at the authority, which handles state // changes for the resource watch and stop watch timers accordingly. @@ -469,12 +468,12 @@ func (t *Transport) sendExisting(stream adsStream) (sentNodeProto bool, err erro func (t *Transport) recv(ctx context.Context, stream adsStream) bool { // Initialize the flow control quota for the stream. This helps to block the // next read until the previous one is consumed by all watchers. - fc := NewADSStreamFlowControl() + fc := newADSFlowControl() msgReceived := false for { // Wait for ADS stream level flow control to be available. - if !fc.Wait(ctx) { + if !fc.wait(ctx) { if t.logger.V(2) { t.logger.Infof("ADS stream context canceled") } @@ -503,7 +502,8 @@ func (t *Transport) recv(ctx context.Context, stream adsStream) bool { URL: url, Version: rVersion, } - if err = t.onRecvHandler(u, fc); xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceTypeUnsupported { + fc.setPending() + if err = t.onRecvHandler(u, fc.onDone); xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceTypeUnsupported { t.logger.Warningf("%v", err) continue } @@ -638,40 +638,37 @@ func (t *Transport) ChannelConnectivityStateForTesting() connectivity.State { return t.cc.GetState() } -// ADSFlowControl implements ADS stream level flow control that enables the +// adsFlowControl implements ADS stream level flow control that enables the // transport to block the reading of the next message off of the stream until // the previous update is consumed by all watchers. // // The lifetime of the flow control is tied to the lifetime of the stream. -// -// New instances must be created with a call to NewADSStreamFlowControl. -type ADSFlowControl struct { +type adsFlowControl struct { logger *grpclog.PrefixLogger - // Count of watchers yet to consume the most recent update. - pending atomic.Int64 + // Whether the most recent update is pending consumption by all watchers. + pending atomic.Bool // Channel used to notify when all the watchers have consumed the most // recent update. Wait() blocks on reading a value from this channel. readyCh chan struct{} } -// NewADSStreamFlowControl returns a new ADSFlowControl. -func NewADSStreamFlowControl() *ADSFlowControl { - return &ADSFlowControl{readyCh: make(chan struct{}, 1)} +// newADSFlowControl returns a new adsFlowControl. +func newADSFlowControl() *adsFlowControl { + return &adsFlowControl{readyCh: make(chan struct{}, 1)} } -// Add increments the number of watchers (by one) who are yet to consume the -// most recent update received on the ADS stream. -func (fc *ADSFlowControl) Add() { - fc.pending.Add(1) +// setPending changes the internal state to indicate that there is an update +// pending consumption by all watchers. +func (fc *adsFlowControl) setPending() { + fc.pending.Store(true) } -// Wait blocks until all the watchers have consumed the most recent update and +// wait blocks until all the watchers have consumed the most recent update and // returns true. If the context expires before that, it returns false. -func (fc *ADSFlowControl) Wait(ctx context.Context) bool { - // If there are no watchers or none with pending updates, there is no need - // to block. - if n := fc.pending.Load(); n == 0 { +func (fc *adsFlowControl) wait(ctx context.Context) bool { + // If there is no pending update, there is no need to block. + if !fc.pending.Load() { // If all watchers finished processing the most recent update before the // `recv` goroutine made the next call to `Wait()`, there would be an // entry in the readyCh channel that needs to be drained to ensure that @@ -691,11 +688,9 @@ func (fc *ADSFlowControl) Wait(ctx context.Context) bool { } } -// OnDone indicates that a watcher has consumed the most recent update. -func (fc *ADSFlowControl) OnDone() { - if pending := fc.pending.Add(-1); pending != 0 { - return - } +// onDone indicates that all watchers have consumed the most recent update. +func (fc *adsFlowControl) onDone() { + fc.pending.Store(false) select { // Writes to the readyCh channel should not block ideally. The default diff --git a/xds/internal/xdsclient/transport/transport_ack_nack_test.go b/xds/internal/xdsclient/transport/transport_ack_nack_test.go index ca65101d4f18..73b2635eff6a 100644 --- a/xds/internal/xdsclient/transport/transport_ack_nack_test.go +++ b/xds/internal/xdsclient/transport/transport_ack_nack_test.go @@ -49,7 +49,8 @@ var ( // A simple update handler for listener resources which validates only the // `use_original_dst` field. - dataModelValidator = func(update transport.ResourceUpdate, _ *transport.ADSFlowControl) error { + dataModelValidator = func(update transport.ResourceUpdate, onDone func()) error { + defer onDone() for _, r := range update.Resources { inner := &v3discoverypb.Resource{} if err := proto.Unmarshal(r.GetValue(), inner); err != nil { diff --git a/xds/internal/xdsclient/transport/transport_backoff_test.go b/xds/internal/xdsclient/transport/transport_backoff_test.go index fd0b0e226455..f85c10b5466d 100644 --- a/xds/internal/xdsclient/transport/transport_backoff_test.go +++ b/xds/internal/xdsclient/transport/transport_backoff_test.go @@ -44,6 +44,11 @@ import ( var strSort = func(s1, s2 string) bool { return s1 < s2 } +var noopRecvHandler = func(_ transport.ResourceUpdate, onDone func()) error { + onDone() + return nil +} + // TestTransport_BackoffAfterStreamFailure tests the case where the management // server returns an error in the ADS streaming RPC. The test verifies the // following: @@ -101,7 +106,7 @@ func (s) TestTransport_BackoffAfterStreamFailure(t *testing.T) { nodeID := uuid.New().String() tr, err := transport.New(transport.Options{ ServerCfg: serverCfg, - OnRecvHandler: func(transport.ResourceUpdate, *transport.ADSFlowControl) error { return nil }, // No data model layer validation. + OnRecvHandler: noopRecvHandler, // No data model layer validation. OnErrorHandler: func(err error) { select { case streamErrCh <- err: @@ -262,7 +267,7 @@ func (s) TestTransport_RetriesAfterBrokenStream(t *testing.T) { // we can pass a no-op data model layer implementation. tr, err := transport.New(transport.Options{ ServerCfg: serverCfg, - OnRecvHandler: func(transport.ResourceUpdate, *transport.ADSFlowControl) error { return nil }, // No data model layer validation. + OnRecvHandler: noopRecvHandler, // No data model layer validation. OnErrorHandler: func(err error) { select { case streamErrCh <- err: @@ -394,10 +399,10 @@ func (s) TestTransport_ResourceRequestedBeforeStreamCreation(t *testing.T) { nodeID := uuid.New().String() tr, err := transport.New(transport.Options{ ServerCfg: serverCfg, - OnRecvHandler: func(transport.ResourceUpdate, *transport.ADSFlowControl) error { return nil }, // No data model layer validation. - OnErrorHandler: func(error) {}, // No stream error handling. - OnSendHandler: func(*transport.ResourceSendInfo) {}, // No on send handler - Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff. + OnRecvHandler: noopRecvHandler, // No data model layer validation. + OnErrorHandler: func(error) {}, // No stream error handling. + OnSendHandler: func(*transport.ResourceSendInfo) {}, // No on send handler + Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff. NodeProto: &v3corepb.Node{Id: nodeID}, }) if err != nil { diff --git a/xds/internal/xdsclient/transport/transport_new_test.go b/xds/internal/xdsclient/transport/transport_new_test.go index a9d9adbfbe7c..3bd5ac9d2486 100644 --- a/xds/internal/xdsclient/transport/transport_new_test.go +++ b/xds/internal/xdsclient/transport/transport_new_test.go @@ -53,7 +53,7 @@ func (s) TestNew(t *testing.T) { opts: transport.Options{ ServerCfg: serverCfg, NodeProto: &v3corepb.Node{}, - OnRecvHandler: func(transport.ResourceUpdate, *transport.ADSFlowControl) error { return nil }, + OnRecvHandler: noopRecvHandler, // No data model layer validation. OnSendHandler: func(*transport.ResourceSendInfo) {}, }, wantErrStr: "missing OnError callback handler when creating a new transport", @@ -64,7 +64,7 @@ func (s) TestNew(t *testing.T) { opts: transport.Options{ ServerCfg: serverCfg, NodeProto: &v3corepb.Node{}, - OnRecvHandler: func(transport.ResourceUpdate, *transport.ADSFlowControl) error { return nil }, + OnRecvHandler: noopRecvHandler, // No data model layer validation. OnErrorHandler: func(error) {}, }, wantErrStr: "missing OnSend callback handler when creating a new transport", @@ -74,7 +74,7 @@ func (s) TestNew(t *testing.T) { opts: transport.Options{ ServerCfg: serverCfg, NodeProto: &v3corepb.Node{}, - OnRecvHandler: func(transport.ResourceUpdate, *transport.ADSFlowControl) error { return nil }, + OnRecvHandler: noopRecvHandler, // No data model layer validation. OnErrorHandler: func(error) {}, OnSendHandler: func(*transport.ResourceSendInfo) {}, }, diff --git a/xds/internal/xdsclient/transport/transport_resource_test.go b/xds/internal/xdsclient/transport/transport_resource_test.go index e83d9d7cd338..70874c0c6e97 100644 --- a/xds/internal/xdsclient/transport/transport_resource_test.go +++ b/xds/internal/xdsclient/transport/transport_resource_test.go @@ -185,12 +185,13 @@ func (s) TestHandleResponseFromManagementServer(t *testing.T) { tr, err := transport.New(transport.Options{ ServerCfg: serverCfg, // No validation. Simply push received resources on a channel. - OnRecvHandler: func(update transport.ResourceUpdate, _ *transport.ADSFlowControl) error { + OnRecvHandler: func(update transport.ResourceUpdate, onDone func()) error { resourcesCh.Send(&resourcesWithTypeURL{ resources: update.Resources, url: update.URL, // Ignore resource version here. }) + onDone() return nil }, OnSendHandler: func(*transport.ResourceSendInfo) {}, // No onSend handling. @@ -239,7 +240,7 @@ func (s) TestEmptyListenerResourceOnStreamRestart(t *testing.T) { nodeProto := &v3corepb.Node{Id: uuid.New().String()} tr, err := transport.New(transport.Options{ ServerCfg: serverCfg, - OnRecvHandler: func(transport.ResourceUpdate, *transport.ADSFlowControl) error { return nil }, + OnRecvHandler: noopRecvHandler, // No data model layer validation. OnSendHandler: func(*transport.ResourceSendInfo) {}, // No onSend handling. OnErrorHandler: func(error) {}, // No stream error handling. Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff. @@ -330,7 +331,7 @@ func (s) TestEmptyClusterResourceOnStreamRestartWithListener(t *testing.T) { nodeProto := &v3corepb.Node{Id: uuid.New().String()} tr, err := transport.New(transport.Options{ ServerCfg: serverCfg, - OnRecvHandler: func(transport.ResourceUpdate, *transport.ADSFlowControl) error { return nil }, + OnRecvHandler: noopRecvHandler, // No data model layer validation. OnSendHandler: func(*transport.ResourceSendInfo) {}, // No onSend handling. OnErrorHandler: func(error) {}, // No stream error handling. Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff. diff --git a/xds/internal/xdsclient/transport/transport_test.go b/xds/internal/xdsclient/transport/transport_test.go index 4cf85a7a2c6b..24aad924bd92 100644 --- a/xds/internal/xdsclient/transport/transport_test.go +++ b/xds/internal/xdsclient/transport/transport_test.go @@ -47,8 +47,8 @@ func (s) TestNewWithGRPCDial(t *testing.T) { opts := transport.Options{ ServerCfg: serverCfg, NodeProto: &v3corepb.Node{}, - OnRecvHandler: func(update transport.ResourceUpdate, fc *transport.ADSFlowControl) error { - fc.OnDone() + OnRecvHandler: func(update transport.ResourceUpdate, onDone func()) error { + onDone() return nil }, OnErrorHandler: func(error) {}, diff --git a/xds/internal/xdsclient/xdsresource/cluster_resource_type.go b/xds/internal/xdsclient/xdsresource/cluster_resource_type.go index fb6f66f20dbd..18d47cbc101d 100644 --- a/xds/internal/xdsclient/xdsresource/cluster_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/cluster_resource_type.go @@ -111,10 +111,7 @@ func (c *ClusterResourceData) Raw() *anypb.Any { // corresponding to the cluster resource being watched. type ClusterWatcher interface { // OnUpdate is invoked to report an update for the resource being watched. - // - // The watcher is expected to call Done() on the DoneNotifier once it has - // processed the update. - OnUpdate(*ClusterResourceData, DoneNotifier) + OnUpdate(*ClusterResourceData, OnDoneFunc) // OnError is invoked under different error conditions including but not // limited to the following: @@ -124,34 +121,28 @@ type ClusterWatcher interface { // - resource validation error // - ADS stream failure // - connection failure - // - // The watcher is expected to call Done() on the DoneNotifier once it has - // processed the update. - OnError(error, DoneNotifier) + OnError(error, OnDoneFunc) // OnResourceDoesNotExist is invoked for a specific error condition where // the requested resource is not found on the xDS management server. - // - // The watcher is expected to call Done() on the DoneNotifier once it has - // processed the update. - OnResourceDoesNotExist(DoneNotifier) + OnResourceDoesNotExist(OnDoneFunc) } type delegatingClusterWatcher struct { watcher ClusterWatcher } -func (d *delegatingClusterWatcher) OnUpdate(data ResourceData, done DoneNotifier) { +func (d *delegatingClusterWatcher) OnUpdate(data ResourceData, onDone OnDoneFunc) { c := data.(*ClusterResourceData) - d.watcher.OnUpdate(c, done) + d.watcher.OnUpdate(c, onDone) } -func (d *delegatingClusterWatcher) OnError(err error, done DoneNotifier) { - d.watcher.OnError(err, done) +func (d *delegatingClusterWatcher) OnError(err error, onDone OnDoneFunc) { + d.watcher.OnError(err, onDone) } -func (d *delegatingClusterWatcher) OnResourceDoesNotExist(done DoneNotifier) { - d.watcher.OnResourceDoesNotExist(done) +func (d *delegatingClusterWatcher) OnResourceDoesNotExist(onDone OnDoneFunc) { + d.watcher.OnResourceDoesNotExist(onDone) } // WatchCluster uses xDS to discover the configuration associated with the diff --git a/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go b/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go index 68e3a2548e64..e18c30db3a13 100644 --- a/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go @@ -107,10 +107,7 @@ func (e *EndpointsResourceData) Raw() *anypb.Any { // events corresponding to the endpoints resource being watched. type EndpointsWatcher interface { // OnUpdate is invoked to report an update for the resource being watched. - // - // The watcher is expected to call Done() on the DoneNotifier once it has - // processed the update. - OnUpdate(*EndpointsResourceData, DoneNotifier) + OnUpdate(*EndpointsResourceData, OnDoneFunc) // OnError is invoked under different error conditions including but not // limited to the following: @@ -120,34 +117,28 @@ type EndpointsWatcher interface { // - resource validation error // - ADS stream failure // - connection failure - // - // The watcher is expected to call Done() on the DoneNotifier once it has - // processed the update. - OnError(error, DoneNotifier) + OnError(error, OnDoneFunc) // OnResourceDoesNotExist is invoked for a specific error condition where // the requested resource is not found on the xDS management server. - // - // The watcher is expected to call Done() on the DoneNotifier once it has - // processed the update. - OnResourceDoesNotExist(DoneNotifier) + OnResourceDoesNotExist(OnDoneFunc) } type delegatingEndpointsWatcher struct { watcher EndpointsWatcher } -func (d *delegatingEndpointsWatcher) OnUpdate(data ResourceData, done DoneNotifier) { +func (d *delegatingEndpointsWatcher) OnUpdate(data ResourceData, onDone OnDoneFunc) { e := data.(*EndpointsResourceData) - d.watcher.OnUpdate(e, done) + d.watcher.OnUpdate(e, onDone) } -func (d *delegatingEndpointsWatcher) OnError(err error, done DoneNotifier) { - d.watcher.OnError(err, done) +func (d *delegatingEndpointsWatcher) OnError(err error, onDone OnDoneFunc) { + d.watcher.OnError(err, onDone) } -func (d *delegatingEndpointsWatcher) OnResourceDoesNotExist(done DoneNotifier) { - d.watcher.OnResourceDoesNotExist(done) +func (d *delegatingEndpointsWatcher) OnResourceDoesNotExist(onDone OnDoneFunc) { + d.watcher.OnResourceDoesNotExist(onDone) } // WatchEndpoints uses xDS to discover the configuration associated with the diff --git a/xds/internal/xdsclient/xdsresource/listener_resource_type.go b/xds/internal/xdsclient/xdsresource/listener_resource_type.go index 419161e69a83..80fa5e6a21ec 100644 --- a/xds/internal/xdsclient/xdsresource/listener_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/listener_resource_type.go @@ -144,10 +144,7 @@ func (l *ListenerResourceData) Raw() *anypb.Any { // events corresponding to the listener resource being watched. type ListenerWatcher interface { // OnUpdate is invoked to report an update for the resource being watched. - // - // The watcher is expected to call Done() on the DoneNotifier once it has - // processed the update. - OnUpdate(*ListenerResourceData, DoneNotifier) + OnUpdate(*ListenerResourceData, OnDoneFunc) // OnError is invoked under different error conditions including but not // limited to the following: @@ -157,34 +154,28 @@ type ListenerWatcher interface { // - resource validation error // - ADS stream failure // - connection failure - // - // The watcher is expected to call Done() on the DoneNotifier once it has - // processed the update. - OnError(error, DoneNotifier) + OnError(error, OnDoneFunc) // OnResourceDoesNotExist is invoked for a specific error condition where // the requested resource is not found on the xDS management server. - // - // The watcher is expected to call Done() on the DoneNotifier once it has - // processed the update. - OnResourceDoesNotExist(DoneNotifier) + OnResourceDoesNotExist(OnDoneFunc) } type delegatingListenerWatcher struct { watcher ListenerWatcher } -func (d *delegatingListenerWatcher) OnUpdate(data ResourceData, done DoneNotifier) { +func (d *delegatingListenerWatcher) OnUpdate(data ResourceData, onDone OnDoneFunc) { l := data.(*ListenerResourceData) - d.watcher.OnUpdate(l, done) + d.watcher.OnUpdate(l, onDone) } -func (d *delegatingListenerWatcher) OnError(err error, done DoneNotifier) { - d.watcher.OnError(err, done) +func (d *delegatingListenerWatcher) OnError(err error, onDone OnDoneFunc) { + d.watcher.OnError(err, onDone) } -func (d *delegatingListenerWatcher) OnResourceDoesNotExist(done DoneNotifier) { - d.watcher.OnResourceDoesNotExist(done) +func (d *delegatingListenerWatcher) OnResourceDoesNotExist(onDone OnDoneFunc) { + d.watcher.OnResourceDoesNotExist(onDone) } // WatchListener uses xDS to discover the configuration associated with the diff --git a/xds/internal/xdsclient/xdsresource/resource_type.go b/xds/internal/xdsclient/xdsresource/resource_type.go index d4377ed50496..55cfd6fbb15b 100644 --- a/xds/internal/xdsclient/xdsresource/resource_type.go +++ b/xds/internal/xdsclient/xdsresource/resource_type.go @@ -52,18 +52,11 @@ type Producer interface { WatchResource(rType Type, resourceName string, watcher ResourceWatcher) (cancel func()) } -// DoneNotifier wraps the OnDone callback to be invoked once a resource update -// is processed by the watcher. -type DoneNotifier interface { - OnDone() -} - -// NopDoneNotifier is a concrete implementation of the DoneNotifier interface, -// that serves as a convenient placeholder when the callback is not needed. -type NopDoneNotifier struct{} - -// OnDone implements the DoneNotifier interface. -func (NopDoneNotifier) OnDone() {} +// OnDoneFunc is a function to be invoked by watcher implementations upon +// completing the processing of a callback from the xDS client. Failure to +// invoke this callback prevents the xDS client from reading further messages +// from the xDS server. +type OnDoneFunc func() // ResourceWatcher wraps the callbacks to be invoked for different events // corresponding to the resource being watched. @@ -71,10 +64,7 @@ type ResourceWatcher interface { // OnUpdate is invoked to report an update for the resource being watched. // The ResourceData parameter needs to be type asserted to the appropriate // type for the resource being watched. - // - // The watcher is expected to call Done() on the DoneNotifier once it has - // processed the update. - OnUpdate(ResourceData, DoneNotifier) + OnUpdate(ResourceData, OnDoneFunc) // OnError is invoked under different error conditions including but not // limited to the following: @@ -84,11 +74,11 @@ type ResourceWatcher interface { // - resource validation error // - ADS stream failure // - connection failure - OnError(error, DoneNotifier) + OnError(error, OnDoneFunc) // OnResourceDoesNotExist is invoked for a specific error condition where // the requested resource is not found on the xDS management server. - OnResourceDoesNotExist(DoneNotifier) + OnResourceDoesNotExist(OnDoneFunc) } // TODO: Once the implementation is complete, rename this interface as diff --git a/xds/internal/xdsclient/xdsresource/route_config_resource_type.go b/xds/internal/xdsclient/xdsresource/route_config_resource_type.go index cd8b86d81b37..d370b732ac64 100644 --- a/xds/internal/xdsclient/xdsresource/route_config_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/route_config_resource_type.go @@ -108,10 +108,7 @@ func (r *RouteConfigResourceData) Raw() *anypb.Any { // events corresponding to the route configuration resource being watched. type RouteConfigWatcher interface { // OnUpdate is invoked to report an update for the resource being watched. - // - // The watcher is expected to call Done() on the DoneNotifier once it has - // processed the update. - OnUpdate(*RouteConfigResourceData, DoneNotifier) + OnUpdate(*RouteConfigResourceData, OnDoneFunc) // OnError is invoked under different error conditions including but not // limited to the following: @@ -121,34 +118,28 @@ type RouteConfigWatcher interface { // - resource validation error // - ADS stream failure // - connection failure - // - // The watcher is expected to call Done() on the DoneNotifier once it has - // processed the update. - OnError(error, DoneNotifier) + OnError(error, OnDoneFunc) // OnResourceDoesNotExist is invoked for a specific error condition where // the requested resource is not found on the xDS management server. - // - // The watcher is expected to call Done() on the DoneNotifier once it has - // processed the update. - OnResourceDoesNotExist(DoneNotifier) + OnResourceDoesNotExist(OnDoneFunc) } type delegatingRouteConfigWatcher struct { watcher RouteConfigWatcher } -func (d *delegatingRouteConfigWatcher) OnUpdate(data ResourceData, done DoneNotifier) { +func (d *delegatingRouteConfigWatcher) OnUpdate(data ResourceData, onDone OnDoneFunc) { rc := data.(*RouteConfigResourceData) - d.watcher.OnUpdate(rc, done) + d.watcher.OnUpdate(rc, onDone) } -func (d *delegatingRouteConfigWatcher) OnError(err error, done DoneNotifier) { - d.watcher.OnError(err, done) +func (d *delegatingRouteConfigWatcher) OnError(err error, onDone OnDoneFunc) { + d.watcher.OnError(err, onDone) } -func (d *delegatingRouteConfigWatcher) OnResourceDoesNotExist(done DoneNotifier) { - d.watcher.OnResourceDoesNotExist(done) +func (d *delegatingRouteConfigWatcher) OnResourceDoesNotExist(onDone OnDoneFunc) { + d.watcher.OnResourceDoesNotExist(onDone) } // WatchRouteConfig uses xDS to discover the configuration associated with the