Skip to content

Commit

Permalink
xds: Export a WatchListener() method on the xdsClient. (#3817)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Aug 18, 2020
1 parent a3740e5 commit 0f73133
Show file tree
Hide file tree
Showing 20 changed files with 445 additions and 353 deletions.
66 changes: 64 additions & 2 deletions xds/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ type APIClientBuilder interface {
// version specific implementations of the xDS client.
type APIClient interface {
// AddWatch adds a watch for an xDS resource given its type and name.
AddWatch(resourceType, resourceName string)
AddWatch(ResourceType, string)
// RemoveWatch cancels an already registered watch for an xDS resource
// given its type and name.
RemoveWatch(resourceType, resourceName string)
RemoveWatch(ResourceType, string)
// Close cleans up resources allocated by the API client.
Close()
}
Expand Down Expand Up @@ -396,3 +396,65 @@ func (c *Client) Close() {
c.cc.Close()
c.logger.Infof("Shutdown")
}

// ResourceType identifies resources in a transport protocol agnostic way. These
// will be used in transport version agnostic code, while the versioned API
// clients will map these to appropriate version URLs.
type ResourceType int

// Version agnostic resource type constants.
const (
UnknownResource ResourceType = iota
ListenerResource
HTTPConnManagerResource
RouteConfigResource
ClusterResource
EndpointsResource
)

func (r ResourceType) String() string {
switch r {
case ListenerResource:
return "ListenerResource"
case HTTPConnManagerResource:
return "HTTPConnManagerResource"
case RouteConfigResource:
return "RouteConfigResource"
case ClusterResource:
return "ClusterResource"
case EndpointsResource:
return "EndpointsResource"
default:
return "UnknownResource"
}
}

// IsListenerResource returns true if the provider URL corresponds to an xDS
// Listener resource.
func IsListenerResource(url string) bool {
return url == version.V2ListenerURL || url == version.V3ListenerURL
}

// IsHTTPConnManagerResource returns true if the provider URL corresponds to an xDS
// HTTPConnManager resource.
func IsHTTPConnManagerResource(url string) bool {
return url == version.V2HTTPConnManagerURL || url == version.V3HTTPConnManagerURL
}

// IsRouteConfigResource returns true if the provider URL corresponds to an xDS
// RouteConfig resource.
func IsRouteConfigResource(url string) bool {
return url == version.V2RouteConfigURL || url == version.V3RouteConfigURL
}

// IsClusterResource returns true if the provider URL corresponds to an xDS
// Cluster resource.
func IsClusterResource(url string) bool {
return url == version.V2ClusterURL || url == version.V3ClusterURL
}

// IsEndpointsResource returns true if the provider URL corresponds to an xDS
// Endpoints resource.
func IsEndpointsResource(url string) bool {
return url == version.V2EndpointsURL || url == version.V3EndpointsURL
}
12 changes: 5 additions & 7 deletions xds/internal/client/client_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package client

import "google.golang.org/grpc/xds/internal/version"

type watcherInfoWithUpdate struct {
wi *watchInfo
update interface{}
Expand All @@ -46,20 +44,20 @@ func (c *Client) callCallback(wiu *watcherInfoWithUpdate) {
// window that a watcher's callback could be called after the watcher is
// canceled, and the user needs to take care of it.
var ccb func()
switch wiu.wi.typeURL {
case version.V2ListenerURL:
switch wiu.wi.rType {
case ListenerResource:
if s, ok := c.ldsWatchers[wiu.wi.target]; ok && s[wiu.wi] {
ccb = func() { wiu.wi.ldsCallback(wiu.update.(ListenerUpdate), wiu.err) }
}
case version.V2RouteConfigURL:
case RouteConfigResource:
if s, ok := c.rdsWatchers[wiu.wi.target]; ok && s[wiu.wi] {
ccb = func() { wiu.wi.rdsCallback(wiu.update.(RouteConfigUpdate), wiu.err) }
}
case version.V2ClusterURL:
case ClusterResource:
if s, ok := c.cdsWatchers[wiu.wi.target]; ok && s[wiu.wi] {
ccb = func() { wiu.wi.cdsCallback(wiu.update.(ClusterUpdate), wiu.err) }
}
case version.V2EndpointsURL:
case EndpointsResource:
if s, ok := c.edsWatchers[wiu.wi.target]; ok && s[wiu.wi] {
ccb = func() { wiu.wi.edsCallback(wiu.update.(EndpointsUpdate), wiu.err) }
}
Expand Down
34 changes: 18 additions & 16 deletions xds/internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ func clientOpts(balancerName string, overrideWatchExpiryTImeout bool) Options {
type testAPIClient struct {
r UpdateHandler

addWatches map[string]*testutils.Channel
removeWatches map[string]*testutils.Channel
addWatches map[ResourceType]*testutils.Channel
removeWatches map[ResourceType]*testutils.Channel
}

func overrideNewAPIClient() (<-chan *testAPIClient, func()) {
Expand All @@ -83,28 +83,30 @@ func overrideNewAPIClient() (<-chan *testAPIClient, func()) {
}

func newTestAPIClient(r UpdateHandler) *testAPIClient {
addWatches := make(map[string]*testutils.Channel)
addWatches[version.V2ListenerURL] = testutils.NewChannel()
addWatches[version.V2RouteConfigURL] = testutils.NewChannel()
addWatches[version.V2ClusterURL] = testutils.NewChannel()
addWatches[version.V2EndpointsURL] = testutils.NewChannel()
removeWatches := make(map[string]*testutils.Channel)
removeWatches[version.V2ListenerURL] = testutils.NewChannel()
removeWatches[version.V2RouteConfigURL] = testutils.NewChannel()
removeWatches[version.V2ClusterURL] = testutils.NewChannel()
removeWatches[version.V2EndpointsURL] = testutils.NewChannel()
addWatches := map[ResourceType]*testutils.Channel{
ListenerResource: testutils.NewChannel(),
RouteConfigResource: testutils.NewChannel(),
ClusterResource: testutils.NewChannel(),
EndpointsResource: testutils.NewChannel(),
}
removeWatches := map[ResourceType]*testutils.Channel{
ListenerResource: testutils.NewChannel(),
RouteConfigResource: testutils.NewChannel(),
ClusterResource: testutils.NewChannel(),
EndpointsResource: testutils.NewChannel(),
}
return &testAPIClient{
r: r,
addWatches: addWatches,
removeWatches: removeWatches,
}
}

func (c *testAPIClient) AddWatch(resourceType, resourceName string) {
func (c *testAPIClient) AddWatch(resourceType ResourceType, resourceName string) {
c.addWatches[resourceType].Send(resourceName)
}

func (c *testAPIClient) RemoveWatch(resourceType, resourceName string) {
func (c *testAPIClient) RemoveWatch(resourceType ResourceType, resourceName string) {
c.removeWatches[resourceType].Send(resourceName)
}

Expand All @@ -130,12 +132,12 @@ func (s) TestWatchCallAnotherWatch(t *testing.T) {
clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
// Calls another watch inline, to ensure there's deadlock.
c.WatchCluster("another-random-name", func(ClusterUpdate, error) {})
if _, err := v2Client.addWatches[version.V2ClusterURL].Receive(); firstTime && err != nil {
if _, err := v2Client.addWatches[ClusterResource].Receive(); firstTime && err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
firstTime = false
})
if _, err := v2Client.addWatches[version.V2ClusterURL].Receive(); err != nil {
if _, err := v2Client.addWatches[ClusterResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}

Expand Down
Loading

0 comments on commit 0f73133

Please sign in to comment.