From 0f73133e3aa3bf4505007ebd0cf3508dfc952a22 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Tue, 18 Aug 2020 15:40:27 -0700 Subject: [PATCH] xds: Export a WatchListener() method on the xdsClient. (#3817) --- xds/internal/client/client.go | 66 +++++++- xds/internal/client/client_callback.go | 12 +- xds/internal/client/client_test.go | 34 ++-- xds/internal/client/client_watchers.go | 92 +++++----- .../client/client_watchers_cluster_test.go | 21 ++- .../client/client_watchers_endpoints_test.go | 16 +- .../client/client_watchers_lds_test.go | 17 +- .../client/client_watchers_rds_test.go | 13 +- .../client/client_watchers_service_test.go | 40 ++--- xds/internal/client/client_xds.go | 22 +-- xds/internal/client/transport_helper.go | 105 ++++++------ xds/internal/client/v2/client.go | 62 ++++--- xds/internal/client/v2/client_ack_test.go | 157 +++++++++--------- xds/internal/client/v2/client_cds_test.go | 4 +- xds/internal/client/v2/client_eds_test.go | 4 +- xds/internal/client/v2/client_lds_test.go | 6 +- xds/internal/client/v2/client_rds_test.go | 10 +- xds/internal/client/v2/client_test.go | 52 +++--- xds/internal/client/v3/client.go | 64 ++++--- xds/xds.go | 1 + 20 files changed, 445 insertions(+), 353 deletions(-) diff --git a/xds/internal/client/client.go b/xds/internal/client/client.go index 7088c63befcb..2f260643777b 100644 --- a/xds/internal/client/client.go +++ b/xds/internal/client/client.go @@ -96,10 +96,10 @@ type APIClientBuilder interface { // version specific implementations of the xDS client. type APIClient interface { // AddWatch adds a watch for an xDS resource given its type and name. - AddWatch(resourceType, resourceName string) + AddWatch(ResourceType, string) // RemoveWatch cancels an already registered watch for an xDS resource // given its type and name. - RemoveWatch(resourceType, resourceName string) + RemoveWatch(ResourceType, string) // Close cleans up resources allocated by the API client. Close() } @@ -396,3 +396,65 @@ func (c *Client) Close() { c.cc.Close() c.logger.Infof("Shutdown") } + +// ResourceType identifies resources in a transport protocol agnostic way. These +// will be used in transport version agnostic code, while the versioned API +// clients will map these to appropriate version URLs. +type ResourceType int + +// Version agnostic resource type constants. +const ( + UnknownResource ResourceType = iota + ListenerResource + HTTPConnManagerResource + RouteConfigResource + ClusterResource + EndpointsResource +) + +func (r ResourceType) String() string { + switch r { + case ListenerResource: + return "ListenerResource" + case HTTPConnManagerResource: + return "HTTPConnManagerResource" + case RouteConfigResource: + return "RouteConfigResource" + case ClusterResource: + return "ClusterResource" + case EndpointsResource: + return "EndpointsResource" + default: + return "UnknownResource" + } +} + +// IsListenerResource returns true if the provider URL corresponds to an xDS +// Listener resource. +func IsListenerResource(url string) bool { + return url == version.V2ListenerURL || url == version.V3ListenerURL +} + +// IsHTTPConnManagerResource returns true if the provider URL corresponds to an xDS +// HTTPConnManager resource. +func IsHTTPConnManagerResource(url string) bool { + return url == version.V2HTTPConnManagerURL || url == version.V3HTTPConnManagerURL +} + +// IsRouteConfigResource returns true if the provider URL corresponds to an xDS +// RouteConfig resource. +func IsRouteConfigResource(url string) bool { + return url == version.V2RouteConfigURL || url == version.V3RouteConfigURL +} + +// IsClusterResource returns true if the provider URL corresponds to an xDS +// Cluster resource. +func IsClusterResource(url string) bool { + return url == version.V2ClusterURL || url == version.V3ClusterURL +} + +// IsEndpointsResource returns true if the provider URL corresponds to an xDS +// Endpoints resource. +func IsEndpointsResource(url string) bool { + return url == version.V2EndpointsURL || url == version.V3EndpointsURL +} diff --git a/xds/internal/client/client_callback.go b/xds/internal/client/client_callback.go index a00257e4e658..a135dae745b9 100644 --- a/xds/internal/client/client_callback.go +++ b/xds/internal/client/client_callback.go @@ -18,8 +18,6 @@ package client -import "google.golang.org/grpc/xds/internal/version" - type watcherInfoWithUpdate struct { wi *watchInfo update interface{} @@ -46,20 +44,20 @@ func (c *Client) callCallback(wiu *watcherInfoWithUpdate) { // window that a watcher's callback could be called after the watcher is // canceled, and the user needs to take care of it. var ccb func() - switch wiu.wi.typeURL { - case version.V2ListenerURL: + switch wiu.wi.rType { + case ListenerResource: if s, ok := c.ldsWatchers[wiu.wi.target]; ok && s[wiu.wi] { ccb = func() { wiu.wi.ldsCallback(wiu.update.(ListenerUpdate), wiu.err) } } - case version.V2RouteConfigURL: + case RouteConfigResource: if s, ok := c.rdsWatchers[wiu.wi.target]; ok && s[wiu.wi] { ccb = func() { wiu.wi.rdsCallback(wiu.update.(RouteConfigUpdate), wiu.err) } } - case version.V2ClusterURL: + case ClusterResource: if s, ok := c.cdsWatchers[wiu.wi.target]; ok && s[wiu.wi] { ccb = func() { wiu.wi.cdsCallback(wiu.update.(ClusterUpdate), wiu.err) } } - case version.V2EndpointsURL: + case EndpointsResource: if s, ok := c.edsWatchers[wiu.wi.target]; ok && s[wiu.wi] { ccb = func() { wiu.wi.edsCallback(wiu.update.(EndpointsUpdate), wiu.err) } } diff --git a/xds/internal/client/client_test.go b/xds/internal/client/client_test.go index 9584e28b9d17..729be4cc0726 100644 --- a/xds/internal/client/client_test.go +++ b/xds/internal/client/client_test.go @@ -67,8 +67,8 @@ func clientOpts(balancerName string, overrideWatchExpiryTImeout bool) Options { type testAPIClient struct { r UpdateHandler - addWatches map[string]*testutils.Channel - removeWatches map[string]*testutils.Channel + addWatches map[ResourceType]*testutils.Channel + removeWatches map[ResourceType]*testutils.Channel } func overrideNewAPIClient() (<-chan *testAPIClient, func()) { @@ -83,16 +83,18 @@ func overrideNewAPIClient() (<-chan *testAPIClient, func()) { } func newTestAPIClient(r UpdateHandler) *testAPIClient { - addWatches := make(map[string]*testutils.Channel) - addWatches[version.V2ListenerURL] = testutils.NewChannel() - addWatches[version.V2RouteConfigURL] = testutils.NewChannel() - addWatches[version.V2ClusterURL] = testutils.NewChannel() - addWatches[version.V2EndpointsURL] = testutils.NewChannel() - removeWatches := make(map[string]*testutils.Channel) - removeWatches[version.V2ListenerURL] = testutils.NewChannel() - removeWatches[version.V2RouteConfigURL] = testutils.NewChannel() - removeWatches[version.V2ClusterURL] = testutils.NewChannel() - removeWatches[version.V2EndpointsURL] = testutils.NewChannel() + addWatches := map[ResourceType]*testutils.Channel{ + ListenerResource: testutils.NewChannel(), + RouteConfigResource: testutils.NewChannel(), + ClusterResource: testutils.NewChannel(), + EndpointsResource: testutils.NewChannel(), + } + removeWatches := map[ResourceType]*testutils.Channel{ + ListenerResource: testutils.NewChannel(), + RouteConfigResource: testutils.NewChannel(), + ClusterResource: testutils.NewChannel(), + EndpointsResource: testutils.NewChannel(), + } return &testAPIClient{ r: r, addWatches: addWatches, @@ -100,11 +102,11 @@ func newTestAPIClient(r UpdateHandler) *testAPIClient { } } -func (c *testAPIClient) AddWatch(resourceType, resourceName string) { +func (c *testAPIClient) AddWatch(resourceType ResourceType, resourceName string) { c.addWatches[resourceType].Send(resourceName) } -func (c *testAPIClient) RemoveWatch(resourceType, resourceName string) { +func (c *testAPIClient) RemoveWatch(resourceType ResourceType, resourceName string) { c.removeWatches[resourceType].Send(resourceName) } @@ -130,12 +132,12 @@ func (s) TestWatchCallAnotherWatch(t *testing.T) { clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err}) // Calls another watch inline, to ensure there's deadlock. c.WatchCluster("another-random-name", func(ClusterUpdate, error) {}) - if _, err := v2Client.addWatches[version.V2ClusterURL].Receive(); firstTime && err != nil { + if _, err := v2Client.addWatches[ClusterResource].Receive(); firstTime && err != nil { t.Fatalf("want new watch to start, got error %v", err) } firstTime = false }) - if _, err := v2Client.addWatches[version.V2ClusterURL].Receive(); err != nil { + if _, err := v2Client.addWatches[ClusterResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } diff --git a/xds/internal/client/client_watchers.go b/xds/internal/client/client_watchers.go index 2cfec37dcc5f..7f67be894226 100644 --- a/xds/internal/client/client_watchers.go +++ b/xds/internal/client/client_watchers.go @@ -22,8 +22,6 @@ import ( "fmt" "sync" "time" - - "google.golang.org/grpc/xds/internal/version" ) type watchInfoState int @@ -37,9 +35,9 @@ const ( // watchInfo holds all the information from a watch() call. type watchInfo struct { - c *Client - typeURL string - target string + c *Client + rType ResourceType + target string ldsCallback func(ListenerUpdate, error) rdsCallback func(RouteConfigUpdate, error) @@ -74,7 +72,7 @@ func (wi *watchInfo) resourceNotFound() { } wi.state = watchInfoStateRespReceived wi.expiryTimer.Stop() - wi.sendErrorLocked(NewErrorf(ErrorTypeResourceNotFound, "xds: %s target %s not found in received response", wi.typeURL, wi.target)) + wi.sendErrorLocked(NewErrorf(ErrorTypeResourceNotFound, "xds: %v target %s not found in received response", wi.rType, wi.target)) } func (wi *watchInfo) timeout() { @@ -84,7 +82,7 @@ func (wi *watchInfo) timeout() { return } wi.state = watchInfoStateTimeout - wi.sendErrorLocked(fmt.Errorf("xds: %s target %s not found, watcher timeout", wi.typeURL, wi.target)) + wi.sendErrorLocked(fmt.Errorf("xds: %v target %s not found, watcher timeout", wi.rType, wi.target)) } // Caller must hold wi.mu. @@ -92,14 +90,14 @@ func (wi *watchInfo) sendErrorLocked(err error) { var ( u interface{} ) - switch wi.typeURL { - case version.V2ListenerURL: + switch wi.rType { + case ListenerResource: u = ListenerUpdate{} - case version.V2RouteConfigURL: + case RouteConfigResource: u = RouteConfigUpdate{} - case version.V2ClusterURL: + case ClusterResource: u = ClusterUpdate{} - case version.V2EndpointsURL: + case EndpointsResource: u = EndpointsUpdate{} } wi.c.scheduleCallback(wi, u, err) @@ -118,54 +116,54 @@ func (wi *watchInfo) cancel() { func (c *Client) watch(wi *watchInfo) (cancel func()) { c.mu.Lock() defer c.mu.Unlock() - c.logger.Debugf("new watch for type %v, resource name %v", wi.typeURL, wi.target) + c.logger.Debugf("new watch for type %v, resource name %v", wi.rType, wi.target) var watchers map[string]map[*watchInfo]bool - switch wi.typeURL { - case version.V2ListenerURL: + switch wi.rType { + case ListenerResource: watchers = c.ldsWatchers - case version.V2RouteConfigURL: + case RouteConfigResource: watchers = c.rdsWatchers - case version.V2ClusterURL: + case ClusterResource: watchers = c.cdsWatchers - case version.V2EndpointsURL: + case EndpointsResource: watchers = c.edsWatchers } resourceName := wi.target s, ok := watchers[wi.target] if !ok { - // If this is a new watcher, will ask lower level to send a new request with - // the resource name. + // If this is a new watcher, will ask lower level to send a new request + // with the resource name. // - // If this type+name is already being watched, will not notify the - // underlying xdsv2Client. - c.logger.Debugf("first watch for type %v, resource name %v, will send a new xDS request", wi.typeURL, wi.target) + // If this (type+name) is already being watched, will not notify the + // underlying versioned apiClient. + c.logger.Debugf("first watch for type %v, resource name %v, will send a new xDS request", wi.rType, wi.target) s = make(map[*watchInfo]bool) watchers[resourceName] = s - c.apiClient.AddWatch(wi.typeURL, resourceName) + c.apiClient.AddWatch(wi.rType, resourceName) } // No matter what, add the new watcher to the set, so it's callback will be // call for new responses. s[wi] = true // If the resource is in cache, call the callback with the value. - switch wi.typeURL { - case version.V2ListenerURL: + switch wi.rType { + case ListenerResource: if v, ok := c.ldsCache[resourceName]; ok { c.logger.Debugf("LDS resource with name %v found in cache: %+v", wi.target, v) wi.newUpdate(v) } - case version.V2RouteConfigURL: + case RouteConfigResource: if v, ok := c.rdsCache[resourceName]; ok { c.logger.Debugf("RDS resource with name %v found in cache: %+v", wi.target, v) wi.newUpdate(v) } - case version.V2ClusterURL: + case ClusterResource: if v, ok := c.cdsCache[resourceName]; ok { c.logger.Debugf("CDS resource with name %v found in cache: %+v", wi.target, v) wi.newUpdate(v) } - case version.V2EndpointsURL: + case EndpointsResource: if v, ok := c.edsCache[resourceName]; ok { c.logger.Debugf("EDS resource with name %v found in cache: %+v", wi.target, v) wi.newUpdate(v) @@ -173,7 +171,7 @@ func (c *Client) watch(wi *watchInfo) (cancel func()) { } return func() { - c.logger.Debugf("watch for type %v, resource name %v canceled", wi.typeURL, wi.target) + c.logger.Debugf("watch for type %v, resource name %v canceled", wi.rType, wi.target) wi.cancel() c.mu.Lock() defer c.mu.Unlock() @@ -182,22 +180,22 @@ func (c *Client) watch(wi *watchInfo) (cancel func()) { // future. delete(s, wi) if len(s) == 0 { - c.logger.Debugf("last watch for type %v, resource name %v canceled, will send a new xDS request", wi.typeURL, wi.target) + c.logger.Debugf("last watch for type %v, resource name %v canceled, will send a new xDS request", wi.rType, wi.target) // If this was the last watcher, also tell xdsv2Client to stop // watching this resource. delete(watchers, resourceName) - c.apiClient.RemoveWatch(wi.typeURL, resourceName) + c.apiClient.RemoveWatch(wi.rType, resourceName) // Remove the resource from cache. When a watch for this // resource is added later, it will trigger a xDS request with // resource names, and client will receive new xDS responses. - switch wi.typeURL { - case version.V2ListenerURL: + switch wi.rType { + case ListenerResource: delete(c.ldsCache, resourceName) - case version.V2RouteConfigURL: + case RouteConfigResource: delete(c.rdsCache, resourceName) - case version.V2ClusterURL: + case ClusterResource: delete(c.cdsCache, resourceName) - case version.V2EndpointsURL: + case EndpointsResource: delete(c.edsCache, resourceName) } } @@ -205,6 +203,18 @@ func (c *Client) watch(wi *watchInfo) (cancel func()) { } } +// WatchListener uses LDS to discover information about the provided listener. +// +// WatchListener is expected to called only from the server side implementation +// of xDS. Clients will use WatchService instead. +// +// Note that during race (e.g. an xDS response is received while the user is +// calling cancel()), there's a small window where the callback can be called +// after the watcher is canceled. The caller needs to handle this case. +func (c *Client) WatchListener(listener string, cb func(ListenerUpdate, error)) (cancel func()) { + return c.watchLDS(listener, cb) +} + // watchLDS starts a listener watcher for the service.. // // Note that during race (e.g. an xDS response is received while the user is @@ -213,7 +223,7 @@ func (c *Client) watch(wi *watchInfo) (cancel func()) { func (c *Client) watchLDS(serviceName string, cb func(ListenerUpdate, error)) (cancel func()) { wi := &watchInfo{ c: c, - typeURL: version.V2ListenerURL, + rType: ListenerResource, target: serviceName, ldsCallback: cb, } @@ -232,7 +242,7 @@ func (c *Client) watchLDS(serviceName string, cb func(ListenerUpdate, error)) (c func (c *Client) watchRDS(routeName string, cb func(RouteConfigUpdate, error)) (cancel func()) { wi := &watchInfo{ c: c, - typeURL: version.V2RouteConfigURL, + rType: RouteConfigResource, target: routeName, rdsCallback: cb, } @@ -358,7 +368,7 @@ func (w *serviceUpdateWatcher) close() { func (c *Client) WatchCluster(clusterName string, cb func(ClusterUpdate, error)) (cancel func()) { wi := &watchInfo{ c: c, - typeURL: version.V2ClusterURL, + rType: ClusterResource, target: clusterName, cdsCallback: cb, } @@ -380,7 +390,7 @@ func (c *Client) WatchCluster(clusterName string, cb func(ClusterUpdate, error)) func (c *Client) WatchEndpoints(clusterName string, cb func(EndpointsUpdate, error)) (cancel func()) { wi := &watchInfo{ c: c, - typeURL: version.V2EndpointsURL, + rType: EndpointsResource, target: clusterName, edsCallback: cb, } diff --git a/xds/internal/client/client_watchers_cluster_test.go b/xds/internal/client/client_watchers_cluster_test.go index 93d32113a310..eedda3e0a2d5 100644 --- a/xds/internal/client/client_watchers_cluster_test.go +++ b/xds/internal/client/client_watchers_cluster_test.go @@ -22,7 +22,6 @@ import ( "testing" "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/version" ) type clusterUpdateErr struct { @@ -53,7 +52,7 @@ func (s) TestClusterWatch(t *testing.T) { cancelWatch := c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) { clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2ClusterURL].Receive(); err != nil { + if _, err := v2Client.addWatches[ClusterResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } @@ -117,7 +116,7 @@ func (s) TestClusterTwoWatchSameResourceName(t *testing.T) { cancelLastWatch = c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) { clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2ClusterURL].Receive(); i == 0 && err != nil { + if _, err := v2Client.addWatches[ClusterResource].Receive(); i == 0 && err != nil { t.Fatalf("want new watch to start, got error %v", err) } } @@ -174,7 +173,7 @@ func (s) TestClusterThreeWatchDifferentResourceName(t *testing.T) { c.WatchCluster(testCDSName+"1", func(update ClusterUpdate, err error) { clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2ClusterURL].Receive(); i == 0 && err != nil { + if _, err := v2Client.addWatches[ClusterResource].Receive(); i == 0 && err != nil { t.Fatalf("want new watch to start, got error %v", err) } } @@ -184,7 +183,7 @@ func (s) TestClusterThreeWatchDifferentResourceName(t *testing.T) { c.WatchCluster(testCDSName+"2", func(update ClusterUpdate, err error) { clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2ClusterURL].Receive(); err != nil { + if _, err := v2Client.addWatches[ClusterResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } @@ -224,7 +223,7 @@ func (s) TestClusterWatchAfterCache(t *testing.T) { c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) { clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2ClusterURL].Receive(); err != nil { + if _, err := v2Client.addWatches[ClusterResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } @@ -242,7 +241,7 @@ func (s) TestClusterWatchAfterCache(t *testing.T) { c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) { clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err}) }) - if n, err := v2Client.addWatches[version.V2ClusterURL].Receive(); err == nil { + if n, err := v2Client.addWatches[ClusterResource].Receive(); err == nil { t.Fatalf("want no new watch to start (recv timeout), got resource name: %v error %v", n, err) } @@ -276,7 +275,7 @@ func (s) TestClusterWatchExpiryTimer(t *testing.T) { c.WatchCluster(testCDSName, func(u ClusterUpdate, err error) { clusterUpdateCh.Send(clusterUpdateErr{u: u, err: err}) }) - if _, err := v2Client.addWatches[version.V2ClusterURL].Receive(); err != nil { + if _, err := v2Client.addWatches[ClusterResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } @@ -312,7 +311,7 @@ func (s) TestClusterWatchExpiryTimerStop(t *testing.T) { c.WatchCluster(testCDSName, func(u ClusterUpdate, err error) { clusterUpdateCh.Send(clusterUpdateErr{u: u, err: err}) }) - if _, err := v2Client.addWatches[version.V2ClusterURL].Receive(); err != nil { + if _, err := v2Client.addWatches[ClusterResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } @@ -354,7 +353,7 @@ func (s) TestClusterResourceRemoved(t *testing.T) { c.WatchCluster(testCDSName+"1", func(update ClusterUpdate, err error) { clusterUpdateCh1.Send(clusterUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2ClusterURL].Receive(); err != nil { + if _, err := v2Client.addWatches[ClusterResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } // Another watch for a different name. @@ -362,7 +361,7 @@ func (s) TestClusterResourceRemoved(t *testing.T) { c.WatchCluster(testCDSName+"2", func(update ClusterUpdate, err error) { clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2ClusterURL].Receive(); err != nil { + if _, err := v2Client.addWatches[ClusterResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } diff --git a/xds/internal/client/client_watchers_endpoints_test.go b/xds/internal/client/client_watchers_endpoints_test.go index c7e8bd8e52e0..09dc80817284 100644 --- a/xds/internal/client/client_watchers_endpoints_test.go +++ b/xds/internal/client/client_watchers_endpoints_test.go @@ -23,9 +23,9 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/version" ) var ( @@ -71,7 +71,7 @@ func (s) TestEndpointsWatch(t *testing.T) { cancelWatch := c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) { endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2EndpointsURL].Receive(); err != nil { + if _, err := v2Client.addWatches[EndpointsResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } @@ -129,7 +129,7 @@ func (s) TestEndpointsTwoWatchSameResourceName(t *testing.T) { cancelLastWatch = c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) { endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2EndpointsURL].Receive(); i == 0 && err != nil { + if _, err := v2Client.addWatches[EndpointsResource].Receive(); i == 0 && err != nil { t.Fatalf("want new watch to start, got error %v", err) } } @@ -186,7 +186,7 @@ func (s) TestEndpointsThreeWatchDifferentResourceName(t *testing.T) { c.WatchEndpoints(testCDSName+"1", func(update EndpointsUpdate, err error) { endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2EndpointsURL].Receive(); i == 0 && err != nil { + if _, err := v2Client.addWatches[EndpointsResource].Receive(); i == 0 && err != nil { t.Fatalf("want new watch to start, got error %v", err) } } @@ -196,7 +196,7 @@ func (s) TestEndpointsThreeWatchDifferentResourceName(t *testing.T) { c.WatchEndpoints(testCDSName+"2", func(update EndpointsUpdate, err error) { endpointsUpdateCh2.Send(endpointsUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2EndpointsURL].Receive(); err != nil { + if _, err := v2Client.addWatches[EndpointsResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } @@ -236,7 +236,7 @@ func (s) TestEndpointsWatchAfterCache(t *testing.T) { c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) { endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2EndpointsURL].Receive(); err != nil { + if _, err := v2Client.addWatches[EndpointsResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } @@ -254,7 +254,7 @@ func (s) TestEndpointsWatchAfterCache(t *testing.T) { c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) { endpointsUpdateCh2.Send(endpointsUpdateErr{u: update, err: err}) }) - if n, err := v2Client.addWatches[version.V2EndpointsURL].Receive(); err == nil { + if n, err := v2Client.addWatches[EndpointsResource].Receive(); err == nil { t.Fatalf("want no new watch to start (recv timeout), got resource name: %v error %v", n, err) } @@ -288,7 +288,7 @@ func (s) TestEndpointsWatchExpiryTimer(t *testing.T) { c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) { endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2EndpointsURL].Receive(); err != nil { + if _, err := v2Client.addWatches[EndpointsResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } diff --git a/xds/internal/client/client_watchers_lds_test.go b/xds/internal/client/client_watchers_lds_test.go index c052e8bf4ce5..b2bcef8c7237 100644 --- a/xds/internal/client/client_watchers_lds_test.go +++ b/xds/internal/client/client_watchers_lds_test.go @@ -22,7 +22,6 @@ import ( "testing" "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/version" ) type ldsUpdateErr struct { @@ -50,7 +49,7 @@ func (s) TestLDSWatch(t *testing.T) { cancelWatch := c.watchLDS(testLDSName, func(update ListenerUpdate, err error) { ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil { + if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } @@ -109,7 +108,7 @@ func (s) TestLDSTwoWatchSameResourceName(t *testing.T) { cancelLastWatch = c.watchLDS(testLDSName, func(update ListenerUpdate, err error) { ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); i == 0 && err != nil { + if _, err := v2Client.addWatches[ListenerResource].Receive(); i == 0 && err != nil { t.Fatalf("want new watch to start, got error %v", err) } } @@ -166,7 +165,7 @@ func (s) TestLDSThreeWatchDifferentResourceName(t *testing.T) { c.watchLDS(testLDSName+"1", func(update ListenerUpdate, err error) { ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); i == 0 && err != nil { + if _, err := v2Client.addWatches[ListenerResource].Receive(); i == 0 && err != nil { t.Fatalf("want new watch to start, got error %v", err) } } @@ -176,7 +175,7 @@ func (s) TestLDSThreeWatchDifferentResourceName(t *testing.T) { c.watchLDS(testLDSName+"2", func(update ListenerUpdate, err error) { ldsUpdateCh2.Send(ldsUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil { + if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } @@ -216,7 +215,7 @@ func (s) TestLDSWatchAfterCache(t *testing.T) { c.watchLDS(testLDSName, func(update ListenerUpdate, err error) { ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil { + if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } @@ -234,7 +233,7 @@ func (s) TestLDSWatchAfterCache(t *testing.T) { c.watchLDS(testLDSName, func(update ListenerUpdate, err error) { ldsUpdateCh2.Send(ldsUpdateErr{u: update, err: err}) }) - if n, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err == nil { + if n, err := v2Client.addWatches[ListenerResource].Receive(); err == nil { t.Fatalf("want no new watch to start (recv timeout), got resource name: %v error %v", n, err) } @@ -271,7 +270,7 @@ func (s) TestLDSResourceRemoved(t *testing.T) { c.watchLDS(testLDSName+"1", func(update ListenerUpdate, err error) { ldsUpdateCh1.Send(ldsUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil { + if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } // Another watch for a different name. @@ -279,7 +278,7 @@ func (s) TestLDSResourceRemoved(t *testing.T) { c.watchLDS(testLDSName+"2", func(update ListenerUpdate, err error) { ldsUpdateCh2.Send(ldsUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil { + if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } diff --git a/xds/internal/client/client_watchers_rds_test.go b/xds/internal/client/client_watchers_rds_test.go index 6156d195c740..f9bb30d559f4 100644 --- a/xds/internal/client/client_watchers_rds_test.go +++ b/xds/internal/client/client_watchers_rds_test.go @@ -23,7 +23,6 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/version" ) type rdsUpdateErr struct { @@ -51,7 +50,7 @@ func (s) TestRDSWatch(t *testing.T) { cancelWatch := c.watchRDS(testRDSName, func(update RouteConfigUpdate, err error) { rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err != nil { + if _, err := v2Client.addWatches[RouteConfigResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } @@ -109,7 +108,7 @@ func (s) TestRDSTwoWatchSameResourceName(t *testing.T) { cancelLastWatch = c.watchRDS(testRDSName, func(update RouteConfigUpdate, err error) { rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); i == 0 && err != nil { + if _, err := v2Client.addWatches[RouteConfigResource].Receive(); i == 0 && err != nil { t.Fatalf("want new watch to start, got error %v", err) } } @@ -166,7 +165,7 @@ func (s) TestRDSThreeWatchDifferentResourceName(t *testing.T) { c.watchRDS(testRDSName+"1", func(update RouteConfigUpdate, err error) { rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); i == 0 && err != nil { + if _, err := v2Client.addWatches[RouteConfigResource].Receive(); i == 0 && err != nil { t.Fatalf("want new watch to start, got error %v", err) } } @@ -176,7 +175,7 @@ func (s) TestRDSThreeWatchDifferentResourceName(t *testing.T) { c.watchRDS(testRDSName+"2", func(update RouteConfigUpdate, err error) { rdsUpdateCh2.Send(rdsUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err != nil { + if _, err := v2Client.addWatches[RouteConfigResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } @@ -216,7 +215,7 @@ func (s) TestRDSWatchAfterCache(t *testing.T) { c.watchRDS(testRDSName, func(update RouteConfigUpdate, err error) { rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err != nil { + if _, err := v2Client.addWatches[RouteConfigResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } @@ -234,7 +233,7 @@ func (s) TestRDSWatchAfterCache(t *testing.T) { c.watchRDS(testRDSName, func(update RouteConfigUpdate, err error) { rdsUpdateCh2.Send(rdsUpdateErr{u: update, err: err}) }) - if n, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err == nil { + if n, err := v2Client.addWatches[RouteConfigResource].Receive(); err == nil { t.Fatalf("want no new watch to start (recv timeout), got resource name: %v error %v", n, err) } diff --git a/xds/internal/client/client_watchers_service_test.go b/xds/internal/client/client_watchers_service_test.go index 7009373b4e92..6f994686b8e4 100644 --- a/xds/internal/client/client_watchers_service_test.go +++ b/xds/internal/client/client_watchers_service_test.go @@ -23,8 +23,8 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/version" ) type serviceUpdateErr struct { @@ -56,13 +56,13 @@ func (s) TestServiceWatch(t *testing.T) { wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}} - if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil { + if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } v2Client.r.NewListeners(map[string]ListenerUpdate{ testLDSName: {RouteConfigName: testRDSName}, }) - if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err != nil { + if _, err := v2Client.addWatches[RouteConfigResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{ @@ -114,13 +114,13 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) { wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}} - if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil { + if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } v2Client.r.NewListeners(map[string]ListenerUpdate{ testLDSName: {RouteConfigName: testRDSName}, }) - if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err != nil { + if _, err := v2Client.addWatches[RouteConfigResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{ @@ -135,7 +135,7 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) { v2Client.r.NewListeners(map[string]ListenerUpdate{ testLDSName: {RouteConfigName: testRDSName + "2"}, }) - if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err != nil { + if _, err := v2Client.addWatches[RouteConfigResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } @@ -181,13 +181,13 @@ func (s) TestServiceWatchSecond(t *testing.T) { wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}} - if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil { + if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } v2Client.r.NewListeners(map[string]ListenerUpdate{ testLDSName: {RouteConfigName: testRDSName}, }) - if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err != nil { + if _, err := v2Client.addWatches[RouteConfigResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{ @@ -253,7 +253,7 @@ func (s) TestServiceWatchWithNoResponseFromServer(t *testing.T) { c.WatchService(testLDSName, func(update ServiceUpdate, err error) { serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil { + if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } u, err := serviceUpdateCh.TimedReceive(defaultTestWatchExpiryTimeout * 2) @@ -288,13 +288,13 @@ func (s) TestServiceWatchEmptyRDS(t *testing.T) { serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil { + if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } v2Client.r.NewListeners(map[string]ListenerUpdate{ testLDSName: {RouteConfigName: testRDSName}, }) - if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err != nil { + if _, err := v2Client.addWatches[RouteConfigResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{}) @@ -331,13 +331,13 @@ func (s) TestServiceWatchWithClientClose(t *testing.T) { serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err}) }) - if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil { + if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } v2Client.r.NewListeners(map[string]ListenerUpdate{ testLDSName: {RouteConfigName: testRDSName}, }) - if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err != nil { + if _, err := v2Client.addWatches[RouteConfigResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } // Client is closed before it receives the RDS response. @@ -369,13 +369,13 @@ func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) { wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}} - if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil { + if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } v2Client.r.NewListeners(map[string]ListenerUpdate{ testLDSName: {RouteConfigName: testRDSName}, }) - if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err != nil { + if _, err := v2Client.addWatches[RouteConfigResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{ @@ -390,7 +390,7 @@ func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) { v2Client.r.NewListeners(map[string]ListenerUpdate{ testLDSName: {RouteConfigName: testRDSName}, }) - if v, err := v2Client.removeWatches[version.V2RouteConfigURL].Receive(); err == nil { + if v, err := v2Client.removeWatches[RouteConfigResource].Receive(); err == nil { t.Fatalf("unexpected rds watch cancel: %v", v) } } @@ -420,13 +420,13 @@ func (s) TestServiceResourceRemoved(t *testing.T) { wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}} - if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil { + if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } v2Client.r.NewListeners(map[string]ListenerUpdate{ testLDSName: {RouteConfigName: testRDSName}, }) - if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err != nil { + if _, err := v2Client.addWatches[RouteConfigResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{ @@ -440,7 +440,7 @@ func (s) TestServiceResourceRemoved(t *testing.T) { // Remove LDS resource, should cancel the RDS watch, and trigger resource // removed error. v2Client.r.NewListeners(map[string]ListenerUpdate{}) - if _, err := v2Client.removeWatches[version.V2RouteConfigURL].Receive(); err != nil { + if _, err := v2Client.removeWatches[RouteConfigResource].Receive(); err != nil { t.Fatalf("want watch to be canceled, got error %v", err) } if u, err := serviceUpdateCh.Receive(); err != nil || ErrType(u.(serviceUpdateErr).err) != ErrorTypeResourceNotFound { @@ -462,7 +462,7 @@ func (s) TestServiceResourceRemoved(t *testing.T) { v2Client.r.NewListeners(map[string]ListenerUpdate{ testLDSName: {RouteConfigName: testRDSName}, }) - if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err != nil { + if _, err := v2Client.addWatches[RouteConfigResource].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) } if u, err := serviceUpdateCh.Receive(); err != testutils.ErrRecvTimeout { diff --git a/xds/internal/client/client_xds.go b/xds/internal/client/client_xds.go index 4a1a16df3add..f5f60a4f9460 100644 --- a/xds/internal/client/client_xds.go +++ b/xds/internal/client/client_xds.go @@ -33,9 +33,9 @@ import ( v3typepb "github.com/envoyproxy/go-control-plane/envoy/type/v3" "github.com/golang/protobuf/proto" anypb "github.com/golang/protobuf/ptypes/any" + "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/xds/internal" - "google.golang.org/grpc/xds/internal/version" ) // UnmarshalListener processes resources received in an LDS response, validates @@ -44,8 +44,8 @@ import ( func UnmarshalListener(resources []*anypb.Any, logger *grpclog.PrefixLogger) (map[string]ListenerUpdate, error) { update := make(map[string]ListenerUpdate) for _, r := range resources { - if t := r.GetTypeUrl(); t != version.V2ListenerURL && t != version.V3ListenerURL { - return nil, fmt.Errorf("xds: unexpected resource type: %s in LDS response", t) + if !IsListenerResource(r.GetTypeUrl()) { + return nil, fmt.Errorf("xds: unexpected resource type: %s in LDS response", r.GetTypeUrl()) } lis := &v3listenerpb.Listener{} if err := proto.Unmarshal(r.GetValue(), lis); err != nil { @@ -68,8 +68,8 @@ func getRouteConfigNameFromListener(lis *v3listenerpb.Listener, logger *grpclog. return "", fmt.Errorf("xds: no api_listener field in LDS response %+v", lis) } apiLisAny := lis.GetApiListener().GetApiListener() - if t := apiLisAny.GetTypeUrl(); t != version.V3HTTPConnManagerURL && t != version.V2HTTPConnManagerURL { - return "", fmt.Errorf("xds: unexpected resource type: %s in LDS response", t) + if !IsHTTPConnManagerResource(apiLisAny.GetTypeUrl()) { + return "", fmt.Errorf("xds: unexpected resource type: %s in LDS response", apiLisAny.GetTypeUrl()) } apiLis := &v3httppb.HttpConnectionManager{} if err := proto.Unmarshal(apiLisAny.GetValue(), apiLis); err != nil { @@ -105,8 +105,8 @@ func getRouteConfigNameFromListener(lis *v3listenerpb.Listener, logger *grpclog. func UnmarshalRouteConfig(resources []*anypb.Any, hostname string, logger *grpclog.PrefixLogger) (map[string]RouteConfigUpdate, error) { update := make(map[string]RouteConfigUpdate) for _, r := range resources { - if t := r.GetTypeUrl(); t != version.V2RouteConfigURL && t != version.V3RouteConfigURL { - return nil, fmt.Errorf("xds: unexpected resource type: %s in RDS response", t) + if !IsRouteConfigResource(r.GetTypeUrl()) { + return nil, fmt.Errorf("xds: unexpected resource type: %s in RDS response", r.GetTypeUrl()) } rc := &v3routepb.RouteConfiguration{} if err := proto.Unmarshal(r.GetValue(), rc); err != nil { @@ -369,8 +369,8 @@ func findBestMatchingVirtualHost(host string, vHosts []*v3routepb.VirtualHost) * func UnmarshalCluster(resources []*anypb.Any, logger *grpclog.PrefixLogger) (map[string]ClusterUpdate, error) { update := make(map[string]ClusterUpdate) for _, r := range resources { - if t := r.GetTypeUrl(); t != version.V2ClusterURL && t != version.V3ClusterURL { - return nil, fmt.Errorf("xds: unexpected resource type: %s in CDS response", t) + if !IsClusterResource(r.GetTypeUrl()) { + return nil, fmt.Errorf("xds: unexpected resource type: %s in CDS response", r.GetTypeUrl()) } cluster := &v3clusterpb.Cluster{} @@ -417,8 +417,8 @@ func validateCluster(cluster *v3clusterpb.Cluster) (ClusterUpdate, error) { func UnmarshalEndpoints(resources []*anypb.Any, logger *grpclog.PrefixLogger) (map[string]EndpointsUpdate, error) { update := make(map[string]EndpointsUpdate) for _, r := range resources { - if t := r.GetTypeUrl(); t != version.V2EndpointsURL && t != version.V3EndpointsURL { - return nil, fmt.Errorf("xds: unexpected resource type: %s in EDS response", t) + if !IsEndpointsResource(r.GetTypeUrl()) { + return nil, fmt.Errorf("xds: unexpected resource type: %s in EDS response", r.GetTypeUrl()) } cla := &v3endpointpb.ClusterLoadAssignment{} diff --git a/xds/internal/client/transport_helper.go b/xds/internal/client/transport_helper.go index 6627fbffb9d5..976003605041 100644 --- a/xds/internal/client/transport_helper.go +++ b/xds/internal/client/transport_helper.go @@ -48,11 +48,9 @@ type VersionedClient interface { // transport protocol version. NewStream(ctx context.Context) (grpc.ClientStream, error) - // params: resources, typeURL, version, nonce - // SendRequest constructs and sends out a DiscoveryRequest message specific // to the underlying transport protocol version. - SendRequest(s grpc.ClientStream, resourceNames []string, typeURL string, version string, nonce string) error + SendRequest(s grpc.ClientStream, resourceNames []string, rType ResourceType, version string, nonce string) error // RecvResponse uses the provided stream to receive a response specific to // the underlying transport protocol version. @@ -61,11 +59,11 @@ type VersionedClient interface { // HandleResponse parses and validates the received response and notifies // the top-level client which in turn notifies the registered watchers. // - // Return values are: typeURL, version, nonce, error. + // Return values are: resourceType, version, nonce, error. // If the provided protobuf message contains a resource type which is not // supported, implementations must return an error of type // ErrResourceTypeUnsupported. - HandleResponse(proto.Message) (string, string, string, error) + HandleResponse(proto.Message) (ResourceType, string, string, error) } // TransportHelper contains all xDS transport protocol related functionality @@ -95,14 +93,14 @@ type TransportHelper struct { // messages. When the user of this client object cancels a watch call, // these are set to nil. All accesses to the map protected and any value // inside the map should be protected with the above mutex. - watchMap map[string]map[string]bool + watchMap map[ResourceType]map[string]bool // versionMap contains the version that was acked (the version in the ack - // request that was sent on wire). The key is typeURL, the value is the + // request that was sent on wire). The key is rType, the value is the // version string, becaues the versions for different resource types should // be independent. - versionMap map[string]string + versionMap map[ResourceType]string // nonceMap contains the nonce from the most recent received response. - nonceMap map[string]string + nonceMap map[ResourceType]string } // NewTransportHelper creates a new transport helper to be used by versioned @@ -117,9 +115,9 @@ func NewTransportHelper(vc VersionedClient, logger *grpclog.PrefixLogger, backof streamCh: make(chan grpc.ClientStream, 1), sendCh: buffer.NewUnbounded(), - watchMap: make(map[string]map[string]bool), - versionMap: make(map[string]string), - nonceMap: make(map[string]string), + watchMap: make(map[ResourceType]map[string]bool), + versionMap: make(map[ResourceType]string), + nonceMap: make(map[ResourceType]string), } go t.run(ctx) @@ -127,9 +125,9 @@ func NewTransportHelper(vc VersionedClient, logger *grpclog.PrefixLogger, backof } // AddWatch adds a watch for an xDS resource given its type and name. -func (t *TransportHelper) AddWatch(resourceType, resourceName string) { +func (t *TransportHelper) AddWatch(rType ResourceType, resourceName string) { t.sendCh.Put(&watchAction{ - typeURL: resourceType, + rType: rType, remove: false, resource: resourceName, }) @@ -137,9 +135,9 @@ func (t *TransportHelper) AddWatch(resourceType, resourceName string) { // RemoveWatch cancels an already registered watch for an xDS resource // given its type and name. -func (t *TransportHelper) RemoveWatch(resourceType, resourceName string) { +func (t *TransportHelper) RemoveWatch(rType ResourceType, resourceName string) { t.sendCh.Put(&watchAction{ - typeURL: resourceType, + rType: rType, remove: true, resource: resourceName, }) @@ -228,15 +226,16 @@ func (t *TransportHelper) send(ctx context.Context) { t.sendCh.Load() var ( - target []string - typeURL, version, nonce string - send bool + target []string + rType ResourceType + version, nonce string + send bool ) switch update := u.(type) { case *watchAction: - target, typeURL, version, nonce = t.processWatchInfo(update) + target, rType, version, nonce = t.processWatchInfo(update) case *ackAction: - target, typeURL, version, nonce, send = t.processAckInfo(update, stream) + target, rType, version, nonce, send = t.processAckInfo(update, stream) if !send { continue } @@ -248,8 +247,8 @@ func (t *TransportHelper) send(ctx context.Context) { // sending response back). continue } - if err := t.vClient.SendRequest(stream, target, typeURL, version, nonce); err != nil { - t.logger.Warningf("ADS request for {target: %q, type: %q, version: %q, nonce: %q} failed: %v", target, typeURL, version, nonce, err) + if err := t.vClient.SendRequest(stream, target, rType, version, nonce); err != nil { + t.logger.Warningf("ADS request for {target: %q, type: %v, version: %q, nonce: %q} failed: %v", target, rType, version, nonce, err) // send failed, clear the current stream. stream = nil } @@ -269,11 +268,11 @@ func (t *TransportHelper) sendExisting(stream grpc.ClientStream) bool { defer t.mu.Unlock() // Reset the ack versions when the stream restarts. - t.versionMap = make(map[string]string) - t.nonceMap = make(map[string]string) + t.versionMap = make(map[ResourceType]string) + t.nonceMap = make(map[ResourceType]string) - for typeURL, s := range t.watchMap { - if err := t.vClient.SendRequest(stream, mapToSlice(s), typeURL, "", ""); err != nil { + for rType, s := range t.watchMap { + if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", ""); err != nil { t.logger.Errorf("ADS request failed: %v", err) return false } @@ -292,28 +291,28 @@ func (t *TransportHelper) recv(stream grpc.ClientStream) bool { t.logger.Warningf("ADS stream is closed with error: %v", err) return success } - typeURL, version, nonce, err := t.vClient.HandleResponse(resp) + rType, version, nonce, err := t.vClient.HandleResponse(resp) if e, ok := err.(ErrResourceTypeUnsupported); ok { t.logger.Warningf("%s", e.ErrStr) continue } if err != nil { t.sendCh.Put(&ackAction{ - typeURL: typeURL, + rType: rType, version: "", nonce: nonce, stream: stream, }) - t.logger.Warningf("Sending NACK for response type: %v, version: %v, nonce: %v, reason: %v", typeURL, version, nonce, err) + t.logger.Warningf("Sending NACK for response type: %v, version: %v, nonce: %v, reason: %v", rType, version, nonce, err) continue } t.sendCh.Put(&ackAction{ - typeURL: typeURL, + rType: rType, version: version, nonce: nonce, stream: stream, }) - t.logger.Infof("Sending ACK for response type: %v, version: %v, nonce: %v", typeURL, version, nonce) + t.logger.Infof("Sending ACK for response type: %v, version: %v, nonce: %v", rType, version, nonce) success = true } } @@ -326,46 +325,46 @@ func mapToSlice(m map[string]bool) (ret []string) { } type watchAction struct { - typeURL string + rType ResourceType remove bool // Whether this is to remove watch for the resource. resource string } // processWatchInfo pulls the fields needed by the request from a watchAction. // -// It also updates the watch map in v2c. -func (t *TransportHelper) processWatchInfo(w *watchAction) (target []string, typeURL, ver, nonce string) { +// It also updates the watch map. +func (t *TransportHelper) processWatchInfo(w *watchAction) (target []string, rType ResourceType, ver, nonce string) { t.mu.Lock() defer t.mu.Unlock() var current map[string]bool - current, ok := t.watchMap[w.typeURL] + current, ok := t.watchMap[w.rType] if !ok { current = make(map[string]bool) - t.watchMap[w.typeURL] = current + t.watchMap[w.rType] = current } if w.remove { delete(current, w.resource) if len(current) == 0 { - delete(t.watchMap, w.typeURL) + delete(t.watchMap, w.rType) } } else { current[w.resource] = true } - typeURL = w.typeURL + rType = w.rType target = mapToSlice(current) // We don't reset version or nonce when a new watch is started. The version // and nonce from previous response are carried by the request unless the // stream is recreated. - ver = t.versionMap[typeURL] - nonce = t.nonceMap[typeURL] - return target, typeURL, ver, nonce + ver = t.versionMap[rType] + nonce = t.nonceMap[rType] + return target, rType, ver, nonce } type ackAction struct { - typeURL string + rType ResourceType version string // NACK if version is an empty string. nonce string // ACK/NACK are tagged with the stream it's for. When the stream is down, @@ -377,15 +376,15 @@ type ackAction struct { // processAckInfo pulls the fields needed by the ack request from a ackAction. // // If no active watch is found for this ack, it returns false for send. -func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStream) (target []string, typeURL, version, nonce string, send bool) { +func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStream) (target []string, rType ResourceType, version, nonce string, send bool) { if ack.stream != stream { // If ACK's stream isn't the current sending stream, this means the ACK // was pushed to queue before the old stream broke, and a new stream has // been started since. Return immediately here so we don't update the // nonce for the new stream. - return nil, "", "", "", false + return nil, UnknownResource, "", "", false } - typeURL = ack.typeURL + rType = ack.rType t.mu.Lock() defer t.mu.Unlock() @@ -394,16 +393,16 @@ func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStrea // wire. We may not send the request if the watch is canceled. But the nonce // needs to be updated so the next request will have the right nonce. nonce = ack.nonce - t.nonceMap[typeURL] = nonce + t.nonceMap[rType] = nonce - s, ok := t.watchMap[typeURL] + s, ok := t.watchMap[rType] if !ok || len(s) == 0 { // We don't send the request ack if there's no active watch (this can be // either the server sends responses before any request, or the watch is // canceled while the ackAction is in queue), because there's no resource // name. And if we send a request with empty resource name list, the // server may treat it as a wild card and send us everything. - return nil, "", "", "", false + return nil, UnknownResource, "", "", false } send = true target = mapToSlice(s) @@ -411,12 +410,12 @@ func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStrea version = ack.version if version == "" { // This is a nack, get the previous acked version. - version = t.versionMap[typeURL] - // version will still be an empty string if typeURL isn't + version = t.versionMap[rType] + // version will still be an empty string if rType isn't // found in versionMap, this can happen if there wasn't any ack // before. } else { - t.versionMap[typeURL] = version + t.versionMap[rType] = version } - return target, typeURL, version, nonce, send + return target, rType, version, nonce, send } diff --git a/xds/internal/client/v2/client.go b/xds/internal/client/v2/client.go index 278dc4381d2c..96bd5e9b5686 100644 --- a/xds/internal/client/v2/client.go +++ b/xds/internal/client/v2/client.go @@ -39,6 +39,15 @@ func init() { xdsclient.RegisterAPIClientBuilder(clientBuilder{}) } +var ( + resourceTypeToURL = map[xdsclient.ResourceType]string{ + xdsclient.ListenerResource: version.V2ListenerURL, + xdsclient.RouteConfigResource: version.V2RouteConfigURL, + xdsclient.ClusterResource: version.V2ClusterURL, + xdsclient.EndpointsResource: version.V2EndpointsURL, + } +) + type clientBuilder struct{} func (clientBuilder) Build(cc *grpc.ClientConn, opts xdsclient.BuildOptions) (xdsclient.APIClient, error) { @@ -95,30 +104,30 @@ type client struct { // AddWatch overrides the transport helper's AddWatch to save the LDS // resource_name. This is required when handling an RDS response to perform host // matching. -func (v2c *client) AddWatch(resourceType, resourceName string) { +func (v2c *client) AddWatch(rType xdsclient.ResourceType, rName string) { v2c.mu.Lock() // Special handling for LDS, because RDS needs the LDS resource_name for // response host matching. - if resourceType == version.V2ListenerURL || resourceType == version.V3ListenerURL { + if rType == xdsclient.ListenerResource { // Set hostname to the first LDS resource_name, and reset it when the // last LDS watch is removed. The upper level Client isn't expected to // watchLDS more than once. v2c.ldsWatchCount++ if v2c.ldsWatchCount == 1 { - v2c.ldsResourceName = resourceName + v2c.ldsResourceName = rName } } v2c.mu.Unlock() - v2c.TransportHelper.AddWatch(resourceType, resourceName) + v2c.TransportHelper.AddWatch(rType, rName) } // RemoveWatch overrides the transport helper's RemoveWatch to clear the LDS // resource_name when the last watch is removed. -func (v2c *client) RemoveWatch(resourceType, resourceName string) { +func (v2c *client) RemoveWatch(rType xdsclient.ResourceType, rName string) { v2c.mu.Lock() // Special handling for LDS, because RDS needs the LDS resource_name for // response host matching. - if resourceType == version.V2ListenerURL || resourceType == version.V3ListenerURL { + if rType == xdsclient.ListenerResource { // Set hostname to the first LDS resource_name, and reset it when the // last LDS watch is removed. The upper level Client isn't expected to // watchLDS more than once. @@ -128,30 +137,29 @@ func (v2c *client) RemoveWatch(resourceType, resourceName string) { } } v2c.mu.Unlock() - v2c.TransportHelper.RemoveWatch(resourceType, resourceName) + v2c.TransportHelper.RemoveWatch(rType, rName) } func (v2c *client) NewStream(ctx context.Context) (grpc.ClientStream, error) { return v2adsgrpc.NewAggregatedDiscoveryServiceClient(v2c.cc).StreamAggregatedResources(v2c.ctx, grpc.WaitForReady(true)) } -// sendRequest sends a request for provided typeURL and resource on the provided -// stream. +// sendRequest sends out a DiscoveryRequest for the given resourceNames, of type +// rType, on the provided stream. // // version is the ack version to be sent with the request -// - If this is the new request (not an ack/nack), version will be an empty -// string -// - If this is an ack, version will be the version from the response +// - If this is the new request (not an ack/nack), version will be empty. +// - If this is an ack, version will be the version from the response. // - If this is a nack, version will be the previous acked version (from -// versionMap). If there was no ack before, it will be an empty string -func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, typeURL, version, nonce string) error { +// versionMap). If there was no ack before, it will be empty. +func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce string) error { stream, ok := s.(adsStream) if !ok { return fmt.Errorf("xds: Attempt to send request on unsupported stream type: %T", s) } req := &v2xdspb.DiscoveryRequest{ Node: v2c.nodeProto, - TypeUrl: typeURL, + TypeUrl: resourceTypeToURL[rType], ResourceNames: resourceNames, VersionInfo: version, ResponseNonce: nonce, @@ -182,10 +190,11 @@ func (v2c *client) RecvResponse(s grpc.ClientStream) (proto.Message, error) { return resp, nil } -func (v2c *client) HandleResponse(r proto.Message) (string, string, string, error) { +func (v2c *client) HandleResponse(r proto.Message) (xdsclient.ResourceType, string, string, error) { + rType := xdsclient.UnknownResource resp, ok := r.(*v2xdspb.DiscoveryResponse) if !ok { - return "", "", "", fmt.Errorf("xds: unsupported message type: %T", resp) + return rType, "", "", fmt.Errorf("xds: unsupported message type: %T", resp) } // Note that the xDS transport protocol is versioned independently of @@ -193,21 +202,26 @@ func (v2c *client) HandleResponse(r proto.Message) (string, string, string, erro // of resource types using new versions of the transport protocol, or // vice-versa. Hence we need to handle v3 type_urls as well here. var err error - switch resp.GetTypeUrl() { - case version.V2ListenerURL, version.V3ListenerURL: + url := resp.GetTypeUrl() + switch { + case xdsclient.IsListenerResource(url): err = v2c.handleLDSResponse(resp) - case version.V2RouteConfigURL, version.V3RouteConfigURL: + rType = xdsclient.ListenerResource + case xdsclient.IsRouteConfigResource(url): err = v2c.handleRDSResponse(resp) - case version.V2ClusterURL, version.V3ClusterURL: + rType = xdsclient.RouteConfigResource + case xdsclient.IsClusterResource(url): err = v2c.handleCDSResponse(resp) - case version.V2EndpointsURL, version.V3EndpointsURL: + rType = xdsclient.ClusterResource + case xdsclient.IsEndpointsResource(url): err = v2c.handleEDSResponse(resp) + rType = xdsclient.EndpointsResource default: - return "", "", "", xdsclient.ErrResourceTypeUnsupported{ + return rType, "", "", xdsclient.ErrResourceTypeUnsupported{ ErrStr: fmt.Sprintf("Resource type %v unknown in response from server", resp.GetTypeUrl()), } } - return resp.GetTypeUrl(), resp.GetVersionInfo(), resp.GetNonce(), err + return rType, resp.GetVersionInfo(), resp.GetNonce(), err } // handleLDSResponse processes an LDS response received from the xDS server. On diff --git a/xds/internal/client/v2/client_ack_test.go b/xds/internal/client/v2/client_ack_test.go index d8d1ad834968..40f5668297e2 100644 --- a/xds/internal/client/v2/client_ack_test.go +++ b/xds/internal/client/v2/client_ack_test.go @@ -28,6 +28,7 @@ import ( anypb "github.com/golang/protobuf/ptypes/any" "github.com/google/go-cmp/cmp" "google.golang.org/grpc" + xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/testutils/fakeserver" "google.golang.org/grpc/xds/internal/version" @@ -39,22 +40,22 @@ func startXDSV2Client(t *testing.T, cc *grpc.ClientConn) (v2c *client, cbLDS, cb cbCDS = testutils.NewChannel() cbEDS = testutils.NewChannel() v2c, err := newV2Client(&testUpdateReceiver{ - f: func(typeURL string, d map[string]interface{}) { - t.Logf("Received %s callback with {%+v}", typeURL, d) - switch typeURL { - case version.V2ListenerURL: + f: func(rType xdsclient.ResourceType, d map[string]interface{}) { + t.Logf("Received %v callback with {%+v}", rType, d) + switch rType { + case xdsclient.ListenerResource: if _, ok := d[goodLDSTarget1]; ok { cbLDS.Send(struct{}{}) } - case version.V2RouteConfigURL: + case xdsclient.RouteConfigResource: if _, ok := d[goodRouteName1]; ok { cbRDS.Send(struct{}{}) } - case version.V2ClusterURL: + case xdsclient.ClusterResource: if _, ok := d[goodClusterName1]; ok { cbCDS.Send(struct{}{}) } - case version.V2EndpointsURL: + case xdsclient.EndpointsResource: if _, ok := d[goodEDSName]; ok { cbEDS.Send(struct{}{}) } @@ -98,30 +99,24 @@ func sendXDSRespWithVersion(ch chan<- *fakeserver.Response, respWithoutVersion * // startXDS calls watch to send the first request. It then sends a good response // and checks for ack. -func startXDS(t *testing.T, xdsname string, v2c *client, reqChan *testutils.Channel, req *xdspb.DiscoveryRequest, preVersion string, preNonce string) { - var ( - nameToWatch, typeURLToWatch string - ) - switch xdsname { - case "LDS": - typeURLToWatch = version.V2ListenerURL +func startXDS(t *testing.T, rType xdsclient.ResourceType, v2c *client, reqChan *testutils.Channel, req *xdspb.DiscoveryRequest, preVersion string, preNonce string) { + nameToWatch := "" + switch rType { + case xdsclient.ListenerResource: nameToWatch = goodLDSTarget1 - case "RDS": - typeURLToWatch = version.V2RouteConfigURL + case xdsclient.RouteConfigResource: nameToWatch = goodRouteName1 - case "CDS": - typeURLToWatch = version.V2ClusterURL + case xdsclient.ClusterResource: nameToWatch = goodClusterName1 - case "EDS": - typeURLToWatch = version.V2EndpointsURL + case xdsclient.EndpointsResource: nameToWatch = goodEDSName } - v2c.AddWatch(typeURLToWatch, nameToWatch) + v2c.AddWatch(rType, nameToWatch) if err := compareXDSRequest(reqChan, req, preVersion, preNonce); err != nil { - t.Fatalf("Failed to receive %s request: %v", xdsname, err) + t.Fatalf("Failed to receive %v request: %v", rType, err) } - t.Logf("FakeServer received %s request...", xdsname) + t.Logf("FakeServer received %v request...", rType) } // sendGoodResp sends the good response, with the given version, and a random @@ -129,19 +124,19 @@ func startXDS(t *testing.T, xdsname string, v2c *client, reqChan *testutils.Chan // // It also waits and checks that the ack request contains the given version, and // the generated nonce. -func sendGoodResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, ver int, goodResp *xdspb.DiscoveryResponse, wantReq *xdspb.DiscoveryRequest, callbackCh *testutils.Channel) (string, error) { +func sendGoodResp(t *testing.T, rType xdsclient.ResourceType, fakeServer *fakeserver.Server, ver int, goodResp *xdspb.DiscoveryResponse, wantReq *xdspb.DiscoveryRequest, callbackCh *testutils.Channel) (string, error) { nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodResp, ver) - t.Logf("Good %s response pushed to fakeServer...", xdsname) + t.Logf("Good %v response pushed to fakeServer...", rType) if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver), nonce); err != nil { - return "", fmt.Errorf("failed to receive %s request: %v", xdsname, err) + return "", fmt.Errorf("failed to receive %v request: %v", rType, err) } - t.Logf("Good %s response acked", xdsname) + t.Logf("Good %v response acked", rType) if _, err := callbackCh.Receive(); err != nil { - return "", fmt.Errorf("timeout when expecting %s update", xdsname) + return "", fmt.Errorf("timeout when expecting %v update", rType) } - t.Logf("Good %s response callback executed", xdsname) + t.Logf("Good %v response callback executed", rType) return nonce, nil } @@ -149,27 +144,27 @@ func sendGoodResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, v // be nacked, so we expect a request with the previous version (version-1). // // But the nonce in request should be the new nonce. -func sendBadResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, ver int, wantReq *xdspb.DiscoveryRequest) error { +func sendBadResp(t *testing.T, rType xdsclient.ResourceType, fakeServer *fakeserver.Server, ver int, wantReq *xdspb.DiscoveryRequest) error { var typeURL string - switch xdsname { - case "LDS": + switch rType { + case xdsclient.ListenerResource: typeURL = version.V2ListenerURL - case "RDS": + case xdsclient.RouteConfigResource: typeURL = version.V2RouteConfigURL - case "CDS": + case xdsclient.ClusterResource: typeURL = version.V2ClusterURL - case "EDS": + case xdsclient.EndpointsResource: typeURL = version.V2EndpointsURL } nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{{}}, TypeUrl: typeURL, }, ver) - t.Logf("Bad %s response pushed to fakeServer...", xdsname) + t.Logf("Bad %v response pushed to fakeServer...", rType) if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver-1), nonce); err != nil { - return fmt.Errorf("failed to receive %s request: %v", xdsname, err) + return fmt.Errorf("failed to receive %v request: %v", rType, err) } - t.Logf("Bad %s response nacked", xdsname) + t.Logf("Bad %v response nacked", rType) return nil } @@ -192,59 +187,59 @@ func (s) TestV2ClientAck(t *testing.T) { defer v2cCleanup() // Start the watch, send a good response, and check for ack. - startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest, "", "") - if _, err := sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS); err != nil { + startXDS(t, xdsclient.ListenerResource, v2c, fakeServer.XDSRequestChan, goodLDSRequest, "", "") + if _, err := sendGoodResp(t, xdsclient.ListenerResource, fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS); err != nil { t.Fatal(err) } versionLDS++ - startXDS(t, "RDS", v2c, fakeServer.XDSRequestChan, goodRDSRequest, "", "") - if _, err := sendGoodResp(t, "RDS", fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS); err != nil { + startXDS(t, xdsclient.RouteConfigResource, v2c, fakeServer.XDSRequestChan, goodRDSRequest, "", "") + if _, err := sendGoodResp(t, xdsclient.RouteConfigResource, fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS); err != nil { t.Fatal(err) } versionRDS++ - startXDS(t, "CDS", v2c, fakeServer.XDSRequestChan, goodCDSRequest, "", "") - if _, err := sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS); err != nil { + startXDS(t, xdsclient.ClusterResource, v2c, fakeServer.XDSRequestChan, goodCDSRequest, "", "") + if _, err := sendGoodResp(t, xdsclient.ClusterResource, fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS); err != nil { t.Fatal(err) } versionCDS++ - startXDS(t, "EDS", v2c, fakeServer.XDSRequestChan, goodEDSRequest, "", "") - if _, err := sendGoodResp(t, "EDS", fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS); err != nil { + startXDS(t, xdsclient.EndpointsResource, v2c, fakeServer.XDSRequestChan, goodEDSRequest, "", "") + if _, err := sendGoodResp(t, xdsclient.EndpointsResource, fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS); err != nil { t.Fatal(err) } versionEDS++ // Send a bad response, and check for nack. - if err := sendBadResp(t, "LDS", fakeServer, versionLDS, goodLDSRequest); err != nil { + if err := sendBadResp(t, xdsclient.ListenerResource, fakeServer, versionLDS, goodLDSRequest); err != nil { t.Fatal(err) } versionLDS++ - if err := sendBadResp(t, "RDS", fakeServer, versionRDS, goodRDSRequest); err != nil { + if err := sendBadResp(t, xdsclient.RouteConfigResource, fakeServer, versionRDS, goodRDSRequest); err != nil { t.Fatal(err) } versionRDS++ - if err := sendBadResp(t, "CDS", fakeServer, versionCDS, goodCDSRequest); err != nil { + if err := sendBadResp(t, xdsclient.ClusterResource, fakeServer, versionCDS, goodCDSRequest); err != nil { t.Fatal(err) } versionCDS++ - if err := sendBadResp(t, "EDS", fakeServer, versionEDS, goodEDSRequest); err != nil { + if err := sendBadResp(t, xdsclient.EndpointsResource, fakeServer, versionEDS, goodEDSRequest); err != nil { t.Fatal(err) } versionEDS++ // send another good response, and check for ack, with the new version. - if _, err := sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS); err != nil { + if _, err := sendGoodResp(t, xdsclient.ListenerResource, fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS); err != nil { t.Fatal(err) } versionLDS++ - if _, err := sendGoodResp(t, "RDS", fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS); err != nil { + if _, err := sendGoodResp(t, xdsclient.RouteConfigResource, fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS); err != nil { t.Fatal(err) } versionRDS++ - if _, err := sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS); err != nil { + if _, err := sendGoodResp(t, xdsclient.ClusterResource, fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS); err != nil { t.Fatal(err) } versionCDS++ - if _, err := sendGoodResp(t, "EDS", fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS); err != nil { + if _, err := sendGoodResp(t, xdsclient.EndpointsResource, fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS); err != nil { t.Fatal(err) } versionEDS++ @@ -262,7 +257,7 @@ func (s) TestV2ClientAckFirstIsNack(t *testing.T) { defer v2cCleanup() // Start the watch, send a good response, and check for ack. - startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest, "", "") + startXDS(t, xdsclient.ListenerResource, v2c, fakeServer.XDSRequestChan, goodLDSRequest, "", "") nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{{}}, @@ -278,7 +273,7 @@ func (s) TestV2ClientAckFirstIsNack(t *testing.T) { t.Logf("Bad response nacked") versionLDS++ - sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) + sendGoodResp(t, xdsclient.ListenerResource, fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) versionLDS++ } @@ -294,14 +289,14 @@ func (s) TestV2ClientAckNackAfterNewWatch(t *testing.T) { defer v2cCleanup() // Start the watch, send a good response, and check for ack. - startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest, "", "") - nonce, err := sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) + startXDS(t, xdsclient.ListenerResource, v2c, fakeServer.XDSRequestChan, goodLDSRequest, "", "") + nonce, err := sendGoodResp(t, xdsclient.ListenerResource, fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) if err != nil { t.Fatal(err) } // Start a new watch. The version in the new request should be the version // from the previous response, thus versionLDS before ++. - startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS), nonce) + startXDS(t, xdsclient.ListenerResource, v2c, fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS), nonce) versionLDS++ // This is an invalid response after the new watch. @@ -318,7 +313,7 @@ func (s) TestV2ClientAckNackAfterNewWatch(t *testing.T) { t.Logf("Bad response nacked") versionLDS++ - if _, err := sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS); err != nil { + if _, err := sendGoodResp(t, xdsclient.ListenerResource, fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS); err != nil { t.Fatal(err) } versionLDS++ @@ -336,42 +331,42 @@ func (s) TestV2ClientAckNewWatchAfterCancel(t *testing.T) { defer v2cCleanup() // Start a CDS watch. - v2c.AddWatch(version.V2ClusterURL, goodClusterName1) + v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1) if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", ""); err != nil { t.Fatal(err) } - t.Logf("FakeServer received %s request...", "CDS") + t.Logf("FakeServer received %v request...", xdsclient.ClusterResource) // Send a good CDS response, this function waits for the ACK with the right // version. - nonce, err := sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS) + nonce, err := sendGoodResp(t, xdsclient.ClusterResource, fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS) if err != nil { t.Fatal(err) } // Cancel the CDS watch, and start a new one. The new watch should have the // version from the response above. - v2c.RemoveWatch(version.V2ClusterURL, goodClusterName1) + v2c.RemoveWatch(xdsclient.ClusterResource, goodClusterName1) // Wait for a request with no resource names, because the only watch was // removed. emptyReq := &xdspb.DiscoveryRequest{Node: goodNodeProto, TypeUrl: version.V2ClusterURL} if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce); err != nil { - t.Fatalf("Failed to receive %s request: %v", "CDS", err) + t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err) } - v2c.AddWatch(version.V2ClusterURL, goodClusterName1) + v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1) // Wait for a request with correct resource names and version. if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS), nonce); err != nil { - t.Fatalf("Failed to receive %s request: %v", "CDS", err) + t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err) } versionCDS++ // Send a bad response with the next version. - if err := sendBadResp(t, "CDS", fakeServer, versionCDS, goodCDSRequest); err != nil { + if err := sendBadResp(t, xdsclient.ClusterResource, fakeServer, versionCDS, goodCDSRequest); err != nil { t.Fatal(err) } versionCDS++ // send another good response, and check for ack, with the new version. - if _, err := sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS); err != nil { + if _, err := sendGoodResp(t, xdsclient.ClusterResource, fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS); err != nil { t.Fatal(err) } versionCDS++ @@ -391,25 +386,25 @@ func (s) TestV2ClientAckCancelResponseRace(t *testing.T) { defer v2cCleanup() // Start a CDS watch. - v2c.AddWatch(version.V2ClusterURL, goodClusterName1) + v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1) if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", ""); err != nil { - t.Fatalf("Failed to receive %s request: %v", "CDS", err) + t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err) } - t.Logf("FakeServer received %s request...", "CDS") + t.Logf("FakeServer received %v request...", xdsclient.ClusterResource) // send a good response, and check for ack, with the new version. - nonce, err := sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS) + nonce, err := sendGoodResp(t, xdsclient.ClusterResource, fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS) if err != nil { t.Fatal(err) } // Cancel the watch before the next response is sent. This mimics the case // watch is canceled while response is on wire. - v2c.RemoveWatch(version.V2ClusterURL, goodClusterName1) + v2c.RemoveWatch(xdsclient.ClusterResource, goodClusterName1) // Wait for a request with no resource names, because the only watch was // removed. emptyReq := &xdspb.DiscoveryRequest{Node: goodNodeProto, TypeUrl: version.V2ClusterURL} if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce); err != nil { - t.Fatalf("Failed to receive %s request: %v", "CDS", err) + t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err) } versionCDS++ @@ -419,7 +414,7 @@ func (s) TestV2ClientAckCancelResponseRace(t *testing.T) { // Send a good response. nonce = sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodCDSResponse1, versionCDS) - t.Logf("Good %s response pushed to fakeServer...", "CDS") + t.Logf("Good %v response pushed to fakeServer...", xdsclient.ClusterResource) // Expect no ACK because watch was canceled. if req, err := fakeServer.XDSRequestChan.Receive(); err != testutils.ErrRecvTimeout { @@ -427,24 +422,24 @@ func (s) TestV2ClientAckCancelResponseRace(t *testing.T) { } // Still expected an callback update, because response was good. if _, err := cbCDS.Receive(); err != nil { - t.Fatalf("Timeout when expecting %s update", "CDS") + t.Fatalf("Timeout when expecting %v update", xdsclient.ClusterResource) } // Start a new watch. The new watch should have the nonce from the response // above, and version from the first good response. - v2c.AddWatch(version.V2ClusterURL, goodClusterName1) + v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1) if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS-1), nonce); err != nil { - t.Fatalf("Failed to receive %s request: %v", "CDS", err) + t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err) } // Send a bad response with the next version. - if err := sendBadResp(t, "CDS", fakeServer, versionCDS, goodCDSRequest); err != nil { + if err := sendBadResp(t, xdsclient.ClusterResource, fakeServer, versionCDS, goodCDSRequest); err != nil { t.Fatal(err) } versionCDS++ // send another good response, and check for ack, with the new version. - if _, err := sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS); err != nil { + if _, err := sendGoodResp(t, xdsclient.ClusterResource, fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS); err != nil { t.Fatal(err) } versionCDS++ diff --git a/xds/internal/client/v2/client_cds_test.go b/xds/internal/client/v2/client_cds_test.go index e93f952e2a7e..8538f4e46afc 100644 --- a/xds/internal/client/v2/client_cds_test.go +++ b/xds/internal/client/v2/client_cds_test.go @@ -153,7 +153,7 @@ func (s) TestCDSHandleResponse(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { testWatchHandle(t, &watchHandleTestcase{ - typeURL: version.V2ClusterURL, + rType: xdsclient.ClusterResource, resourceName: goodClusterName1, responseToHandle: test.cdsResponse, @@ -172,7 +172,7 @@ func (s) TestCDSHandleResponseWithoutWatch(t *testing.T) { defer cleanup() v2c, err := newV2Client(&testUpdateReceiver{ - f: func(string, map[string]interface{}) {}, + f: func(xdsclient.ResourceType, map[string]interface{}) {}, }, cc, goodNodeProto, func(int) time.Duration { return 0 }, nil) if err != nil { t.Fatal(err) diff --git a/xds/internal/client/v2/client_eds_test.go b/xds/internal/client/v2/client_eds_test.go index 170a96a42491..7af74f0b6393 100644 --- a/xds/internal/client/v2/client_eds_test.go +++ b/xds/internal/client/v2/client_eds_test.go @@ -135,7 +135,7 @@ func (s) TestEDSHandleResponse(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { testWatchHandle(t, &watchHandleTestcase{ - typeURL: version.V2EndpointsURL, + rType: xdsclient.EndpointsResource, resourceName: goodEDSName, responseToHandle: test.edsResponse, wantHandleErr: test.wantErr, @@ -153,7 +153,7 @@ func (s) TestEDSHandleResponseWithoutWatch(t *testing.T) { defer cleanup() v2c, err := newV2Client(&testUpdateReceiver{ - f: func(string, map[string]interface{}) {}, + f: func(xdsclient.ResourceType, map[string]interface{}) {}, }, cc, goodNodeProto, func(int) time.Duration { return 0 }, nil) if err != nil { t.Fatal(err) diff --git a/xds/internal/client/v2/client_lds_test.go b/xds/internal/client/v2/client_lds_test.go index ca8161f504f4..854cf3ccee73 100644 --- a/xds/internal/client/v2/client_lds_test.go +++ b/xds/internal/client/v2/client_lds_test.go @@ -23,8 +23,8 @@ import ( "time" v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + xdsclient "google.golang.org/grpc/xds/internal/client" - "google.golang.org/grpc/xds/internal/version" ) // TestLDSHandleResponse starts a fake xDS server, makes a ClientConn to it, @@ -113,7 +113,7 @@ func (s) TestLDSHandleResponse(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { testWatchHandle(t, &watchHandleTestcase{ - typeURL: version.V2ListenerURL, + rType: xdsclient.ListenerResource, resourceName: goodLDSTarget1, responseToHandle: test.ldsResponse, wantHandleErr: test.wantErr, @@ -131,7 +131,7 @@ func (s) TestLDSHandleResponseWithoutWatch(t *testing.T) { defer cleanup() v2c, err := newV2Client(&testUpdateReceiver{ - f: func(string, map[string]interface{}) {}, + f: func(xdsclient.ResourceType, map[string]interface{}) {}, }, cc, goodNodeProto, func(int) time.Duration { return 0 }, nil) if err != nil { t.Fatal(err) diff --git a/xds/internal/client/v2/client_rds_test.go b/xds/internal/client/v2/client_rds_test.go index aa44371ed350..59e63d83e287 100644 --- a/xds/internal/client/v2/client_rds_test.go +++ b/xds/internal/client/v2/client_rds_test.go @@ -23,9 +23,9 @@ import ( "time" xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/testutils/fakeserver" - "google.golang.org/grpc/xds/internal/version" ) // doLDS makes a LDS watch, and waits for the response and ack to finish. @@ -34,7 +34,7 @@ import ( // pre-requirement for RDS, and RDS handle would fail without an existing LDS // watch. func doLDS(t *testing.T, v2c xdsclient.APIClient, fakeServer *fakeserver.Server) { - v2c.AddWatch(version.V2ListenerURL, goodLDSTarget1) + v2c.AddWatch(xdsclient.ListenerResource, goodLDSTarget1) if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { t.Fatalf("Timeout waiting for LDS request: %v", err) } @@ -112,7 +112,7 @@ func (s) TestRDSHandleResponseWithRouting(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { testWatchHandle(t, &watchHandleTestcase{ - typeURL: version.V2RouteConfigURL, + rType: xdsclient.RouteConfigResource, resourceName: goodRouteName1, responseToHandle: test.rdsResponse, wantHandleErr: test.wantErr, @@ -130,7 +130,7 @@ func (s) TestRDSHandleResponseWithoutLDSWatch(t *testing.T) { defer cleanup() v2c, err := newV2Client(&testUpdateReceiver{ - f: func(string, map[string]interface{}) {}, + f: func(xdsclient.ResourceType, map[string]interface{}) {}, }, cc, goodNodeProto, func(int) time.Duration { return 0 }, nil) if err != nil { t.Fatal(err) @@ -149,7 +149,7 @@ func (s) TestRDSHandleResponseWithoutRDSWatch(t *testing.T) { defer cleanup() v2c, err := newV2Client(&testUpdateReceiver{ - f: func(string, map[string]interface{}) {}, + f: func(xdsclient.ResourceType, map[string]interface{}) {}, }, cc, goodNodeProto, func(int) time.Duration { return 0 }, nil) if err != nil { t.Fatal(err) diff --git a/xds/internal/client/v2/client_test.go b/xds/internal/client/v2/client_test.go index 024357d8476e..fecda6ad06d9 100644 --- a/xds/internal/client/v2/client_test.go +++ b/xds/internal/client/v2/client_test.go @@ -331,7 +331,7 @@ var ( ) type watchHandleTestcase struct { - typeURL string + rType xdsclient.ResourceType resourceName string responseToHandle *xdspb.DiscoveryResponse @@ -341,7 +341,7 @@ type watchHandleTestcase struct { } type testUpdateReceiver struct { - f func(typeURL string, d map[string]interface{}) + f func(rType xdsclient.ResourceType, d map[string]interface{}) } func (t *testUpdateReceiver) NewListeners(d map[string]xdsclient.ListenerUpdate) { @@ -349,7 +349,7 @@ func (t *testUpdateReceiver) NewListeners(d map[string]xdsclient.ListenerUpdate) for k, v := range d { dd[k] = v } - t.newUpdate(version.V2ListenerURL, dd) + t.newUpdate(xdsclient.ListenerResource, dd) } func (t *testUpdateReceiver) NewRouteConfigs(d map[string]xdsclient.RouteConfigUpdate) { @@ -357,7 +357,7 @@ func (t *testUpdateReceiver) NewRouteConfigs(d map[string]xdsclient.RouteConfigU for k, v := range d { dd[k] = v } - t.newUpdate(version.V2RouteConfigURL, dd) + t.newUpdate(xdsclient.RouteConfigResource, dd) } func (t *testUpdateReceiver) NewClusters(d map[string]xdsclient.ClusterUpdate) { @@ -365,7 +365,7 @@ func (t *testUpdateReceiver) NewClusters(d map[string]xdsclient.ClusterUpdate) { for k, v := range d { dd[k] = v } - t.newUpdate(version.V2ClusterURL, dd) + t.newUpdate(xdsclient.ClusterResource, dd) } func (t *testUpdateReceiver) NewEndpoints(d map[string]xdsclient.EndpointsUpdate) { @@ -373,11 +373,11 @@ func (t *testUpdateReceiver) NewEndpoints(d map[string]xdsclient.EndpointsUpdate for k, v := range d { dd[k] = v } - t.newUpdate(version.V2EndpointsURL, dd) + t.newUpdate(xdsclient.EndpointsResource, dd) } -func (t *testUpdateReceiver) newUpdate(typeURL string, d map[string]interface{}) { - t.f(typeURL, d) +func (t *testUpdateReceiver) newUpdate(rType xdsclient.ResourceType, d map[string]interface{}) { + t.f(rType, d) } // testWatchHandle is called to test response handling for each xDS. @@ -397,8 +397,8 @@ func testWatchHandle(t *testing.T, test *watchHandleTestcase) { gotUpdateCh := testutils.NewChannel() v2c, err := newV2Client(&testUpdateReceiver{ - f: func(typeURL string, d map[string]interface{}) { - if typeURL == test.typeURL { + f: func(rType xdsclient.ResourceType, d map[string]interface{}) { + if rType == test.rType { if u, ok := d[test.resourceName]; ok { gotUpdateCh.Send(updateErr{u, nil}) } @@ -410,14 +410,14 @@ func testWatchHandle(t *testing.T, test *watchHandleTestcase) { } defer v2c.Close() - // RDS needs an existin LDS watch for the hostname. - if test.typeURL == version.V2RouteConfigURL { + // RDS needs an existing LDS watch for the hostname. + if test.rType == xdsclient.RouteConfigResource { doLDS(t, v2c, fakeServer) } // Register the watcher, this will also trigger the v2Client to send the xDS // request. - v2c.AddWatch(test.typeURL, test.resourceName) + v2c.AddWatch(test.rType, test.resourceName) // Wait till the request makes it to the fakeServer. This ensures that // the watch request has been processed by the v2Client. @@ -432,14 +432,14 @@ func testWatchHandle(t *testing.T, test *watchHandleTestcase) { // Also note that this won't trigger ACK, so there's no need to clear the // request channel afterwards. var handleXDSResp func(response *xdspb.DiscoveryResponse) error - switch test.typeURL { - case version.V2ListenerURL: + switch test.rType { + case xdsclient.ListenerResource: handleXDSResp = v2c.handleLDSResponse - case version.V2RouteConfigURL: + case xdsclient.RouteConfigResource: handleXDSResp = v2c.handleRDSResponse - case version.V2ClusterURL: + case xdsclient.ClusterResource: handleXDSResp = v2c.handleCDSResponse - case version.V2EndpointsURL: + case xdsclient.EndpointsResource: handleXDSResp = v2c.handleEDSResponse } if err := handleXDSResp(test.responseToHandle); (err != nil) != test.wantHandleErr { @@ -524,7 +524,7 @@ func (s) TestV2ClientBackoffAfterRecvError(t *testing.T) { callbackCh := make(chan struct{}) v2c, err := newV2Client(&testUpdateReceiver{ - f: func(string, map[string]interface{}) { close(callbackCh) }, + f: func(xdsclient.ResourceType, map[string]interface{}) { close(callbackCh) }, }, cc, goodNodeProto, clientBackoff, nil) if err != nil { t.Fatal(err) @@ -532,7 +532,7 @@ func (s) TestV2ClientBackoffAfterRecvError(t *testing.T) { defer v2c.Close() t.Log("Started xds v2Client...") - v2c.AddWatch(version.V2ListenerURL, goodLDSTarget1) + v2c.AddWatch(xdsclient.ListenerResource, goodLDSTarget1) if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { t.Fatalf("Timeout expired when expecting an LDS request") } @@ -567,8 +567,8 @@ func (s) TestV2ClientRetriesAfterBrokenStream(t *testing.T) { callbackCh := testutils.NewChannel() v2c, err := newV2Client(&testUpdateReceiver{ - f: func(typeURL string, d map[string]interface{}) { - if typeURL == version.V2ListenerURL { + f: func(rType xdsclient.ResourceType, d map[string]interface{}) { + if rType == xdsclient.ListenerResource { if u, ok := d[goodLDSTarget1]; ok { t.Logf("Received LDS callback with ldsUpdate {%+v}", u) callbackCh.Send(struct{}{}) @@ -582,7 +582,7 @@ func (s) TestV2ClientRetriesAfterBrokenStream(t *testing.T) { defer v2c.Close() t.Log("Started xds v2Client...") - v2c.AddWatch(version.V2ListenerURL, goodLDSTarget1) + v2c.AddWatch(xdsclient.ListenerResource, goodLDSTarget1) if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { t.Fatalf("Timeout expired when expecting an LDS request") } @@ -637,8 +637,8 @@ func (s) TestV2ClientWatchWithoutStream(t *testing.T) { callbackCh := testutils.NewChannel() v2c, err := newV2Client(&testUpdateReceiver{ - f: func(typeURL string, d map[string]interface{}) { - if typeURL == version.V2ListenerURL { + f: func(rType xdsclient.ResourceType, d map[string]interface{}) { + if rType == xdsclient.ListenerResource { if u, ok := d[goodLDSTarget1]; ok { t.Logf("Received LDS callback with ldsUpdate {%+v}", u) callbackCh.Send(u) @@ -654,7 +654,7 @@ func (s) TestV2ClientWatchWithoutStream(t *testing.T) { // This watch is started when the xds-ClientConn is in Transient Failure, // and no xds stream is created. - v2c.AddWatch(version.V2ListenerURL, goodLDSTarget1) + v2c.AddWatch(xdsclient.ListenerResource, goodLDSTarget1) // The watcher should receive an update, with a timeout error in it. if v, err := callbackCh.TimedReceive(100 * time.Millisecond); err == nil { diff --git a/xds/internal/client/v3/client.go b/xds/internal/client/v3/client.go index 56bb245e32a6..a1103f8f8b85 100644 --- a/xds/internal/client/v3/client.go +++ b/xds/internal/client/v3/client.go @@ -39,6 +39,15 @@ func init() { xdsclient.RegisterAPIClientBuilder(clientBuilder{}) } +var ( + resourceTypeToURL = map[xdsclient.ResourceType]string{ + xdsclient.ListenerResource: version.V2ListenerURL, + xdsclient.RouteConfigResource: version.V2RouteConfigURL, + xdsclient.ClusterResource: version.V2ClusterURL, + xdsclient.EndpointsResource: version.V2EndpointsURL, + } +) + type clientBuilder struct{} func (clientBuilder) Build(cc *grpc.ClientConn, opts xdsclient.BuildOptions) (xdsclient.APIClient, error) { @@ -93,30 +102,30 @@ type client struct { } // AddWatch overrides the transport helper's AddWatch to save the LDS -// resource_name. This is required when handling an RDS response to perform hot +// resource_name. This is required when handling an RDS response to perform host // matching. -func (v3c *client) AddWatch(resourceType, resourceName string) { +func (v3c *client) AddWatch(rType xdsclient.ResourceType, rName string) { v3c.mu.Lock() // Special handling for LDS, because RDS needs the LDS resource_name for // response host matching. - if resourceType == version.V2ListenerURL || resourceType == version.V3ListenerURL { + if rType == xdsclient.ListenerResource { // Set hostname to the first LDS resource_name, and reset it when the // last LDS watch is removed. The upper level Client isn't expected to // watchLDS more than once. v3c.ldsWatchCount++ if v3c.ldsWatchCount == 1 { - v3c.ldsResourceName = resourceName + v3c.ldsResourceName = rName } } v3c.mu.Unlock() - v3c.TransportHelper.AddWatch(resourceType, resourceName) + v3c.TransportHelper.AddWatch(rType, rName) } -func (v3c *client) RemoveWatch(resourceType, resourceName string) { +func (v3c *client) RemoveWatch(rType xdsclient.ResourceType, rName string) { v3c.mu.Lock() // Special handling for LDS, because RDS needs the LDS resource_name for // response host matching. - if resourceType == version.V2ListenerURL || resourceType == version.V3ListenerURL { + if rType == xdsclient.ListenerResource { // Set hostname to the first LDS resource_name, and reset it when the // last LDS watch is removed. The upper level Client isn't expected to // watchLDS more than once. @@ -126,30 +135,29 @@ func (v3c *client) RemoveWatch(resourceType, resourceName string) { } } v3c.mu.Unlock() - v3c.TransportHelper.RemoveWatch(resourceType, resourceName) + v3c.TransportHelper.RemoveWatch(rType, rName) } func (v3c *client) NewStream(ctx context.Context) (grpc.ClientStream, error) { return v3adsgrpc.NewAggregatedDiscoveryServiceClient(v3c.cc).StreamAggregatedResources(v3c.ctx, grpc.WaitForReady(true)) } -// sendRequest sends a request for provided typeURL and resource on the provided -// stream. +// sendRequest sends out a DiscoveryRequest for the given resourceNames, of type +// rType, on the provided stream. // // version is the ack version to be sent with the request -// - If this is the new request (not an ack/nack), version will be an empty -// string -// - If this is an ack, version will be the version from the response +// - If this is the new request (not an ack/nack), version will be empty. +// - If this is an ack, version will be the version from the response. // - If this is a nack, version will be the previous acked version (from -// versionMap). If there was no ack before, it will be an empty string -func (v3c *client) SendRequest(s grpc.ClientStream, resourceNames []string, typeURL, version, nonce string) error { +// versionMap). If there was no ack before, it will be empty. +func (v3c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce string) error { stream, ok := s.(adsStream) if !ok { return fmt.Errorf("xds: Attempt to send request on unsupported stream type: %T", s) } req := &v3discoverypb.DiscoveryRequest{ Node: v3c.nodeProto, - TypeUrl: typeURL, + TypeUrl: resourceTypeToURL[rType], ResourceNames: resourceNames, VersionInfo: version, ResponseNonce: nonce, @@ -180,10 +188,11 @@ func (v3c *client) RecvResponse(s grpc.ClientStream) (proto.Message, error) { return resp, nil } -func (v3c *client) HandleResponse(r proto.Message) (string, string, string, error) { +func (v3c *client) HandleResponse(r proto.Message) (xdsclient.ResourceType, string, string, error) { + rType := xdsclient.UnknownResource resp, ok := r.(*v3discoverypb.DiscoveryResponse) if !ok { - return "", "", "", fmt.Errorf("xds: unsupported message type: %T", resp) + return rType, "", "", fmt.Errorf("xds: unsupported message type: %T", resp) } // Note that the xDS transport protocol is versioned independently of @@ -191,21 +200,26 @@ func (v3c *client) HandleResponse(r proto.Message) (string, string, string, erro // of resource types using new versions of the transport protocol, or // vice-versa. Hence we need to handle v3 type_urls as well here. var err error - switch resp.GetTypeUrl() { - case version.V2ListenerURL, version.V3ListenerURL: + url := resp.GetTypeUrl() + switch { + case xdsclient.IsListenerResource(url): err = v3c.handleLDSResponse(resp) - case version.V2RouteConfigURL, version.V3RouteConfigURL: + rType = xdsclient.ListenerResource + case xdsclient.IsRouteConfigResource(url): err = v3c.handleRDSResponse(resp) - case version.V2ClusterURL, version.V3ClusterURL: + rType = xdsclient.RouteConfigResource + case xdsclient.IsClusterResource(url): err = v3c.handleCDSResponse(resp) - case version.V2EndpointsURL, version.V3EndpointsURL: + rType = xdsclient.ClusterResource + case xdsclient.IsEndpointsResource(url): err = v3c.handleEDSResponse(resp) + rType = xdsclient.EndpointsResource default: - return "", "", "", xdsclient.ErrResourceTypeUnsupported{ + return rType, "", "", xdsclient.ErrResourceTypeUnsupported{ ErrStr: fmt.Sprintf("Resource type %v unknown in response from server", resp.GetTypeUrl()), } } - return resp.GetTypeUrl(), resp.GetVersionInfo(), resp.GetNonce(), err + return rType, resp.GetVersionInfo(), resp.GetNonce(), err } // handleLDSResponse processes an LDS response received from the xDS server. On diff --git a/xds/xds.go b/xds/xds.go index 40d62605c263..fa0d699e8734 100644 --- a/xds/xds.go +++ b/xds/xds.go @@ -26,5 +26,6 @@ package xds import ( _ "google.golang.org/grpc/xds/internal/balancer" // Register the balancers. _ "google.golang.org/grpc/xds/internal/client/v2" // Register the v2 xDS API client. + _ "google.golang.org/grpc/xds/internal/client/v3" // Register the v3 xDS API client. _ "google.golang.org/grpc/xds/internal/resolver" // Register the xds_resolver. )