Skip to content

Commit

Permalink
discovery: preserve results from other resolve calls
Browse files Browse the repository at this point in the history
Properly preserve results from other resolve calls. There is an
assumption that resolve() is always called with the same addresses but
that is not true with gRPC and `--endpoint-group`. Without this fix,
multiple resolves could happen at the same time but some of the callers
will not be able to retrieve the results leading to random errors.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
GiedriusS committed Nov 5, 2024
1 parent b0553ac commit 91a24e2
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 15 deletions.
1 change: 1 addition & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ func runQuery(
dns.ResolverType(dnsSDResolver),
),
dnsSDInterval,
logger,
)

dnsEndpointProvider := dns.NewProvider(
Expand Down
42 changes: 28 additions & 14 deletions pkg/discovery/dns/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/go-kit/log"
grpcresolver "google.golang.org/grpc/resolver"
)

Expand All @@ -19,12 +20,14 @@ var (
type builder struct {
resolveInterval time.Duration
provider *Provider
logger log.Logger
}

func RegisterGRPCResolver(provider *Provider, interval time.Duration) {
func RegisterGRPCResolver(provider *Provider, interval time.Duration, logger log.Logger) {
grpcresolver.Register(&builder{
resolveInterval: interval,
provider: provider,
logger: logger,
})
}

Expand All @@ -39,6 +42,7 @@ func (b *builder) Build(t grpcresolver.Target, cc grpcresolver.ClientConn, _ grp
cancel: cancel,
cc: cc,
interval: b.resolveInterval,
logger: b.logger,
}
r.wg.Add(1)
go r.run()
Expand All @@ -55,7 +59,8 @@ type resolver struct {
cc grpcresolver.ClientConn
interval time.Duration

wg sync.WaitGroup
wg sync.WaitGroup
logger log.Logger
}

func (r *resolver) Close() {
Expand All @@ -78,19 +83,28 @@ func (r *resolver) addresses() []string {
func (r *resolver) run() {
defer r.wg.Done()
for {
if err := r.resolve(); err != nil {
r.cc.ReportError(err)
} else {
state := grpcresolver.State{}
for _, addr := range r.addresses() {
raddr := grpcresolver.Address{Addr: addr}
state.Addresses = append(state.Addresses, raddr)
func() {
if err := r.resolve(); err != nil {
r.cc.ReportError(err)
r.logger.Log("msg", "failed to resolve", "err", err)
} else {
state := grpcresolver.State{}
addrs := r.addresses()
if len(addrs) == 0 {
r.logger.Log("msg", "no addresses resolved", "target", r.target)
return
}
for _, addr := range addrs {
raddr := grpcresolver.Address{Addr: addr}
state.Addresses = append(state.Addresses, raddr)
}
err = r.cc.UpdateState(state)
if err != nil {
r.logger.Log("msg", "failed to update state", "err", err)
return
}
}
err = r.cc.UpdateState(state)
if err != nil {
continue
}
}
}()
select {
case <-r.ctx.Done():
return
Expand Down
8 changes: 7 additions & 1 deletion pkg/discovery/dns/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package dns

import (
"context"
"maps"
"net"
"strings"
"sync"
Expand Down Expand Up @@ -111,7 +112,12 @@ func GetQTypeName(addr string) (qtype, name string) {
// Addresses prefixed with `dns+` or `dnssrv+` will be resolved through respective DNS lookup (A/AAAA or SRV).
// For non-SRV records, it will return an error if a port is not supplied.
func (p *Provider) Resolve(ctx context.Context, addrs []string) error {
resolvedAddrs := map[string][]string{}
// NOTE(GiedriusS): we could call this with different addrs (e.g. from gRPC) so we need
// to save the previous results.
p.RLock()
resolvedAddrs := maps.Clone(p.resolved)
p.RUnlock()

errs := errutil.MultiError{}

for _, addr := range addrs {
Expand Down

0 comments on commit 91a24e2

Please sign in to comment.