Skip to content

Commit

Permalink
sql: use locality information when initializing span resolver oracles
Browse files Browse the repository at this point in the history
Previously, we used a node descriptor for locality information about
the current node when doing DistSQL planning. This only worked for
the system tenant, as secondary tenants used a fake node descriptor.
Given the changes to `OptimizeReplicaOrder` in the prior commit, this
patch switches to threading in the tenant pod's locality to allow
secondary tenants to make use of this information in DistSQL
planning.

Release note: None
  • Loading branch information
arulajmani committed Aug 14, 2022
1 parent befd340 commit 0e08303
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 75 deletions.
11 changes: 1 addition & 10 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,9 +413,7 @@ func (n *Node) AnnotateCtxWithSpan(

// start starts the node by registering the storage instance for the RPC
// service "Node" and initializing stores for each specified engine.
// Launches periodic store gossiping in a goroutine. A callback can
// be optionally provided that will be invoked once this node's
// NodeDescriptor is available, to help bootstrapping.
// Launches periodic store gossiping in a goroutine.
//
// addr, sqlAddr, and httpAddr are used to populate the Address,
// SQLAddress, and HTTPAddress fields respectively of the
Expand All @@ -432,7 +430,6 @@ func (n *Node) start(
attrs roachpb.Attributes,
locality roachpb.Locality,
localityAddress []roachpb.LocalityAddress,
nodeDescriptorCallback func(descriptor roachpb.NodeDescriptor),
) error {
n.initialStart = initialStart
n.startedAt = n.storeCfg.Clock.Now().WallTime
Expand All @@ -449,12 +446,6 @@ func (n *Node) start(
StartedAt: n.startedAt,
HTTPAddress: util.MakeUnresolvedAddr(httpAddr.Network(), httpAddr.String()),
}
// Invoke any passed in nodeDescriptorCallback as soon as it's available, to
// ensure that other components (currently the DistSQLPlanner) are initialized
// before store startup continues.
if nodeDescriptorCallback != nil {
nodeDescriptorCallback(n.Descriptor)
}

// Gossip the node descriptor to make this node addressable by node ID.
n.storeCfg.Gossip.NodeID.Set(ctx, n.Descriptor.NodeID)
Expand Down
6 changes: 5 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1302,6 +1302,11 @@ func (s *Server) PreStart(ctx context.Context) error {
s.rpcContext.StorageClusterID.Set(ctx, state.clusterID)
s.rpcContext.NodeID.Set(ctx, state.nodeID)

// Ensure components in the DistSQLPlanner that rely on the node ID are
// initialized before store startup continues.
s.sqlServer.execCfg.DistSQLPlanner.SetGatewaySQLInstanceID(base.SQLInstanceID(state.nodeID))
s.sqlServer.execCfg.DistSQLPlanner.ConstructAndSetSpanResolver(ctx, state.nodeID, s.cfg.Locality)

// TODO(irfansharif): Now that we have our node ID, we should run another
// check here to make sure we've not been decommissioned away (if we're here
// following a server restart). See the discussions in #48843 for how that
Expand Down Expand Up @@ -1393,7 +1398,6 @@ func (s *Server) PreStart(ctx context.Context) error {
s.cfg.NodeAttributes,
s.cfg.Locality,
s.cfg.LocalityAddresses,
s.sqlServer.execCfg.DistSQLPlanner.SetSQLInstanceInfo,
); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1212,8 +1212,8 @@ func (s *SQLServer) setInstanceID(ctx context.Context) error {
if err != nil {
return err
}
s.execCfg.DistSQLPlanner.SetGatewaySQLInstanceID(instanceID)
s.sqlLivenessSessionID = sessionID
s.execCfg.DistSQLPlanner.SetSQLInstanceInfo(roachpb.NodeDescriptor{NodeID: roachpb.NodeID(instanceID)})
return nil
}

Expand Down
4 changes: 1 addition & 3 deletions pkg/server/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,7 @@ func startTenantInternal(

tenantAdminServer := newTenantAdminServer(baseCfg.AmbientCtx, s, tenantStatusServer, drainServer)

// TODO(asubiotto): remove this. Right now it is needed to initialize the
// SpanResolver.
s.execCfg.DistSQLPlanner.SetSQLInstanceInfo(roachpb.NodeDescriptor{NodeID: 0})
s.execCfg.DistSQLPlanner.ConstructAndSetSpanResolver(ctx, 0 /* NodeID */, s.execCfg.Locality)

authServer = newAuthenticationServer(baseCfg.Config, s)

Expand Down
23 changes: 15 additions & 8 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,15 +223,22 @@ func (dsp *DistSQLPlanner) GetSQLInstanceInfo(
return dsp.nodeDescs.GetNodeDescriptor(roachpb.NodeID(sqlInstanceID))
}

// SetSQLInstanceInfo sets the planner's node descriptor.
// The first call to SetSQLInstanceInfo leads to the construction of the SpanResolver.
func (dsp *DistSQLPlanner) SetSQLInstanceInfo(desc roachpb.NodeDescriptor) {
dsp.gatewaySQLInstanceID = base.SQLInstanceID(desc.NodeID)
if dsp.spanResolver == nil {
sr := physicalplan.NewSpanResolver(dsp.st, dsp.distSender, dsp.nodeDescs, desc,
dsp.clock, dsp.rpcCtx, ReplicaOraclePolicy)
dsp.SetSpanResolver(sr)
// ConstructAndSetSpanResolver constructs a SpanResolver from the given node ID
// and locality and sets it as the planner's SpanResolver.
//
// It must only be called while the planner has no SpanResolver set: if one
// already exists, this logs a fatal error instead of keeping or replacing the
// existing resolver.
func (dsp *DistSQLPlanner) ConstructAndSetSpanResolver(
ctx context.Context, nodeID roachpb.NodeID, locality roachpb.Locality,
) {
if dsp.spanResolver != nil {
log.Fatal(ctx, "trying to construct and set span resolver when one already exists")
}
sr := physicalplan.NewSpanResolver(dsp.st, dsp.distSender, dsp.nodeDescs, nodeID, locality,
dsp.clock, dsp.rpcCtx, ReplicaOraclePolicy)
dsp.SetSpanResolver(sr)
}

// SetGatewaySQLInstanceID sets the planner's gateway SQL instance ID to id.
// It only records the ID; it does not construct or modify the SpanResolver
// (see ConstructAndSetSpanResolver for that).
func (dsp *DistSQLPlanner) SetGatewaySQLInstanceID(id base.SQLInstanceID) {
dsp.gatewaySQLInstanceID = id
}

// GatewayID returns the ID of the gateway.
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/physicalplan/replicaoracle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ go_test(
"//pkg/gossip",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/testutils",
"//pkg/util",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/metric",
"//pkg/util/stop",
"@com_github_stretchr_testify//require",
],
)

Expand Down
35 changes: 24 additions & 11 deletions pkg/sql/physicalplan/replicaoracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ var (
// Config is used to construct an OracleFactory.
type Config struct {
NodeDescs kvcoord.NodeDescStore
NodeDesc roachpb.NodeDescriptor // current node
NodeID roachpb.NodeID // current node's ID. 0 for secondary tenants.
Locality roachpb.Locality // current node's locality.
Settings *cluster.Settings
Clock *hlc.Clock
RPCContext *rpc.Context
Expand Down Expand Up @@ -147,16 +148,22 @@ func (o *randomOracle) ChoosePreferredReplica(

type closestOracle struct {
nodeDescs kvcoord.NodeDescStore
// nodeDesc is the descriptor of the current node. It will be used to give
// preference to the current node and others "close" to it.
nodeDesc roachpb.NodeDescriptor
// nodeID and locality of the current node. Used to give preference to the
// current node and others "close" to it.
//
// NodeID may be 0 in which case the current node will not be given any
// preference. NodeID being 0 indicates that no KV instance is available
// inside the same process.
nodeID roachpb.NodeID
locality roachpb.Locality
latencyFunc kvcoord.LatencyFunc
}

func newClosestOracle(cfg Config) Oracle {
return &closestOracle{
nodeDescs: cfg.NodeDescs,
nodeDesc: cfg.NodeDesc,
nodeID: cfg.NodeID,
locality: cfg.Locality,
latencyFunc: latencyFunc(cfg.RPCContext),
}
}
Expand All @@ -175,7 +182,7 @@ func (o *closestOracle) ChoosePreferredReplica(
if err != nil {
return roachpb.ReplicaDescriptor{}, err
}
replicas.OptimizeReplicaOrder(o.nodeDesc.NodeID, o.latencyFunc, o.nodeDesc.Locality)
replicas.OptimizeReplicaOrder(o.nodeID, o.latencyFunc, o.locality)
return replicas[0].ReplicaDescriptor, nil
}

Expand All @@ -197,17 +204,23 @@ const maxPreferredRangesPerLeaseHolder = 10
type binPackingOracle struct {
maxPreferredRangesPerLeaseHolder int
nodeDescs kvcoord.NodeDescStore
// nodeDesc is the descriptor of the current node. It will be used to give
// preference to the current node and others "close" to it.
nodeDesc roachpb.NodeDescriptor
// nodeID and locality of the current node. Used to give preference to the
// current node and others "close" to it.
//
// NodeID may be 0 in which case the current node will not be given any
// preference. NodeID being 0 indicates that no KV instance is available
// inside the same process.
nodeID roachpb.NodeID
locality roachpb.Locality
latencyFunc kvcoord.LatencyFunc
}

func newBinPackingOracle(cfg Config) Oracle {
return &binPackingOracle{
maxPreferredRangesPerLeaseHolder: maxPreferredRangesPerLeaseHolder,
nodeDescs: cfg.NodeDescs,
nodeDesc: cfg.NodeDesc,
nodeID: cfg.NodeID,
locality: cfg.Locality,
latencyFunc: latencyFunc(cfg.RPCContext),
}
}
Expand All @@ -229,7 +242,7 @@ func (o *binPackingOracle) ChoosePreferredReplica(
if err != nil {
return roachpb.ReplicaDescriptor{}, err
}
replicas.OptimizeReplicaOrder(o.nodeDesc.NodeID, o.latencyFunc, o.nodeDesc.Locality)
replicas.OptimizeReplicaOrder(o.nodeID, o.latencyFunc, o.locality)

// Look for a replica that has been assigned some ranges, but it's not yet full.
minLoad := int(math.MaxInt32)
Expand Down
85 changes: 52 additions & 33 deletions pkg/sql/physicalplan/replicaoracle/oracle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package replicaoracle
import (
"context"
"fmt"
"math/rand"
"strings"
"testing"
"time"
Expand All @@ -21,11 +22,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/stretchr/testify/require"
)

// TestRandomOracle defeats TestUnused for RandomChoice.
Expand All @@ -35,41 +38,49 @@ func TestRandomOracle(t *testing.T) {

func TestClosest(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
g, _ := makeGossip(t, stopper)
nd, _ := g.GetNodeDescriptor(1)
o := NewOracle(ClosestChoice, Config{
NodeDescs: g,
NodeDesc: *nd,
})
o.(*closestOracle).latencyFunc = func(s string) (time.Duration, bool) {
if strings.HasSuffix(s, "2") {
return time.Nanosecond, true
testutils.RunTrueAndFalse(t, "valid-latency-func", func(t *testing.T, validLatencyFunc bool) {
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
g, _ := makeGossip(t, stopper)
nd2, err := g.GetNodeDescriptor(2)
require.NoError(t, err)
o := NewOracle(ClosestChoice, Config{
NodeDescs: g,
NodeID: 1,
Locality: nd2.Locality, // pretend node 2 is closest.
})
o.(*closestOracle).latencyFunc = func(s string) (time.Duration, bool) {
if strings.HasSuffix(s, "2") {
return time.Nanosecond, validLatencyFunc
}
return time.Millisecond, validLatencyFunc
}
return time.Millisecond, true
}
info, err := o.ChoosePreferredReplica(
ctx,
nil, /* txn */
&roachpb.RangeDescriptor{
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 4, StoreID: 4},
{NodeID: 2, StoreID: 2},
{NodeID: 3, StoreID: 3},
internalReplicas := []roachpb.ReplicaDescriptor{
{NodeID: 4, StoreID: 4},
{NodeID: 2, StoreID: 2},
{NodeID: 3, StoreID: 3},
}
rand.Shuffle(len(internalReplicas), func(i, j int) {
internalReplicas[i], internalReplicas[j] = internalReplicas[j], internalReplicas[i]
})
info, err := o.ChoosePreferredReplica(
ctx,
nil, /* txn */
&roachpb.RangeDescriptor{
InternalReplicas: internalReplicas,
},
},
nil, /* leaseHolder */
roachpb.LAG_BY_CLUSTER_SETTING,
QueryState{},
)
if err != nil {
t.Fatalf("Failed to choose closest replica: %v", err)
}
if info.NodeID != 2 {
t.Fatalf("Failed to choose node 2, got %v", info.NodeID)
}
nil, /* leaseHolder */
roachpb.LAG_BY_CLUSTER_SETTING,
QueryState{},
)
if err != nil {
t.Fatalf("Failed to choose closest replica: %v", err)
}
if info.NodeID != 2 {
t.Fatalf("Failed to choose node 2, got %v", info.NodeID)
}
})
}

func makeGossip(t *testing.T, stopper *stop.Stopper) (*gossip.Gossip, *hlc.Clock) {
Expand Down Expand Up @@ -99,5 +110,13 @@ func newNodeDesc(nodeID roachpb.NodeID) *roachpb.NodeDescriptor {
return &roachpb.NodeDescriptor{
NodeID: nodeID,
Address: util.MakeUnresolvedAddr("tcp", fmt.Sprintf("invalid.invalid:%d", nodeID)),
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{
{
Key: "region",
Value: fmt.Sprintf("region_%d", nodeID),
},
},
},
}
}
10 changes: 5 additions & 5 deletions pkg/sql/physicalplan/span_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ type SpanResolverIterator interface {
type spanResolver struct {
st *cluster.Settings
distSender *kvcoord.DistSender
nodeDesc roachpb.NodeDescriptor
oracle replicaoracle.Oracle
}

Expand All @@ -129,17 +128,18 @@ func NewSpanResolver(
st *cluster.Settings,
distSender *kvcoord.DistSender,
nodeDescs kvcoord.NodeDescStore,
nodeDesc roachpb.NodeDescriptor,
nodeID roachpb.NodeID,
locality roachpb.Locality,
clock *hlc.Clock,
rpcCtx *rpc.Context,
policy replicaoracle.Policy,
) SpanResolver {
return &spanResolver{
st: st,
nodeDesc: nodeDesc,
st: st,
oracle: replicaoracle.NewOracle(policy, replicaoracle.Config{
NodeDescs: nodeDescs,
NodeDesc: nodeDesc,
NodeID: nodeID,
Locality: locality,
Settings: st,
Clock: clock,
RPCContext: rpcCtx,
Expand Down
9 changes: 6 additions & 3 deletions pkg/sql/physicalplan/span_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ func TestSpanResolverUsesCaches(t *testing.T) {
s3.Cfg.Settings,
s3.DistSenderI().(*kvcoord.DistSender),
s3.Gossip(),
s3.GetNode().Descriptor,
s3.GetNode().Descriptor.NodeID,
s3.GetNode().Descriptor.Locality,
s3.Clock(),
nil, // rpcCtx
replicaoracle.BinPackingChoice)
Expand Down Expand Up @@ -201,7 +202,8 @@ func TestSpanResolver(t *testing.T) {
s.(*server.TestServer).Cfg.Settings,
s.DistSenderI().(*kvcoord.DistSender),
s.GossipI().(*gossip.Gossip),
s.(*server.TestServer).GetNode().Descriptor,
s.(*server.TestServer).GetNode().Descriptor.NodeID,
s.(*server.TestServer).GetNode().Descriptor.Locality,
s.Clock(),
nil, // rpcCtx
replicaoracle.BinPackingChoice)
Expand Down Expand Up @@ -299,7 +301,8 @@ func TestMixedDirections(t *testing.T) {
s.(*server.TestServer).Cfg.Settings,
s.DistSenderI().(*kvcoord.DistSender),
s.GossipI().(*gossip.Gossip),
s.(*server.TestServer).GetNode().Descriptor,
s.(*server.TestServer).GetNode().Descriptor.NodeID,
s.(*server.TestServer).GetNode().Descriptor.Locality,
s.Clock(),
nil, // rpcCtx
replicaoracle.BinPackingChoice)
Expand Down

0 comments on commit 0e08303

Please sign in to comment.