Skip to content

Commit

Permalink
Fix ACK handling
Browse files Browse the repository at this point in the history
  • Loading branch information
howardjohn committed Nov 18, 2020
1 parent 6b4285d commit 61d958e
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 29 deletions.
95 changes: 67 additions & 28 deletions adsc/adsc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"math"
"net"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -93,7 +94,14 @@ type ADSC struct {
// Updates includes the type of the last update received from the server.
Updates chan string

mutex sync.Mutex
mutex sync.Mutex
watches map[string]Watch
}

type Watch struct {
resources []string
lastNonce string
lastVersion string
}

var (
Expand All @@ -106,6 +114,7 @@ func Dial(url string, opts *Config) (*ADSC, error) {
adsc := &ADSC{
done: make(chan error),
Updates: make(chan string, 100),
watches: map[string]Watch{},
RootCert: opts.RootCert,
SystemCerts: opts.SystemCerts,
ClientCert: opts.ClientCert,
Expand Down Expand Up @@ -249,7 +258,6 @@ func (a *ADSC) handleRecv() {
}
}

// TODO: add hook to inject nacks
a.mutex.Lock()
a.ack(msg, names)
a.mutex.Unlock()
Expand Down Expand Up @@ -292,6 +300,7 @@ func (a *ADSC) handleLDS(ll []*listener.Listener) {
}
}
}
sort.Strings(routes)

if dumpScope.DebugEnabled() {
for i, l := range ll {
Expand All @@ -303,18 +312,41 @@ func (a *ADSC) handleLDS(ll []*listener.Listener) {
dumpScope.Debugf("lds %d: %v", i, b)
}
}

a.mutex.Lock()
defer a.mutex.Unlock()
if len(routes) > 0 {
a.sendRequest(resource.RouteType, routes)
}

a.handleResourceUpdate(resource.RouteType, routes)

select {
case a.Updates <- "lds":
default:
}
}

func (a *ADSC) handleResourceUpdate(typeUrl string, resources []string) {
if !listEqual(a.watches[typeUrl].resources, resources) {
scope.Debugf("%v type resources changed: %v -> %v", typeUrl, a.watches[typeUrl].resources, resources)
watch := a.watches[typeUrl]
watch.resources = resources
a.watches[typeUrl] = watch
a.request(typeUrl, watch)
}
}

// listEqual checks that two lists contain all the same elements
func listEqual(a []string, b []string) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}

// compact representations, for simplified debugging/testing

// TCPListener extracts the core elements from envoy Listener.
Expand Down Expand Up @@ -355,10 +387,10 @@ func (a *ADSC) handleCDS(ll []*cluster.Cluster) {

cn = append(cn, c.Name)
}
sort.Strings(cn)

a.handleResourceUpdate(resource.EndpointType, cn)

if len(cn) > 0 {
a.sendRequest(resource.EndpointType, cn)
}
if dumpScope.DebugEnabled() {
for i, c := range ll {
b, err := marshal.MarshalToString(c)
Expand All @@ -372,6 +404,14 @@ func (a *ADSC) handleCDS(ll []*cluster.Cluster) {

a.mutex.Lock()
defer a.mutex.Unlock()
if !a.InitialLoad {
// first load - Envoy loads listeners after endpoints
_ = a.send(&discovery.DiscoveryRequest{
Node: a.node,
TypeUrl: resource.ListenerType,
}, ReasonInit)
a.InitialLoad = true
}

select {
case a.Updates <- "cds":
Expand Down Expand Up @@ -410,13 +450,6 @@ func (a *ADSC) handleEDS(eds []*endpoint.ClusterLoadAssignment) {
dumpScope.Debugf("eds %d: %v", i, b)
}
}
if !a.InitialLoad {
// first load - Envoy loads listeners after endpoints
_ = a.send(&discovery.DiscoveryRequest{
Node: a.node,
TypeUrl: resource.ListenerType,
}, "init")
}

a.mutex.Lock()
defer a.mutex.Unlock()
Expand All @@ -435,10 +468,6 @@ func (a *ADSC) handleRDS(configurations []*route.RouteConfiguration) {
rds[r.Name] = r
}

if !a.InitialLoad {
a.InitialLoad = true
}

if dumpScope.DebugEnabled() {
for i, r := range configurations {
b, err := marshal.MarshalToString(r)
Expand Down Expand Up @@ -496,28 +525,38 @@ func (a *ADSC) Watch() {
err := a.send(&discovery.DiscoveryRequest{
Node: a.node,
TypeUrl: resource.ClusterType,
}, "init")
}, ReasonInit)
if err != nil {
scope.Errorf("Error sending request: %v", err)
}
}

func (a *ADSC) sendRequest(typeurl string, rsc []string) {
const (
ReasonAck = "ack"
ReasonRequest = "request"
ReasonInit = "init"
)

func (a *ADSC) request(typeUrl string, watch Watch) {
_ = a.send(&discovery.DiscoveryRequest{
ResponseNonce: "",
ResponseNonce: watch.lastNonce,
TypeUrl: typeUrl,
Node: a.node,
TypeUrl: typeurl,
ResourceNames: rsc,
}, "request")
VersionInfo: watch.lastVersion,
ResourceNames: watch.resources,
}, ReasonRequest)
}

func (a *ADSC) ack(msg *discovery.DiscoveryResponse, names []string) {
sendNames := names
watch := a.watches[msg.TypeUrl]
watch.lastNonce = msg.Nonce
watch.lastVersion = msg.VersionInfo
a.watches[msg.TypeUrl] = watch
_ = a.send(&discovery.DiscoveryRequest{
ResponseNonce: msg.Nonce,
TypeUrl: msg.TypeUrl,
Node: a.node,
VersionInfo: msg.VersionInfo,
ResourceNames: sendNames,
}, "ack")
ResourceNames: names,
}, ReasonAck)
}
2 changes: 1 addition & 1 deletion pkg/simulation/xds/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (x *Simulation) Run(ctx model.Context) error {
go func() {
adsc.Connect(ctx.Args.PilotAddress, &adsc.Config{
Namespace: x.Namespace,
Workload: x.Name,
Workload: x.Name + "-" + x.IP,
Meta: meta,
NodeType: string(x.PodType),
IP: x.IP,
Expand Down

0 comments on commit 61d958e

Please sign in to comment.