Skip to content

Commit

Permalink
xds: handle errors in xds_client (#3658)
Browse files Browse the repository at this point in the history
- xds_client
  - send resource-not-found error when a resource is removed for LDS or CDS
  - handle LDS resource-not-found to cancel RDS watch

- test update because it was expecting no update when resource is removed
- test cleanup to apply timeout to channels
  • Loading branch information
menghanl authored Jun 3, 2020
1 parent cb7f5de commit 42eed59
Show file tree
Hide file tree
Showing 9 changed files with 467 additions and 66 deletions.
54 changes: 36 additions & 18 deletions xds/internal/client/client_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ func (c *Client) callCallback(wiu *watcherInfoWithUpdate) {
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
func (c *Client) newLDSUpdate(d map[string]ldsUpdate) {
func (c *Client) newLDSUpdate(updates map[string]ldsUpdate) {
c.mu.Lock()
defer c.mu.Unlock()

for name, update := range d {
for name, update := range updates {
if s, ok := c.ldsWatchers[name]; ok {
for wi := range s {
wi.newUpdate(update)
Expand All @@ -88,23 +88,32 @@ func (c *Client) newLDSUpdate(d map[string]ldsUpdate) {
c.ldsCache[name] = update
}
}
// TODO: handle removing resources, which means if a resource exists in the
// previous update, but not in the new update. This needs the balancers and
// resolvers to handle errors correctly.

// TODO: remove item from cache and remove corresponding RDS cached data.
for name := range c.ldsCache {
if _, ok := updates[name]; !ok {
// If a resource exists in the cache but not in the new update, delete
// it from the cache and send a resource-not-found error to its watchers
// to indicate that the resource was removed.
delete(c.ldsCache, name)
for wi := range c.ldsWatchers[name] {
wi.resourceNotFound()
}
}
}
// When LDS resource is removed, we don't delete corresponding RDS cached
// data. The RDS watch will be canceled, and cache entry is removed when the
// last watch is canceled.
}

// newRDSUpdate is called by the underlying xdsv2Client when it receives an xDS
// response.
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
func (c *Client) newRDSUpdate(d map[string]rdsUpdate) {
func (c *Client) newRDSUpdate(updates map[string]rdsUpdate) {
c.mu.Lock()
defer c.mu.Unlock()

for name, update := range d {
for name, update := range updates {
if s, ok := c.rdsWatchers[name]; ok {
for wi := range s {
wi.newUpdate(update)
Expand All @@ -121,11 +130,11 @@ func (c *Client) newRDSUpdate(d map[string]rdsUpdate) {
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
func (c *Client) newCDSUpdate(d map[string]ClusterUpdate) {
func (c *Client) newCDSUpdate(updates map[string]ClusterUpdate) {
c.mu.Lock()
defer c.mu.Unlock()

for name, update := range d {
for name, update := range updates {
if s, ok := c.cdsWatchers[name]; ok {
for wi := range s {
wi.newUpdate(update)
Expand All @@ -135,23 +144,32 @@ func (c *Client) newCDSUpdate(d map[string]ClusterUpdate) {
c.cdsCache[name] = update
}
}
// TODO: handle removing resources, which means if a resource exists in the
// previous update, but not in the new update. This needs the balancers and
// resolvers to handle errors correctly.

// TODO: remove item from cache and remove corresponding EDS cached data.
for name := range c.cdsCache {
if _, ok := updates[name]; !ok {
// If a resource exists in the cache but not in the new update, delete
// it from the cache and send a resource-not-found error to its watchers
// to indicate that the resource was removed.
delete(c.cdsCache, name)
for wi := range c.cdsWatchers[name] {
wi.resourceNotFound()
}
}
}
// When CDS resource is removed, we don't delete corresponding EDS cached
// data. The EDS watch will be canceled, and cache entry is removed when the
// last watch is canceled.
}

// newEDSUpdate is called by the underlying xdsv2Client when it receives an xDS
// response.
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
func (c *Client) newEDSUpdate(d map[string]EndpointsUpdate) {
func (c *Client) newEDSUpdate(updates map[string]EndpointsUpdate) {
c.mu.Lock()
defer c.mu.Unlock()

for name, update := range d {
for name, update := range updates {
if s, ok := c.edsWatchers[name]; ok {
for wi := range s {
wi.newUpdate(update)
Expand Down
36 changes: 22 additions & 14 deletions xds/internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ func (s) TestNew(t *testing.T) {
type testXDSV2Client struct {
r updateHandler

addWatches map[string]chan string
removeWatches map[string]chan string
addWatches map[string]*testutils.Channel
removeWatches map[string]*testutils.Channel
}

func overrideNewXDSV2Client() (<-chan *testXDSV2Client, func()) {
Expand All @@ -142,16 +142,16 @@ func overrideNewXDSV2Client() (<-chan *testXDSV2Client, func()) {
}

func newTestXDSV2Client(r updateHandler) *testXDSV2Client {
addWatches := make(map[string]chan string)
addWatches[ldsURL] = make(chan string, 10)
addWatches[rdsURL] = make(chan string, 10)
addWatches[cdsURL] = make(chan string, 10)
addWatches[edsURL] = make(chan string, 10)
removeWatches := make(map[string]chan string)
removeWatches[ldsURL] = make(chan string, 10)
removeWatches[rdsURL] = make(chan string, 10)
removeWatches[cdsURL] = make(chan string, 10)
removeWatches[edsURL] = make(chan string, 10)
addWatches := make(map[string]*testutils.Channel)
addWatches[ldsURL] = testutils.NewChannel()
addWatches[rdsURL] = testutils.NewChannel()
addWatches[cdsURL] = testutils.NewChannel()
addWatches[edsURL] = testutils.NewChannel()
removeWatches := make(map[string]*testutils.Channel)
removeWatches[ldsURL] = testutils.NewChannel()
removeWatches[rdsURL] = testutils.NewChannel()
removeWatches[cdsURL] = testutils.NewChannel()
removeWatches[edsURL] = testutils.NewChannel()
return &testXDSV2Client{
r: r,
addWatches: addWatches,
Expand All @@ -160,11 +160,11 @@ func newTestXDSV2Client(r updateHandler) *testXDSV2Client {
}

func (c *testXDSV2Client) addWatch(resourceType, resourceName string) {
c.addWatches[resourceType] <- resourceName
c.addWatches[resourceType].Send(resourceName)
}

func (c *testXDSV2Client) removeWatch(resourceType, resourceName string) {
c.removeWatches[resourceType] <- resourceName
c.removeWatches[resourceType].Send(resourceName)
}

// close is a no-op for the test client.
func (c *testXDSV2Client) close() {}
Expand All @@ -184,11 +184,19 @@ func (s) TestWatchCallAnotherWatch(t *testing.T) {
v2Client := <-v2ClientCh

clusterUpdateCh := testutils.NewChannel()
firstTime := true
c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
// Calls another watch inline, to ensure there's no deadlock.
c.WatchCluster("another-random-name", func(ClusterUpdate, error) {})
if _, err := v2Client.addWatches[cdsURL].Receive(); firstTime && err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
firstTime = false
})
if _, err := v2Client.addWatches[cdsURL].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}

wantUpdate := ClusterUpdate{ServiceName: testEDSName}
v2Client.r.newCDSUpdate(map[string]ClusterUpdate{
Expand Down
37 changes: 30 additions & 7 deletions xds/internal/client/client_watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,32 +75,43 @@ func (wi *watchInfo) newUpdate(update interface{}) {
wi.c.scheduleCallback(wi, update, nil)
}

// resourceNotFound reports to the watcher that its target was absent from a
// received response.
//
// Unless the watch has already been canceled, it marks the watch as having
// received a response, stops the expiry timer, and sends an
// ErrorTypeResourceNotFound error through sendErrorLocked.
func (wi *watchInfo) resourceNotFound() {
	wi.mu.Lock()
	defer wi.mu.Unlock()

	if wi.state != watchInfoStateCanceled {
		wi.state = watchInfoStateRespReceived
		wi.expiryTimer.Stop()
		notFoundErr := NewErrorf(ErrorTypeResourceNotFound, "xds: %s target %s not found in received response", wi.typeURL, wi.target)
		wi.sendErrorLocked(notFoundErr)
	}
}

// timeout moves the watch into the timeout state and reports a "not found,
// watcher timeout" error through sendErrorLocked.
//
// It does nothing if the watch was already canceled or has already received
// a response.
func (wi *watchInfo) timeout() {
	wi.mu.Lock()
	defer wi.mu.Unlock()

	// A canceled watch, or one that already got a response, must not be
	// overwritten with a timeout.
	settled := wi.state == watchInfoStateCanceled || wi.state == watchInfoStateRespReceived
	if settled {
		return
	}

	wi.state = watchInfoStateTimeout
	timeoutErr := fmt.Errorf("xds: %s target %s not found, watcher timeout", wi.typeURL, wi.target)
	wi.sendErrorLocked(timeoutErr)
}

// Caller must hold wi.mu.
func (wi *watchInfo) sendErrorLocked(err error) {
var (
u interface{}
t string
)
switch wi.typeURL {
case ldsURL:
u = ldsUpdate{}
t = "LDS"
case rdsURL:
u = rdsUpdate{}
t = "RDS"
case cdsURL:
u = ClusterUpdate{}
t = "CDS"
case edsURL:
u = EndpointsUpdate{}
t = "EDS"
}
wi.c.scheduleCallback(wi, u, fmt.Errorf("xds: %s target %s not found, watcher timeout", t, wi.target))
wi.c.scheduleCallback(wi, u, err)
}

func (wi *watchInfo) cancel() {
Expand Down Expand Up @@ -185,7 +196,19 @@ func (c *Client) watch(wi *watchInfo) (cancel func()) {
// watching this resource.
delete(watchers, resourceName)
c.v2c.removeWatch(wi.typeURL, resourceName)
// TODO: remove item from cache.
// Remove the resource from cache. When a watch for this
// resource is added later, it will trigger an xDS request with
// resource names, and client will receive new xDS responses.
switch wi.typeURL {
case ldsURL:
delete(c.ldsCache, resourceName)
case rdsURL:
delete(c.rdsCache, resourceName)
case cdsURL:
delete(c.cdsCache, resourceName)
case edsURL:
delete(c.edsCache, resourceName)
}
}
}
}
Expand Down
Loading

0 comments on commit 42eed59

Please sign in to comment.