Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: keep ads flow control local to xdsclient/transport package #7578

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 33 additions & 33 deletions xds/csds/csds_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,50 +70,50 @@ func Test(t *testing.T) {

type nopListenerWatcher struct{}

func (nopListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) {
onDone.OnDone()
func (nopListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopListenerWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) {
onDone.OnDone()
func (nopListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
onDone.OnDone()
func (nopListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
onDone()
}

type nopRouteConfigWatcher struct{}

func (nopRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.DoneNotifier) {
onDone.OnDone()
func (nopRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopRouteConfigWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) {
onDone.OnDone()
func (nopRouteConfigWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
onDone.OnDone()
func (nopRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
onDone()
}

type nopClusterWatcher struct{}

func (nopClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.DoneNotifier) {
onDone.OnDone()
func (nopClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopClusterWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) {
onDone.OnDone()
func (nopClusterWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
onDone.OnDone()
func (nopClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
onDone()
}

type nopEndpointsWatcher struct{}

func (nopEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.DoneNotifier) {
onDone.OnDone()
func (nopEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopEndpointsWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) {
onDone.OnDone()
func (nopEndpointsWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
onDone.OnDone()
func (nopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
onDone()
}

// This watcher writes the onDone callback on to a channel for the test to
Expand All @@ -126,31 +126,31 @@ func (nopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifie
// for ADS stream level flow control), and was causing CSDS to not receive any
// updates from the xDS client.
type blockingListenerWatcher struct {
testCtxDone <-chan struct{} // Closed when the test is done.
onDoneCh chan xdsresource.DoneNotifier // Channel to write the onDone callback to.
testCtxDone <-chan struct{} // Closed when the test is done.
onDoneCh chan xdsresource.OnDoneFunc // Channel to write the onDone callback to.
}

func newBlockingListenerWatcher(testCtxDone <-chan struct{}) *blockingListenerWatcher {
return &blockingListenerWatcher{
testCtxDone: testCtxDone,
onDoneCh: make(chan xdsresource.DoneNotifier, 1),
onDoneCh: make(chan xdsresource.OnDoneFunc, 1),
}
}

func (w *blockingListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) {
func (w *blockingListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}
func (w *blockingListenerWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) {
func (w *blockingListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}
func (w *blockingListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
func (w *blockingListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}

// writeOnDone attempts to writes the onDone callback on the onDone channel. It
// returns when it can successfully write to the channel or when the test is
// done, which is signalled by testCtxDone being closed.
func writeOnDone(testCtxDone <-chan struct{}, onDoneCh chan xdsresource.DoneNotifier, onDone xdsresource.DoneNotifier) {
func writeOnDone(testCtxDone <-chan struct{}, onDoneCh chan xdsresource.OnDoneFunc, onDone xdsresource.OnDoneFunc) {
select {
case <-testCtxDone:
case onDoneCh <- onDone:
Expand Down Expand Up @@ -545,7 +545,7 @@ func (s) TestCSDS_NACK(t *testing.T) {
case <-ctx.Done():
t.Fatal("Timed out waiting for watch callback")
case onDone := <-watcher2.onDoneCh:
onDone.OnDone()
onDone()
}

// Update the second resource with an empty ApiListener field which is
Expand All @@ -564,7 +564,7 @@ func (s) TestCSDS_NACK(t *testing.T) {
case <-ctx.Done():
t.Fatal("Timed out waiting for watch callback")
case onDone := <-watcher2.onDoneCh:
onDone.OnDone()
onDone()
}

// Verify that the xDS client reports the first listener resource as being
Expand Down
18 changes: 9 additions & 9 deletions xds/internal/balancer/cdsbalancer/cluster_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,19 @@ type clusterWatcher struct {
parent *cdsBalancer
}

func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData, onDone xdsresource.DoneNotifier) {
handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone.OnDone() }
cw.parent.serializer.ScheduleOr(handleUpdate, onDone.OnDone)
func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) {
handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone() }
cw.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

func (cw *clusterWatcher) OnError(err error, onDone xdsresource.DoneNotifier) {
handleError := func(context.Context) { cw.parent.onClusterError(cw.name, err); onDone.OnDone() }
cw.parent.serializer.ScheduleOr(handleError, onDone.OnDone)
func (cw *clusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { cw.parent.onClusterError(cw.name, err); onDone() }
cw.parent.serializer.ScheduleOr(handleError, onDone)
}

func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
handleNotFound := func(context.Context) { cw.parent.onClusterResourceNotFound(cw.name); onDone.OnDone() }
cw.parent.serializer.ScheduleOr(handleNotFound, onDone.OnDone)
func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
handleNotFound := func(context.Context) { cw.parent.onClusterResourceNotFound(cw.name); onDone() }
cw.parent.serializer.ScheduleOr(handleNotFound, onDone)
}

// watcherState groups the state associated with a clusterWatcher.
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/balancer/clusterresolver/clusterresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (b *clusterResolverBalancer) handleResourceUpdate(update *resourceUpdate) {
b.updateChildConfig()

if update.onDone != nil {
update.onDone.OnDone()
update.onDone()
}
}

Expand Down
18 changes: 9 additions & 9 deletions xds/internal/balancer/clusterresolver/resource_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type resourceUpdate struct {
priorities []priorityConfig
// To be invoked once the update is completely processed, or is dropped in
// favor of a newer update.
onDone xdsresource.DoneNotifier
onDone xdsresource.OnDoneFunc
}

// topLevelResolver is used by concrete endpointsResolver implementations for
Expand All @@ -49,7 +49,7 @@ type topLevelResolver interface {
// endpointsResolver implementation. The onDone callback is to be invoked
// once the update is completely processed, or is dropped in favor of a
// newer update.
onUpdate(onDone xdsresource.DoneNotifier)
onUpdate(onDone xdsresource.OnDoneFunc)
}

// endpointsResolver wraps the functionality to resolve a given resource name to
Expand Down Expand Up @@ -215,7 +215,7 @@ func (rr *resourceResolver) updateMechanisms(mechanisms []DiscoveryMechanism) {
}
// Regenerate even if there's no change in discovery mechanism, in case
// priority order changed.
rr.generateLocked(xdsresource.NopDoneNotifier{})
rr.generateLocked(func() {})
}

// resolveNow is typically called to trigger re-resolve of DNS. The EDS
Expand Down Expand Up @@ -264,7 +264,7 @@ func (rr *resourceResolver) stop(closing bool) {
select {
case ru := <-rr.updateChannel:
if ru.onDone != nil {
ru.onDone.OnDone()
ru.onDone()
}
default:
}
Expand All @@ -281,14 +281,14 @@ func (rr *resourceResolver) stop(closing bool) {
// clusterresolver LB policy.
//
// Caller must hold rr.mu.
func (rr *resourceResolver) generateLocked(onDone xdsresource.DoneNotifier) {
func (rr *resourceResolver) generateLocked(onDone xdsresource.OnDoneFunc) {
var ret []priorityConfig
for _, rDM := range rr.children {
u, ok := rDM.r.lastUpdate()
if !ok {
// Don't send updates to parent until all resolvers have update to
// send.
onDone.OnDone()
onDone()
return
}
switch uu := u.(type) {
Expand All @@ -304,18 +304,18 @@ func (rr *resourceResolver) generateLocked(onDone xdsresource.DoneNotifier) {
// receive path.
case ru := <-rr.updateChannel:
if ru.onDone != nil {
ru.onDone.OnDone()
ru.onDone()
}
default:
}
rr.updateChannel <- &resourceUpdate{priorities: ret, onDone: onDone}
}

func (rr *resourceResolver) onUpdate(onDone xdsresource.DoneNotifier) {
func (rr *resourceResolver) onUpdate(onDone xdsresource.OnDoneFunc) {
handleUpdate := func(context.Context) {
rr.mu.Lock()
rr.generateLocked(onDone)
rr.mu.Unlock()
}
rr.serializer.ScheduleOr(handleUpdate, func() { onDone.OnDone() })
rr.serializer.ScheduleOr(handleUpdate, func() { onDone() })
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

var (
Expand Down Expand Up @@ -80,7 +79,7 @@
ret.logger.Infof("Failed to parse dns hostname %q in clusterresolver LB policy", target)
}
ret.updateReceived = true
ret.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{})
ret.topLevelResolver.onUpdate(func() {})
return ret
}

Expand All @@ -90,7 +89,7 @@
ret.logger.Infof("Failed to build DNS resolver for target %q: %v", target, err)
}
ret.updateReceived = true
ret.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{})
ret.topLevelResolver.onUpdate(func() {})

Check warning on line 92 in xds/internal/balancer/clusterresolver/resource_resolver_dns.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/balancer/clusterresolver/resource_resolver_dns.go#L92

Added line #L92 was not covered by tests
return ret
}
ret.dnsR = r
Expand Down Expand Up @@ -154,7 +153,7 @@
dr.updateReceived = true
dr.mu.Unlock()

dr.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{})
dr.topLevelResolver.onUpdate(func() {})
return nil
}

Expand All @@ -177,7 +176,7 @@
dr.updateReceived = true
dr.mu.Unlock()

dr.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{})
dr.topLevelResolver.onUpdate(func() {})
}

func (dr *dnsDiscoveryMechanism) NewAddress(addresses []resolver.Address) {
Expand Down
14 changes: 7 additions & 7 deletions xds/internal/balancer/clusterresolver/resource_resolver_eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@
}

// OnUpdate is invoked to report an update for the resource being watched.
func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.DoneNotifier) {
func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) {
if er.stopped.HasFired() {
onDone.OnDone()
onDone()

Check warning on line 81 in xds/internal/balancer/clusterresolver/resource_resolver_eds.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/balancer/clusterresolver/resource_resolver_eds.go#L81

Added line #L81 was not covered by tests
return
}

Expand All @@ -89,9 +89,9 @@
er.topLevelResolver.onUpdate(onDone)
}

func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.DoneNotifier) {
func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.OnDoneFunc) {
if er.stopped.HasFired() {
onDone.OnDone()
onDone()

Check warning on line 94 in xds/internal/balancer/clusterresolver/resource_resolver_eds.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/balancer/clusterresolver/resource_resolver_eds.go#L94

Added line #L94 was not covered by tests
return
}

Expand All @@ -104,7 +104,7 @@
// Continue using a previously received good configuration if one
// exists.
er.mu.Unlock()
onDone.OnDone()
onDone()
return
}

Expand All @@ -120,9 +120,9 @@
er.topLevelResolver.onUpdate(onDone)
}

func (er *edsDiscoveryMechanism) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
func (er *edsDiscoveryMechanism) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
if er.stopped.HasFired() {
onDone.OnDone()
onDone()

Check warning on line 125 in xds/internal/balancer/clusterresolver/resource_resolver_eds.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/balancer/clusterresolver/resource_resolver_eds.go#L125

Added line #L125 was not covered by tests
return
}

Expand Down
36 changes: 18 additions & 18 deletions xds/internal/resolver/watch_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@
return lw
}

func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) {
handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone.OnDone() }
l.parent.serializer.ScheduleOr(handleUpdate, onDone.OnDone)
func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone() }
l.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

func (l *listenerWatcher) OnError(err error, onDone xdsresource.DoneNotifier) {
handleError := func(context.Context) { l.parent.onListenerResourceError(err); onDone.OnDone() }
l.parent.serializer.ScheduleOr(handleError, onDone.OnDone)
func (l *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { l.parent.onListenerResourceError(err); onDone() }
l.parent.serializer.ScheduleOr(handleError, onDone)
}

func (l *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
handleNotFound := func(context.Context) { l.parent.onListenerResourceNotFound(); onDone.OnDone() }
l.parent.serializer.ScheduleOr(handleNotFound, onDone.OnDone)
func (l *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
handleNotFound := func(context.Context) { l.parent.onListenerResourceNotFound(); onDone() }
l.parent.serializer.ScheduleOr(handleNotFound, onDone)
}

func (l *listenerWatcher) stop() {
Expand All @@ -68,22 +68,22 @@
return rw
}

func (r *routeConfigWatcher) OnUpdate(u *xdsresource.RouteConfigResourceData, onDone xdsresource.DoneNotifier) {
func (r *routeConfigWatcher) OnUpdate(u *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) {
handleUpdate := func(context.Context) {
r.parent.onRouteConfigResourceUpdate(r.resourceName, u.Resource)
onDone.OnDone()
onDone()
}
r.parent.serializer.ScheduleOr(handleUpdate, onDone.OnDone)
r.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

func (r *routeConfigWatcher) OnError(err error, onDone xdsresource.DoneNotifier) {
handleError := func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone.OnDone() }
r.parent.serializer.ScheduleOr(handleError, onDone.OnDone)
func (r *routeConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone() }
r.parent.serializer.ScheduleOr(handleError, onDone)
}

func (r *routeConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
handleNotFound := func(context.Context) { r.parent.onRouteConfigResourceNotFound(r.resourceName); onDone.OnDone() }
r.parent.serializer.ScheduleOr(handleNotFound, onDone.OnDone)
func (r *routeConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
handleNotFound := func(context.Context) { r.parent.onRouteConfigResourceNotFound(r.resourceName); onDone() }
r.parent.serializer.ScheduleOr(handleNotFound, onDone)

Check warning on line 86 in xds/internal/resolver/watch_service.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/resolver/watch_service.go#L84-L86

Added lines #L84 - L86 were not covered by tests
}

func (r *routeConfigWatcher) stop() {
Expand Down
Loading
Loading