From 0e08303fb15ab6c3efd0711c5daba1b725ecfd57 Mon Sep 17 00:00:00 2001 From: Arul Ajmani Date: Wed, 10 Aug 2022 13:19:11 -0400 Subject: [PATCH] sql: use locality information when initializing span resolver oracles Previously, we used a node descriptor for locality information about the current node when doing dist SQL planning. This only worked for the system tenant as secondary tenant's used a fake node descriptor. Given the changes to `OptimizeReplicaOrder` in the prior commit, this patch switches to threading in the tenant pod's locality to allow secondary tenant's to make use of this information in dist sql planning. Release note: None --- pkg/server/node.go | 11 +-- pkg/server/server.go | 6 +- pkg/server/server_sql.go | 2 +- pkg/server/tenant.go | 4 +- pkg/sql/distsql_physical_planner.go | 23 +++-- .../physicalplan/replicaoracle/BUILD.bazel | 2 + pkg/sql/physicalplan/replicaoracle/oracle.go | 35 +++++--- .../physicalplan/replicaoracle/oracle_test.go | 85 ++++++++++++------- pkg/sql/physicalplan/span_resolver.go | 10 +-- pkg/sql/physicalplan/span_resolver_test.go | 9 +- 10 files changed, 112 insertions(+), 75 deletions(-) diff --git a/pkg/server/node.go b/pkg/server/node.go index 21c5925b0a27..05542ada7708 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -413,9 +413,7 @@ func (n *Node) AnnotateCtxWithSpan( // start starts the node by registering the storage instance for the RPC // service "Node" and initializing stores for each specified engine. -// Launches periodic store gossiping in a goroutine. A callback can -// be optionally provided that will be invoked once this node's -// NodeDescriptor is available, to help bootstrapping. +// Launches periodic store gossiping in a goroutine. // // addr, sqlAddr, and httpAddr are used to populate the Address, // SQLAddress, and HTTPAddress fields respectively of the @@ -432,7 +430,6 @@ func (n *Node) start( attrs roachpb.Attributes, locality roachpb.Locality, localityAddress []roachpb.LocalityAddress, - nodeDescriptorCallback func(descriptor roachpb.NodeDescriptor), ) error { n.initialStart = initialStart n.startedAt = n.storeCfg.Clock.Now().WallTime @@ -449,12 +446,6 @@ func (n *Node) start( StartedAt: n.startedAt, HTTPAddress: util.MakeUnresolvedAddr(httpAddr.Network(), httpAddr.String()), } - // Invoke any passed in nodeDescriptorCallback as soon as it's available, to - // ensure that other components (currently the DistSQLPlanner) are initialized - // before store startup continues. - if nodeDescriptorCallback != nil { - nodeDescriptorCallback(n.Descriptor) - } // Gossip the node descriptor to make this node addressable by node ID. n.storeCfg.Gossip.NodeID.Set(ctx, n.Descriptor.NodeID) diff --git a/pkg/server/server.go b/pkg/server/server.go index dcf6a0a7ed10..ebdecf98ca4e 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1302,6 +1302,11 @@ func (s *Server) PreStart(ctx context.Context) error { s.rpcContext.StorageClusterID.Set(ctx, state.clusterID) s.rpcContext.NodeID.Set(ctx, state.nodeID) + // Ensure components in the DistSQLPlanner that rely on the node ID are + // initialized before store startup continues. + s.sqlServer.execCfg.DistSQLPlanner.SetGatewaySQLInstanceID(base.SQLInstanceID(state.nodeID)) + s.sqlServer.execCfg.DistSQLPlanner.ConstructAndSetSpanResolver(ctx, state.nodeID, s.cfg.Locality) + // TODO(irfansharif): Now that we have our node ID, we should run another // check here to make sure we've not been decommissioned away (if we're here // following a server restart). See the discussions in #48843 for how that @@ -1393,7 +1398,6 @@ func (s *Server) PreStart(ctx context.Context) error { s.cfg.NodeAttributes, s.cfg.Locality, s.cfg.LocalityAddresses, - s.sqlServer.execCfg.DistSQLPlanner.SetSQLInstanceInfo, ); err != nil { return err } diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 7db4d1adc5e0..33a28d4205f4 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -1212,8 +1212,8 @@ func (s *SQLServer) setInstanceID(ctx context.Context) error { if err != nil { return err } + s.execCfg.DistSQLPlanner.SetGatewaySQLInstanceID(instanceID) s.sqlLivenessSessionID = sessionID - s.execCfg.DistSQLPlanner.SetSQLInstanceInfo(roachpb.NodeDescriptor{NodeID: roachpb.NodeID(instanceID)}) return nil } diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 51860f378430..8d469a86323d 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -244,9 +244,7 @@ func startTenantInternal( tenantAdminServer := newTenantAdminServer(baseCfg.AmbientCtx, s, tenantStatusServer, drainServer) - // TODO(asubiotto): remove this. Right now it is needed to initialize the - // SpanResolver. - s.execCfg.DistSQLPlanner.SetSQLInstanceInfo(roachpb.NodeDescriptor{NodeID: 0}) + s.execCfg.DistSQLPlanner.ConstructAndSetSpanResolver(ctx, 0 /* NodeID */, s.execCfg.Locality) authServer = newAuthenticationServer(baseCfg.Config, s) diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 12a3cd8787b6..e6fc9287070d 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -223,15 +223,22 @@ func (dsp *DistSQLPlanner) GetSQLInstanceInfo( return dsp.nodeDescs.GetNodeDescriptor(roachpb.NodeID(sqlInstanceID)) } -// SetSQLInstanceInfo sets the planner's node descriptor. -// The first call to SetSQLInstanceInfo leads to the construction of the SpanResolver. -func (dsp *DistSQLPlanner) SetSQLInstanceInfo(desc roachpb.NodeDescriptor) { - dsp.gatewaySQLInstanceID = base.SQLInstanceID(desc.NodeID) - if dsp.spanResolver == nil { - sr := physicalplan.NewSpanResolver(dsp.st, dsp.distSender, dsp.nodeDescs, desc, - dsp.clock, dsp.rpcCtx, ReplicaOraclePolicy) - dsp.SetSpanResolver(sr) +// ConstructAndSetSpanResolver constructs and sets the planner's +// SpanResolver if it is unset. It's a no-op otherwise. +func (dsp *DistSQLPlanner) ConstructAndSetSpanResolver( + ctx context.Context, nodeID roachpb.NodeID, locality roachpb.Locality, +) { + if dsp.spanResolver != nil { + log.Fatal(ctx, "trying to construct and set span resolver when one already exists") } + sr := physicalplan.NewSpanResolver(dsp.st, dsp.distSender, dsp.nodeDescs, nodeID, locality, + dsp.clock, dsp.rpcCtx, ReplicaOraclePolicy) + dsp.SetSpanResolver(sr) +} + +// SetGatewaySQLInstanceID sets the planner's SQL instance ID. +func (dsp *DistSQLPlanner) SetGatewaySQLInstanceID(id base.SQLInstanceID) { + dsp.gatewaySQLInstanceID = id } // GatewayID returns the ID of the gateway. diff --git a/pkg/sql/physicalplan/replicaoracle/BUILD.bazel b/pkg/sql/physicalplan/replicaoracle/BUILD.bazel index bdbcb5cd71d3..5b3a45f4be31 100644 --- a/pkg/sql/physicalplan/replicaoracle/BUILD.bazel +++ b/pkg/sql/physicalplan/replicaoracle/BUILD.bazel @@ -29,11 +29,13 @@ go_test( "//pkg/gossip", "//pkg/roachpb", "//pkg/rpc", + "//pkg/testutils", "//pkg/util", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/metric", "//pkg/util/stop", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/sql/physicalplan/replicaoracle/oracle.go b/pkg/sql/physicalplan/replicaoracle/oracle.go index b2817c87c778..929df9b8e6c4 100644 --- a/pkg/sql/physicalplan/replicaoracle/oracle.go +++ b/pkg/sql/physicalplan/replicaoracle/oracle.go @@ -43,7 +43,8 @@ var ( // Config is used to construct an OracleFactory. type Config struct { NodeDescs kvcoord.NodeDescStore - NodeDesc roachpb.NodeDescriptor // current node + NodeID roachpb.NodeID // current node's ID. 0 for secondary tenants. + Locality roachpb.Locality // current node's locality. Settings *cluster.Settings Clock *hlc.Clock RPCContext *rpc.Context @@ -147,16 +148,22 @@ func (o *randomOracle) ChoosePreferredReplica( type closestOracle struct { nodeDescs kvcoord.NodeDescStore - // nodeDesc is the descriptor of the current node. It will be used to give - // preference to the current node and others "close" to it. - nodeDesc roachpb.NodeDescriptor + // nodeID and locality of the current node. Used to give preference to the + // current node and others "close" to it. + // + // NodeID may be 0 in which case the current node will not be given any + // preference. NodeID being 0 indicates that no KV instance is available + // inside the same process. + nodeID roachpb.NodeID + locality roachpb.Locality latencyFunc kvcoord.LatencyFunc } func newClosestOracle(cfg Config) Oracle { return &closestOracle{ nodeDescs: cfg.NodeDescs, - nodeDesc: cfg.NodeDesc, + nodeID: cfg.NodeID, + locality: cfg.Locality, latencyFunc: latencyFunc(cfg.RPCContext), } } @@ -175,7 +182,7 @@ func (o *closestOracle) ChoosePreferredReplica( if err != nil { return roachpb.ReplicaDescriptor{}, err } - replicas.OptimizeReplicaOrder(o.nodeDesc.NodeID, o.latencyFunc, o.nodeDesc.Locality) + replicas.OptimizeReplicaOrder(o.nodeID, o.latencyFunc, o.locality) return replicas[0].ReplicaDescriptor, nil } @@ -197,9 +204,14 @@ const maxPreferredRangesPerLeaseHolder = 10 type binPackingOracle struct { maxPreferredRangesPerLeaseHolder int nodeDescs kvcoord.NodeDescStore - // nodeDesc is the descriptor of the current node. It will be used to give - // preference to the current node and others "close" to it. - nodeDesc roachpb.NodeDescriptor + // nodeID and locality of the current node. Used to give preference to the + // current node and others "close" to it. + // + // NodeID may be 0 in which case the current node will not be given any + // preference. NodeID being 0 indicates that no KV instance is available + // inside the same process. + nodeID roachpb.NodeID + locality roachpb.Locality latencyFunc kvcoord.LatencyFunc } @@ -207,7 +219,8 @@ func newBinPackingOracle(cfg Config) Oracle { return &binPackingOracle{ maxPreferredRangesPerLeaseHolder: maxPreferredRangesPerLeaseHolder, nodeDescs: cfg.NodeDescs, - nodeDesc: cfg.NodeDesc, + nodeID: cfg.NodeID, + locality: cfg.Locality, latencyFunc: latencyFunc(cfg.RPCContext), } } @@ -229,7 +242,7 @@ func (o *binPackingOracle) ChoosePreferredReplica( if err != nil { return roachpb.ReplicaDescriptor{}, err } - replicas.OptimizeReplicaOrder(o.nodeDesc.NodeID, o.latencyFunc, o.nodeDesc.Locality) + replicas.OptimizeReplicaOrder(o.nodeID, o.latencyFunc, o.locality) // Look for a replica that has been assigned some ranges, but it's not yet full. minLoad := int(math.MaxInt32) diff --git a/pkg/sql/physicalplan/replicaoracle/oracle_test.go b/pkg/sql/physicalplan/replicaoracle/oracle_test.go index 17adb2ebe78b..1cce2c93e815 100644 --- a/pkg/sql/physicalplan/replicaoracle/oracle_test.go +++ b/pkg/sql/physicalplan/replicaoracle/oracle_test.go @@ -13,6 +13,7 @@ package replicaoracle import ( "context" "fmt" + "math/rand" "strings" "testing" "time" @@ -21,11 +22,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/stretchr/testify/require" ) // TestRandomOracle defeats TestUnused for RandomChoice. @@ -35,41 +38,49 @@ func TestRandomOracle(t *testing.T) { func TestClosest(t *testing.T) { defer leaktest.AfterTest(t)() - ctx := context.Background() - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - g, _ := makeGossip(t, stopper) - nd, _ := g.GetNodeDescriptor(1) - o := NewOracle(ClosestChoice, Config{ - NodeDescs: g, - NodeDesc: *nd, - }) - o.(*closestOracle).latencyFunc = func(s string) (time.Duration, bool) { - if strings.HasSuffix(s, "2") { - return time.Nanosecond, true + testutils.RunTrueAndFalse(t, "valid-latency-func", func(t *testing.T, validLatencyFunc bool) { + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + g, _ := makeGossip(t, stopper) + nd2, err := g.GetNodeDescriptor(2) + require.NoError(t, err) + o := NewOracle(ClosestChoice, Config{ + NodeDescs: g, + NodeID: 1, + Locality: nd2.Locality, // pretend node 2 is closest. + }) + o.(*closestOracle).latencyFunc = func(s string) (time.Duration, bool) { + if strings.HasSuffix(s, "2") { + return time.Nanosecond, validLatencyFunc + } + return time.Millisecond, validLatencyFunc } - return time.Millisecond, true - } - info, err := o.ChoosePreferredReplica( - ctx, - nil, /* txn */ - &roachpb.RangeDescriptor{ - InternalReplicas: []roachpb.ReplicaDescriptor{ - {NodeID: 4, StoreID: 4}, - {NodeID: 2, StoreID: 2}, - {NodeID: 3, StoreID: 3}, + internalReplicas := []roachpb.ReplicaDescriptor{ + {NodeID: 4, StoreID: 4}, + {NodeID: 2, StoreID: 2}, + {NodeID: 3, StoreID: 3}, + } + rand.Shuffle(len(internalReplicas), func(i, j int) { + internalReplicas[i], internalReplicas[j] = internalReplicas[j], internalReplicas[i] + }) + info, err := o.ChoosePreferredReplica( + ctx, + nil, /* txn */ + &roachpb.RangeDescriptor{ + InternalReplicas: internalReplicas, }, - }, - nil, /* leaseHolder */ - roachpb.LAG_BY_CLUSTER_SETTING, - QueryState{}, - ) - if err != nil { - t.Fatalf("Failed to choose closest replica: %v", err) - } - if info.NodeID != 2 { - t.Fatalf("Failed to choose node 2, got %v", info.NodeID) - } + nil, /* leaseHolder */ + roachpb.LAG_BY_CLUSTER_SETTING, + QueryState{}, + ) + if err != nil { + t.Fatalf("Failed to choose closest replica: %v", err) + } + if info.NodeID != 2 { + t.Fatalf("Failed to choose node 2, got %v", info.NodeID) + } + }) } func makeGossip(t *testing.T, stopper *stop.Stopper) (*gossip.Gossip, *hlc.Clock) { @@ -99,5 +110,13 @@ func newNodeDesc(nodeID roachpb.NodeID) *roachpb.NodeDescriptor { return &roachpb.NodeDescriptor{ NodeID: nodeID, Address: util.MakeUnresolvedAddr("tcp", fmt.Sprintf("invalid.invalid:%d", nodeID)), + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + { + Key: "region", + Value: fmt.Sprintf("region_%d", nodeID), + }, + }, + }, } } diff --git a/pkg/sql/physicalplan/span_resolver.go b/pkg/sql/physicalplan/span_resolver.go index 4ea55a5536b6..9c59f0f5b5b1 100644 --- a/pkg/sql/physicalplan/span_resolver.go +++ b/pkg/sql/physicalplan/span_resolver.go @@ -118,7 +118,6 @@ type SpanResolverIterator interface { type spanResolver struct { st *cluster.Settings distSender *kvcoord.DistSender - nodeDesc roachpb.NodeDescriptor oracle replicaoracle.Oracle } @@ -129,17 +128,18 @@ func NewSpanResolver( st *cluster.Settings, distSender *kvcoord.DistSender, nodeDescs kvcoord.NodeDescStore, - nodeDesc roachpb.NodeDescriptor, + nodeID roachpb.NodeID, + locality roachpb.Locality, clock *hlc.Clock, rpcCtx *rpc.Context, policy replicaoracle.Policy, ) SpanResolver { return &spanResolver{ - st: st, - nodeDesc: nodeDesc, + st: st, oracle: replicaoracle.NewOracle(policy, replicaoracle.Config{ NodeDescs: nodeDescs, - NodeDesc: nodeDesc, + NodeID: nodeID, + Locality: locality, Settings: st, Clock: clock, RPCContext: rpcCtx, diff --git a/pkg/sql/physicalplan/span_resolver_test.go b/pkg/sql/physicalplan/span_resolver_test.go index 6083d71658ae..7ab53881ea7b 100644 --- a/pkg/sql/physicalplan/span_resolver_test.go +++ b/pkg/sql/physicalplan/span_resolver_test.go @@ -90,7 +90,8 @@ func TestSpanResolverUsesCaches(t *testing.T) { s3.Cfg.Settings, s3.DistSenderI().(*kvcoord.DistSender), s3.Gossip(), - s3.GetNode().Descriptor, + s3.GetNode().Descriptor.NodeID, + s3.GetNode().Descriptor.Locality, s3.Clock(), nil, // rpcCtx replicaoracle.BinPackingChoice) @@ -201,7 +202,8 @@ func TestSpanResolver(t *testing.T) { s.(*server.TestServer).Cfg.Settings, s.DistSenderI().(*kvcoord.DistSender), s.GossipI().(*gossip.Gossip), - s.(*server.TestServer).GetNode().Descriptor, + s.(*server.TestServer).GetNode().Descriptor.NodeID, + s.(*server.TestServer).GetNode().Descriptor.Locality, s.Clock(), nil, // rpcCtx replicaoracle.BinPackingChoice) @@ -299,7 +301,8 @@ func TestMixedDirections(t *testing.T) { s.(*server.TestServer).Cfg.Settings, s.DistSenderI().(*kvcoord.DistSender), s.GossipI().(*gossip.Gossip), - s.(*server.TestServer).GetNode().Descriptor, + s.(*server.TestServer).GetNode().Descriptor.NodeID, + s.(*server.TestServer).GetNode().Descriptor.Locality, s.Clock(), nil, // rpcCtx replicaoracle.BinPackingChoice)