Skip to content

Commit

Permalink
Merge pull request #102468 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-23.1-101855

release-23.1: rpc: attempt SRV query on every dial
  • Loading branch information
Ye Ji authored Apr 28, 2023
2 parents c641ece + 4d2144a commit 2991259
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 38 deletions.
1 change: 1 addition & 0 deletions pkg/rpc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ go_library(
"//pkg/util/log/severity",
"//pkg/util/metric",
"//pkg/util/netutil",
"//pkg/util/randutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
Expand Down
30 changes: 30 additions & 0 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -542,6 +543,10 @@ type ContextOptions struct {
// node-to-node connections and prevents one-way partitions from occurring by
// turing them into two-way partitions.
NeedsDialback bool

// PreferSRVLookup indicates whether SRV records are preferred over A/AAAA
// records when dialing network targets.
PreferSRVLookup bool
}

func (c ContextOptions) validate() error {
Expand Down Expand Up @@ -1829,6 +1834,12 @@ func (rpcCtx *Context) dialOptsNetwork(
// which is only definitely provided during dial.
dialer := onlyOnceDialer{}
dialerFunc := dialer.dial
if rpcCtx.ContextOptions.PreferSRVLookup {
dialer := &srvResolvingDialer{
dialerFunc: dialerFunc,
}
dialerFunc = dialer.dial
}
if rpcCtx.Knobs.InjectedLatencyOracle != nil {
latency := rpcCtx.Knobs.InjectedLatencyOracle.GetLatency(target)
log.VEventf(ctx, 1, "connecting with simulated latency %dms",
Expand Down Expand Up @@ -2008,6 +2019,25 @@ func (ald *artificialLatencyDialer) dial(ctx context.Context, addr string) (net.
}, nil
}

// srvResolvingDialer first queries SRV records for addr, and dials
// a random member of the result.
// If the result is empty, it dials the addr directly.
type srvResolvingDialer struct {
dialerFunc dialerFunc
}

func (srd *srvResolvingDialer) dial(ctx context.Context, addr string) (net.Conn, error) {
addrs, err := netutil.SRV(ctx, addr)
if err != nil {
return nil, err
}
if len(addrs) == 0 {
// If SRV lookup returns empty, we fallback to addr.
addrs = []string{addr}
}
return srd.dialerFunc(ctx, addrs[int(randutil.FastUint32())%len(addrs)])
}

type delayingListener struct {
net.Listener
enabled func() bool
Expand Down
45 changes: 45 additions & 0 deletions pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2746,5 +2746,50 @@ func TestHeartbeatDialback(t *testing.T) {
}
}

// TestSRVResolvingDialer tests srvResolvingDialer dials the correct target if SRV query
// is successful, and dials the input target directly if SRV query returns empty records.
func TestSRVResolvingDialer(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

t.Run("success SRV lookup", func(t *testing.T) {
expected := &net.SRV{
Target: "test",
Port: 123,
}
defer netutil.TestingOverrideSRVLookupFn(func(service, proto, name string) (string, []*net.SRV, error) {
return "", []*net.SRV{expected}, nil
})()

dialed := false
dial := func(ctx context.Context, addr string) (net.Conn, error) {
dialed = true
require.Equal(t, fmt.Sprintf("%s:%d", expected.Target, expected.Port), addr)
return nil, nil
}
dialer := &srvResolvingDialer{dialerFunc: dial}
_, err := dialer.dial(ctx, "srvquery")
require.NoError(t, err)
require.True(t, dialed)
})

t.Run("empty SRV lookup", func(t *testing.T) {
defer netutil.TestingOverrideSRVLookupFn(func(service, proto, name string) (string, []*net.SRV, error) {
return "", []*net.SRV{}, nil
})()
expected := "test-expected"
dialed := false
dial := func(ctx context.Context, addr string) (net.Conn, error) {
dialed = true
require.Equal(t, expected, addr)
return nil, nil
}
dialer := &srvResolvingDialer{dialerFunc: dial}
_, err := dialer.dial(ctx, expected)
require.NoError(t, err)
require.True(t, dialed)
})
}

// TODO(baptist): Add a test using TestCluster to verify this works in a full
// integration test.
1 change: 0 additions & 1 deletion pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,6 @@ go_test(
"//pkg/util/log",
"//pkg/util/log/logpb",
"//pkg/util/metric",
"//pkg/util/netutil",
"//pkg/util/netutil/addr",
"//pkg/util/protoutil",
"//pkg/util/randident",
Expand Down
25 changes: 3 additions & 22 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
Expand Down Expand Up @@ -955,27 +954,9 @@ func (cfg *Config) parseGossipBootstrapAddresses(
}

if cfg.JoinPreferSRVRecords {
// The following code substitutes the entry in --join by the
// result of SRV resolution, if suitable SRV records are found
// for that name.
//
// TODO(knz): Delay this lookup. The logic for "regular" addresses
// is delayed until the point the connection is attempted, so that
// fresh DNS records are used for a new connection. This makes
// it possible to update DNS records without restarting the node.
// The SRV logic here does not have this property (yet).
srvAddrs, err := netutil.SRV(ctx, address)
if err != nil {
return nil, err
}

if len(srvAddrs) > 0 {
for _, sa := range srvAddrs {
bootstrapAddresses = append(bootstrapAddresses,
util.MakeUnresolvedAddrWithDefaults("tcp", sa, base.DefaultPort))
}
continue
}
// We will use the port in the SRV records.
bootstrapAddresses = append(bootstrapAddresses, util.MakeUnresolvedAddr("tcp", address))
continue
}

// Otherwise, use the address.
Expand Down
22 changes: 7 additions & 15 deletions pkg/server/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package server

import (
"context"
"net"
"os"
"reflect"
"testing"
Expand All @@ -24,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/cockroach/pkg/util/netutil/addr"
"github.com/kr/pretty"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -240,11 +238,8 @@ func TestParseBootstrapResolvers(t *testing.T) {

t.Run("srv", func(t *testing.T) {
cfg.JoinPreferSRVRecords = true
cfg.JoinList = append(base.JoinListType(nil), "othername")

defer netutil.TestingOverrideSRVLookupFn(func(service, proto, name string) (string, []*net.SRV, error) {
return "cluster", []*net.SRV{{Target: expectedName, Port: 111}}, nil
})()
const srvName = "srv-name"
cfg.JoinList = append(base.JoinListType(nil), srvName)

addresses, err := cfg.parseGossipBootstrapAddresses(context.Background())
if err != nil {
Expand All @@ -253,18 +248,15 @@ func TestParseBootstrapResolvers(t *testing.T) {
if len(addresses) != 1 {
t.Fatalf("expected 1 address, got %# v", pretty.Formatter(addresses))
}
host, port, err := addr.SplitHostPort(addresses[0].String(), "UNKNOWN")
host, port, err := addr.SplitHostPort(addresses[0].String(), "defaultPort")
if err != nil {
t.Fatal(err)
}
if port == "UNKNOWN" {
t.Fatalf("expected port defined in resover: %# v", pretty.Formatter(addresses))
if port != "defaultPort" {
t.Fatalf("unexpected port defined in resover: %# v", pretty.Formatter(addresses))
}
if port != "111" {
t.Fatalf("expected port 111 from SRV, got %q", port)
}
if host != expectedName {
t.Errorf("expected name %q, got %q", expectedName, host)
if host != srvName {
t.Errorf("expected host %q, got %q", srvName, host)
}
})
}
1 change: 1 addition & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
},
TenantRPCAuthorizer: authorizer,
NeedsDialback: true,
PreferSRVLookup: cfg.JoinPreferSRVRecords,
}
if knobs := cfg.TestingKnobs.Server; knobs != nil {
serverKnobs := knobs.(*TestingKnobs)
Expand Down

0 comments on commit 2991259

Please sign in to comment.