Skip to content

Commit

Permalink
Resolve consul plugin data race (#6271)
Browse files Browse the repository at this point in the history
* use dependency injectio to handle fallback resolution

* add changelog

* fixup dns resolvers and test
  • Loading branch information
sam-heilbron authored Apr 8, 2022
1 parent c687552 commit 4f133dd
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 28 deletions.
5 changes: 5 additions & 0 deletions changelog/v1.12.0-beta2/consul-plugin-data-race.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
changelog:
- type: FIX
issueLink: https://github.com/solo-io/gloo/issues/4782
resolvesIssue: false
description: Fix a data race that occurs when resolving IP addresses from consul services.
52 changes: 50 additions & 2 deletions projects/gloo/pkg/plugins/consul/dns_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package consul
import (
"context"
"net"
"sync"

"github.com/rotisserie/eris"
)
Expand All @@ -15,11 +16,23 @@ type DnsResolver interface {
Resolve(ctx context.Context, address string) ([]net.IPAddr, error)
}

type ConsulDnsResolver struct {
var (
_ DnsResolver = new(dnsResolver)
_ DnsResolver = new(dnsResolverWithFallback)
)

func NewConsulDnsResolver(address string) DnsResolver {
basicResolver := &dnsResolver{
DnsAddress: address,
}
return NewDnsResolverWithFallback(basicResolver)
}

type dnsResolver struct {
DnsAddress string
}

func (c *ConsulDnsResolver) Resolve(ctx context.Context, address string) ([]net.IPAddr, error) {
func (c *dnsResolver) Resolve(ctx context.Context, address string) ([]net.IPAddr, error) {
res := net.Resolver{
PreferGo: true, // otherwise we may use cgo which doesn't resolve on my mac in testing
Dial: func(ctx context.Context, network, address string) (conn net.Conn, err error) {
Expand All @@ -38,3 +51,38 @@ func (c *ConsulDnsResolver) Resolve(ctx context.Context, address string) ([]net.
}
return ipAddrs, nil
}

type dnsResolverWithFallback struct {
resolver DnsResolver

sync.RWMutex
previousResolutions map[string][]net.IPAddr
}

func NewDnsResolverWithFallback(resolver DnsResolver) *dnsResolverWithFallback {
return &dnsResolverWithFallback{
resolver: resolver,
previousResolutions: make(map[string][]net.IPAddr),
}
}

func (d *dnsResolverWithFallback) Resolve(ctx context.Context, address string) ([]net.IPAddr, error) {
ipAddrs, err := d.resolver.Resolve(ctx, address)

// Synchronize access to previous resolutions
d.Lock()
defer d.Unlock()

// If we successfully resolved the addresses, update our last known state and return
if err == nil {
d.previousResolutions[address] = ipAddrs
return ipAddrs, nil
}

// If we did not successfully resolve the addresses, attempt to use the last known state
lastKnownIdAddrs, resolvedPreviously := d.previousResolutions[address]
if !resolvedPreviously {
return nil, err
}
return lastKnownIdAddrs, nil
}
16 changes: 4 additions & 12 deletions projects/gloo/pkg/plugins/consul/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (p *plugin) WatchEndpoints(writeNamespace string, upstreamsToTrack v1.Upstr
// Here is where the specs are produced; each resulting spec is a grouping of serviceInstances (aka endpoints)
// associated with a single consul service on one datacenter.
specs := refreshSpecs(ctx, p.client, serviceMeta, errChan)
endpoints := buildEndpointsFromSpecs(opts.Ctx, writeNamespace, p.resolver, specs, trackedServiceToUpstreams, p.previousDnsResolutions)
endpoints := buildEndpointsFromSpecs(opts.Ctx, writeNamespace, p.resolver, specs, trackedServiceToUpstreams)

previousHash = hashutils.MustHash(endpoints)
previousSpecs = specs
Expand All @@ -106,7 +106,7 @@ func (p *plugin) WatchEndpoints(writeNamespace string, upstreamsToTrack v1.Upstr

case <-timer.C:
// Poll to ensure any DNS updates get picked up in endpoints for EDS
endpoints := buildEndpointsFromSpecs(opts.Ctx, writeNamespace, p.resolver, previousSpecs, trackedServiceToUpstreams, p.previousDnsResolutions)
endpoints := buildEndpointsFromSpecs(opts.Ctx, writeNamespace, p.resolver, previousSpecs, trackedServiceToUpstreams)

currentHash := hashutils.MustHash(endpoints)
if previousHash == currentHash {
Expand Down Expand Up @@ -190,12 +190,11 @@ func buildEndpointsFromSpecs(
resolver DnsResolver,
specs []*consulapi.CatalogService,
trackedServiceToUpstreams map[string][]*v1.Upstream,
previousResolutions map[string][]string,
) v1.EndpointList {
var endpoints v1.EndpointList
for _, spec := range specs {
if upstreams, ok := trackedServiceToUpstreams[spec.ServiceName]; ok {
if eps, err := buildEndpoints(ctx, writeNamespace, resolver, spec, upstreams, previousResolutions); err != nil {
if eps, err := buildEndpoints(ctx, writeNamespace, resolver, spec, upstreams); err != nil {
contextutils.LoggerFrom(ctx).Warnf("consul eds plugin encountered error resolving DNS for consul service %v", spec, err)
} else {
endpoints = append(endpoints, eps...)
Expand Down Expand Up @@ -273,7 +272,6 @@ func buildEndpoints(
resolver DnsResolver,
service *consulapi.CatalogService,
upstreams []*v1.Upstream,
previousResolutions map[string][]string,
) ([]*v1.Endpoint, error) {

// Address is the IP address of the Consul node on which the service is registered.
Expand All @@ -285,13 +283,7 @@ func buildEndpoints(

ipAddresses, err := getIpAddresses(ctx, address, resolver)
if err != nil {
addresses, resolvedPreviously := previousResolutions[address]
if !resolvedPreviously {
return nil, err
}
ipAddresses = addresses
} else {
previousResolutions[address] = ipAddresses
return nil, err
}

var endpoints []*v1.Endpoint
Expand Down
17 changes: 7 additions & 10 deletions projects/gloo/pkg/plugins/consul/eds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,9 +583,7 @@ var _ = Describe("Consul EDS", func() {
}

// make sure the we have a correct number of generated endpoints:

previousResolutions := make(map[string][]string)
endpoints := buildEndpointsFromSpecs(context.TODO(), writeNamespace, mockDnsResolver, svcs, trackedServiceToUpstreams, previousResolutions)
endpoints := buildEndpointsFromSpecs(context.TODO(), writeNamespace, mockDnsResolver, svcs, trackedServiceToUpstreams)
endpontNames := map[string]bool{}
for _, endpoint := range endpoints {
fmt.Fprintf(GinkgoWriter, "%s%v\n", "endpoint: ", endpoint)
Expand Down Expand Up @@ -637,8 +635,7 @@ var _ = Describe("Consul EDS", func() {
// add another upstream so to test that tag2 is in the labels.
upstream2 := createTestFilteredUpstream("my-svc-2", "my-svc", []string{"tag-2"}, []string{"serf"}, []string{"dc-1", "dc-2"})

previousResolutions := make(map[string][]string)
endpoints, err := buildEndpoints(context.TODO(), writeNamespace, nil, consulService, v1.UpstreamList{upstream, upstream2}, previousResolutions)
endpoints, err := buildEndpoints(context.TODO(), writeNamespace, nil, consulService, v1.UpstreamList{upstream, upstream2})
Expect(err).To(BeNil())
Expect(endpoints).To(HaveLen(1))
Expect(endpoints[0]).To(matchers.BeEquivalentToDiff(&v1.Endpoint{
Expand Down Expand Up @@ -678,8 +675,8 @@ var _ = Describe("Consul EDS", func() {
mockDnsResolver.EXPECT().Resolve(gomock.Any(), gomock.Any()).Do(func(context.Context, string) {
fmt.Fprint(GinkgoWriter, "Initial resolve called.")
}).Return(initialIps, nil).Times(1) // once for each consul service
previousResolutions := make(map[string][]string)
endpoints, err := buildEndpoints(context.TODO(), writeNamespace, mockDnsResolver, consulService, v1.UpstreamList{upstream}, previousResolutions)

endpoints, err := buildEndpoints(context.TODO(), writeNamespace, mockDnsResolver, consulService, v1.UpstreamList{upstream})
Expect(err).To(BeNil())
Expect(endpoints).To(HaveLen(1))
Expect(endpoints[0]).To(matchers.BeEquivalentToDiff(&v1.Endpoint{
Expand Down Expand Up @@ -715,15 +712,15 @@ var _ = Describe("Consul EDS", func() {

initialIps := []net.IPAddr{{IP: net.IPv4(127, 0, 0, 1)}}
mockDnsResolver := mock_consul2.NewMockDnsResolver(ctrl)
mockDnsResolverWithFallback := NewDnsResolverWithFallback(mockDnsResolver)
mockDnsResolver.EXPECT().Resolve(gomock.Any(), gomock.Any()).Do(func(context.Context, string) {
fmt.Fprint(GinkgoWriter, "Initial resolve called.")
}).Return(initialIps, nil).Times(1)

upstream := createTestFilteredUpstream("my-svc", "my-svc", []string{"tag-1"}, []string{"http"}, []string{"dc-1", "dc-2"})

previousResolutions := make(map[string][]string)
// Initial call should be successfull
endpoints, err := buildEndpoints(context.TODO(), writeNamespace, mockDnsResolver, consulService, v1.UpstreamList{upstream}, previousResolutions)
endpoints, err := buildEndpoints(context.TODO(), writeNamespace, mockDnsResolverWithFallback, consulService, v1.UpstreamList{upstream})
Expect(err).To(BeNil())
Expect(endpoints).To(HaveLen(1))
Expect(endpoints[0]).To(matchers.BeEquivalentToDiff(&v1.Endpoint{
Expand All @@ -750,7 +747,7 @@ var _ = Describe("Consul EDS", func() {
}).Return(nil, failErr).Times(1)

// Following call should also be successfull despite the error
endpoints, err = buildEndpoints(context.TODO(), writeNamespace, mockDnsResolver, consulService, v1.UpstreamList{upstream}, previousResolutions)
endpoints, err = buildEndpoints(context.TODO(), writeNamespace, mockDnsResolverWithFallback, consulService, v1.UpstreamList{upstream})
Expect(err).To(BeNil())
Expect(endpoints).To(HaveLen(1))
Expect(endpoints[0]).To(matchers.BeEquivalentToDiff(&v1.Endpoint{
Expand Down
4 changes: 1 addition & 3 deletions projects/gloo/pkg/plugins/consul/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,14 @@ type plugin struct {
dnsPollingInterval time.Duration
consulUpstreamDiscoverySettings *v1.Settings_ConsulUpstreamDiscoveryConfiguration
settings *v1.Settings
previousDnsResolutions map[string][]string
}

func NewPlugin(client consul.ConsulWatcher, resolver DnsResolver, dnsPollingInterval *time.Duration) *plugin {
pollingInterval := DefaultDnsPollingInterval
if dnsPollingInterval != nil {
pollingInterval = *dnsPollingInterval
}
previousDnsResolutions := make(map[string][]string)
return &plugin{client: client, resolver: resolver, dnsPollingInterval: pollingInterval, previousDnsResolutions: previousDnsResolutions}
return &plugin{client: client, resolver: resolver, dnsPollingInterval: pollingInterval}
}

func (p *plugin) Name() string {
Expand Down
2 changes: 1 addition & 1 deletion projects/gloo/pkg/plugins/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func Plugins(opts bootstrap.Opts) []plugins.Plugin {
glooPlugins = append(glooPlugins, kubernetes.NewPlugin(opts.KubeClient, opts.KubeCoreCache))
}
if opts.Consul.ConsulWatcher != nil {
glooPlugins = append(glooPlugins, consul.NewPlugin(opts.Consul.ConsulWatcher, &consul.ConsulDnsResolver{DnsAddress: opts.Consul.DnsServer}, opts.Consul.DnsPollingInterval))
glooPlugins = append(glooPlugins, consul.NewPlugin(opts.Consul.ConsulWatcher, consul.NewConsulDnsResolver(opts.Consul.DnsServer), opts.Consul.DnsPollingInterval))
}

return glooPlugins
Expand Down

0 comments on commit 4f133dd

Please sign in to comment.