diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index 40df37f376b..a4b48340bfd 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -23,12 +23,15 @@ import ( "io" "math/rand" "os" + "sort" + "sync" "time" "google.golang.org/grpc/resolver" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/servenv" ) // File based discovery for vtgate grpc endpoints @@ -54,14 +57,29 @@ import ( // type: Only select from hosts of this type (required) // +// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver). +type JSONGateResolver struct { + target resolver.Target + clientConn resolver.ClientConn + poolType string +} + +func (r *JSONGateResolver) ResolveNow(o resolver.ResolveNowOptions) {} + +func (r *JSONGateResolver) Close() { + log.Infof("Closing resolver for target %s", r.target.URL.String()) +} + type JSONGateResolverBuilder struct { jsonPath string addressField string portField string poolTypeField string affinityField string + affinityValue string - targets []targetHost + mu sync.RWMutex + targets map[string][]targetHost resolvers []*JSONGateResolver rand *rand.Rand @@ -70,24 +88,14 @@ type JSONGateResolverBuilder struct { } type targetHost struct { - addr string - poolType string - affinity string -} - -// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver). -type JSONGateResolver struct { - target resolver.Target - clientConn resolver.ClientConn - poolType string - affinity string + Addr string + PoolType string + Affinity string } var ( - buildCount = stats.NewCounter("JsonDiscoveryBuild", "JSON host discovery rebuilt the host list") - unchangedCount = stats.NewCounter("JsonDiscoveryUnchanged", "JSON host discovery parsed and determined no change to the file") - affinityCount = stats.NewCountersWithSingleLabel("JsonDiscoveryHostAffinity", "Count of hosts returned from discovery by AZ affinity", "affinity") - poolTypeCount = stats.NewCountersWithSingleLabel("JsonDiscoveryHostPoolType", "Count of hosts returned from discovery by pool type", "type") + parseCount = stats.NewCountersWithSingleLabel("JsonDiscoveryParseCount", "Count of results of JSON host file parsing (changed, unchanged, error)", "result") + targetCount = stats.NewGaugesWithSingleLabel("JsonDiscoveryTargetCount", "Count of hosts returned from discovery by pool type", "pool") ) func RegisterJSONGateResolver( @@ -96,13 +104,16 @@ func RegisterJSONGateResolver( portField string, poolTypeField string, affinityField string, + affinityValue string, ) (*JSONGateResolverBuilder, error) { jsonDiscovery := &JSONGateResolverBuilder{ + targets: map[string][]targetHost{}, jsonPath: jsonPath, addressField: addressField, portField: portField, poolTypeField: poolTypeField, affinityField: affinityField, + affinityValue: affinityValue, } resolver.Register(jsonDiscovery) @@ -113,6 +124,8 @@ func RegisterJSONGateResolver( return nil, err } + servenv.AddStatusPart("JSON Discovery", targetsTemplate, jsonDiscovery.debugTargets) + return jsonDiscovery, nil } @@ -138,17 +151,19 @@ func (b *JSONGateResolverBuilder) start() error { poolTypes := map[string]int{} affinityTypes := map[string]int{} - for _, t := range b.targets { - count := poolTypes[t.poolType] - poolTypes[t.poolType] = count + 1 + for _, ts := range b.targets { + for _, t := range ts { + count := poolTypes[t.PoolType] + poolTypes[t.PoolType] = count + 1 - count = affinityTypes[t.affinity] - affinityTypes[t.affinity] = count + 1 + count = affinityTypes[t.Affinity] + affinityTypes[t.Affinity] = count + 1 + } } - buildCount.Add(1) + parseCount.Add("changed", 1) - log.Infof("loaded %d targets, pool types %v, affinity groups %v", len(b.targets), poolTypes, affinityTypes) + log.Infof("loaded targets, pool types %v, affinity %s, groups %v", poolTypes, *affinityValue, affinityTypes) // Start a config watcher b.ticker = time.NewTicker(1 * time.Second) @@ -158,10 +173,12 @@ func (b *JSONGateResolverBuilder) start() error { } go func() { + var parseErr error for range b.ticker.C { checkFileStat, err := os.Stat(b.jsonPath) if err != nil { log.Errorf("Error stat'ing config %v\n", err) + parseCount.Add("error", 1) continue } isUnchanged := checkFileStat.Size() == fileStat.Size() && checkFileStat.ModTime() == fileStat.ModTime() @@ -173,12 +190,20 @@ func (b *JSONGateResolverBuilder) start() error { fileStat = checkFileStat contentsChanged, err := b.parse() - if err != nil || !contentsChanged { - unchangedCount.Add(1) + if err != nil { + parseCount.Add("error", 1) + if parseErr == nil || err.Error() != parseErr.Error() { + parseErr = err + log.Error(err) + } continue } - - buildCount.Add(1) + parseErr = nil + if !contentsChanged { + parseCount.Add("unchanged", 1) + continue + } + parseCount.Add("changed", 1) // notify all the resolvers that the targets changed for _, r := range b.resolvers { @@ -217,7 +242,7 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) { return false, fmt.Errorf("error parsing JSON discovery file %s: %v", b.jsonPath, err) } - var targets []targetHost + var targets = map[string][]targetHost{} for _, host := range hosts { address, hasAddress := host[b.addressField] port, hasPort := host[b.portField] @@ -258,29 +283,46 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) { return false, fmt.Errorf("error parsing JSON discovery file %s: port field %s has invalid value %v", b.jsonPath, b.portField, port) } - targets = append(targets, targetHost{fmt.Sprintf("%s:%s", address, port), poolType.(string), affinity.(string)}) + target := targetHost{fmt.Sprintf("%s:%s", address, port), poolType.(string), affinity.(string)} + targets[target.PoolType] = append(targets[target.PoolType], target) + } + + for poolType := range targets { + if b.affinityField != "" { + sort.Slice(targets[poolType], func(i, j int) bool { + return b.affinityValue == targets[poolType][i].Affinity + }) + } + if len(targets[poolType]) > *numConnections { + targets[poolType] = targets[poolType][:*numConnections] + } + targetCount.Set(poolType, int64(len(targets[poolType]))) } + + b.mu.Lock() b.targets = targets + b.mu.Unlock() return true, nil } -// Update the current list of hosts for the given resolver -func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) { - - log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections) +func (b *JSONGateResolverBuilder) GetPools() []string { + b.mu.RLock() + defer b.mu.RUnlock() + var pools []string + for pool := range b.targets { + pools = append(pools, pool) + } + sort.Strings(pools) + return pools +} - // filter to only targets that match the pool type. if unset, this will just be a copy - // of the full target list. +func (b *JSONGateResolverBuilder) GetTargets(poolType string) []targetHost { + // Copy the target slice + b.mu.RLock() targets := []targetHost{} - for _, target := range b.targets { - if r.poolType == target.poolType { - targets = append(targets, target) - log.V(1000).Infof("matched target %v with type %s", target, r.poolType) - } else { - log.V(1000).Infof("skipping host %v with type %s", target, r.poolType) - } - } + targets = append(targets, b.targets[poolType]...) + b.mu.RUnlock() // Shuffle to ensure every host has a different order to iterate through, putting // the affinity matching (e.g. same az) hosts at the front and the non-matching ones @@ -293,7 +335,7 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) { for i := 0; i < n-1; i++ { j := head + b.rand.Intn(tail-head+1) - if r.affinity == "" || r.affinity == targets[j].affinity { + if *affinityField != "" && *affinityValue == targets[j].Affinity { targets[head], targets[j] = targets[j], targets[head] head++ } else { @@ -302,32 +344,22 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) { } } - // Grab the first N addresses, and voila! - var addrs []resolver.Address - targets = targets[:min(*numConnections, len(targets))] - for _, target := range targets { - addrs = append(addrs, resolver.Address{Addr: target.addr}) - } + return targets +} + +// Update the current list of hosts for the given resolver +func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) { + + log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections) - // Count some metrics - var unknown, local, remote int64 + targets := b.GetTargets(r.poolType) + + var addrs []resolver.Address for _, target := range targets { - if r.affinity == "" { - unknown++ - } else if r.affinity == target.affinity { - local++ - } else { - remote++ - } + addrs = append(addrs, resolver.Address{Addr: target.Addr}) } - if unknown != 0 { - affinityCount.Add("unknown", unknown) - } - affinityCount.Add("local", local) - affinityCount.Add("remote", remote) - poolTypeCount.Add(r.poolType, int64(len(targets))) - log.V(100).Infof("updated targets for %s to %v (local %d / remote %d)", r.target.URL.String(), targets, local, remote) + log.V(100).Infof("updated targets for %s to %v", r.target.URL.String(), targets) r.clientConn.UpdateState(resolver.State{Addresses: addrs}) } @@ -346,19 +378,12 @@ func (b *JSONGateResolverBuilder) Build(target resolver.Target, cc resolver.Clie } } - // Affinity on the other hand is just an optimization - affinity := "" - if b.affinityField != "" { - affinity = attrs.Get(b.affinityField) - } - - log.V(100).Infof("Start discovery for target %v poolType %s affinity %s\n", target.URL.String(), poolType, affinity) + log.V(100).Infof("Start discovery for target %v poolType %s affinity %s\n", target.URL.String(), poolType, b.affinityValue) r := &JSONGateResolver{ target: target, clientConn: cc, poolType: poolType, - affinity: affinity, } b.update(r) @@ -367,19 +392,44 @@ func (b *JSONGateResolverBuilder) Build(target resolver.Target, cc resolver.Clie return r, nil } -func (r *JSONGateResolver) ResolveNow(o resolver.ResolveNowOptions) {} - -func (r *JSONGateResolver) Close() { - log.Infof("Closing resolver for target %s", r.target.URL.String()) -} - -// Utilities -func min(a, b int) int { - if a < b { - return a +// debugTargets will return the builder's targets with a sorted slice of +// poolTypes for rendering debug output +func (b *JSONGateResolverBuilder) debugTargets() any { + pools := b.GetPools() + targets := map[string][]targetHost{} + for pool := range b.targets { + targets[pool] = b.GetTargets(pool) + } + return struct { + Pools []string + Targets map[string][]targetHost + }{ + Pools: pools, + Targets: targets, } - return b } -func init() { -} +const ( + // targetsTemplate is a HTML template to display the gate resolver's target hosts. + targetsTemplate = ` + + +{{range $i, $p := .Pools}} + + +{{range index $.Targets $p}} + + + {{end}} +{{end}} +
{{$p}}
{{.Addr}}{{.Affinity}}
+` +) diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index bdf44348450..86ebb6350ff 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -43,7 +43,8 @@ var ( vtgateHostsFile = flag.String("vtgate_hosts_file", "", "json file describing the host list to use for vtgate:// resolution") numConnections = flag.Int("num_connections", 4, "number of outbound GPRC connections to maintain") poolTypeField = flag.String("pool_type_field", "", "Field name used to specify the target vtgate type and filter the hosts") - affinityField = flag.String("affinity_field", "", "Attribute (both mysql protocol connection and JSON file) used to specify the routing affinity , e.g. 'az_id'") + affinityField = flag.String("affinity_field", "", "Attribute (JSON file) used to specify the routing affinity , e.g. 'az_id'") + affinityValue = flag.String("affinity_value", "", "Value to match for routing affinity , e.g. 'use-az1'") addressField = flag.String("address_field", "address", "field name in the json file containing the address") portField = flag.String("port_field", "port", "field name in the json file containing the port") @@ -194,5 +195,6 @@ func Init() { *portField, *poolTypeField, *affinityField, + *affinityValue, ) }