diff --git a/go/vt/discovery/fake_healthcheck.go b/go/vt/discovery/fake_healthcheck.go
index acff538b78d..d3e9adfeded 100644
--- a/go/vt/discovery/fake_healthcheck.go
+++ b/go/vt/discovery/fake_healthcheck.go
@@ -365,6 +365,18 @@ func (fhc *FakeHealthCheck) GetAllTablets() map[string]*topodatapb.Tablet {
 	return res
 }
 
+// BroadcastAll sends a copy of every item's tablet health on fhc.ch while holding fhc.mu; it is a no-op when fhc.ch is nil.
+func (fhc *FakeHealthCheck) BroadcastAll() {
+	if fhc.ch == nil {
+		return
+	}
+	fhc.mu.Lock()
+	defer fhc.mu.Unlock()
+	for _, item := range fhc.items {
+		fhc.ch <- simpleCopy(item.ts)
+	}
+}
+
 func simpleCopy(th *TabletHealth) *TabletHealth {
 	return &TabletHealth{
 		Conn: th.Conn,
diff --git a/go/vt/discovery/keyspace_events.go b/go/vt/discovery/keyspace_events.go
index aeaa0ff91de..89a0fb23eac 100644
--- a/go/vt/discovery/keyspace_events.go
+++ b/go/vt/discovery/keyspace_events.go
@@ -379,6 +379,18 @@ func (kss *keyspaceState) onSrvKeyspace(newKeyspace *topodatapb.SrvKeyspace, new
 	return true
 }
 
+// isServing reports whether at least one shard tracked in kss.shards is marked serving. It acquires kss.mu.
+func (kss *keyspaceState) isServing() bool {
+	kss.mu.Lock()
+	defer kss.mu.Unlock()
+	for _, state := range kss.shards {
+		if state.serving {
+			return true
+		}
+	}
+	return false
+}
+
 // newKeyspaceState allocates the internal state required to keep track of availability incidents
 // in this keyspace, and starts up a SrvKeyspace watcher on our topology server which will update
 // our keyspaceState with any topology changes in real time.
@@ -471,3 +483,17 @@ func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(target *query.Target) (*top
 	}
 	return nil, false
 }
+
+// GetServingKeyspaces returns the names of the watched keyspaces whose state reports isServing, in unspecified order; the result is nil when none are serving.
+func (kew *KeyspaceEventWatcher) GetServingKeyspaces() []string {
+	kew.mu.Lock()
+	defer kew.mu.Unlock()
+
+	var servingKeyspaces []string
+	for ksName, state := range kew.keyspaces {
+		if state.isServing() {
+			servingKeyspaces = append(servingKeyspaces, ksName)
+		}
+	}
+	return servingKeyspaces
+}
diff --git a/go/vt/srvtopo/resolver.go b/go/vt/srvtopo/resolver.go
index 2cb3fed676c..98d77e259ef 100644
--- a/go/vt/srvtopo/resolver.go
+++ b/go/vt/srvtopo/resolver.go
@@ -17,12 +17,11 @@ limitations under the License.
 package srvtopo
 
 import (
+	"context"
 	"sort"
 
 	"vitess.io/vitess/go/sqltypes"
 
-	"context"
-
 	"google.golang.org/protobuf/proto"
 
 	"vitess.io/vitess/go/vt/key"
@@ -43,6 +42,9 @@ type Gateway interface {
 
 	// QueryServiceByAlias returns a QueryService
 	QueryServiceByAlias(alias *topodatapb.TabletAlias, target *querypb.Target) (queryservice.QueryService, error)
+
+	// GetServingKeyspaces returns the list of serving keyspace names.
+	GetServingKeyspaces() []string
 }
 
 // A Resolver can resolve keyspace ids and key ranges into ResolvedShard*
diff --git a/go/vt/vtgate/executor_framework_test.go b/go/vt/vtgate/executor_framework_test.go
index 7f4c2dcfe97..de8c9aac5d4 100644
--- a/go/vt/vtgate/executor_framework_test.go
+++ b/go/vt/vtgate/executor_framework_test.go
@@ -130,7 +130,7 @@ func init() {
 
 func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn.SandboxConn) {
 	cell := "aa"
-	hc := discovery.NewFakeHealthCheck(nil)
+	hc := discovery.NewFakeHealthCheck(make(chan *discovery.TabletHealth))
 	s := createSandbox(KsTestSharded)
 	s.VSchema = executorVSchema
 	serv := newSandboxForCells([]string{cell})
@@ -162,7 +162,6 @@ func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn
 	_ = hc.AddTestTablet(cell, "c0-e0", 1, "TestExecutor", "c0-e0", topodatapb.TabletType_PRIMARY, true, 1, nil)
 	_ = hc.AddTestTablet(cell, "e0-", 1, "TestExecutor", "e0-", topodatapb.TabletType_PRIMARY, true, 1, nil)
 	// Below is needed so that SendAnyWherePlan doesn't fail
-	_ = hc.AddTestTablet(cell, "random", 1, "TestXBadVSchema", "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
 
 	createSandbox(KsTestUnsharded)
 	sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil)
diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go
index b874d2a7107..f327af5f106 100644
--- a/go/vt/vtgate/executor_test.go
+++ b/go/vt/vtgate/executor_test.go
@@ -40,6 +40,7 @@ import (
 	"vitess.io/vitess/go/sqltypes"
 	"vitess.io/vitess/go/test/utils"
 	"vitess.io/vitess/go/vt/callerid"
+	"vitess.io/vitess/go/vt/discovery"
 	querypb "vitess.io/vitess/go/vt/proto/query"
 	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
 	vschemapb "vitess.io/vitess/go/vt/proto/vschema"
@@ -832,7 +833,7 @@ func TestExecutorShow(t *testing.T) {
 		Fields: buildVarCharFields("Cell", "Keyspace", "Shard", "TabletType", "State", "Alias", "Hostname", "PrimaryTermStartTime"),
 		Rows: [][]sqltypes.Value{
 			buildVarCharRow("aa", "TestExecutor", "-20", "PRIMARY", "SERVING", "aa-0000000001", "-20", "1970-01-01T00:00:01Z"),
-			buildVarCharRow("aa", "TestXBadVSchema", "-20", "PRIMARY", "SERVING", "aa-0000000009", "random", "1970-01-01T00:00:01Z"),
+			buildVarCharRow("aa", "TestUnsharded", "0", "REPLICA", "SERVING", "aa-0000000010", "2", "1970-01-01T00:00:01Z"),
 		},
 	}
 	utils.MustMatch(t, wantqr, qr, query)
@@ -2063,6 +2064,49 @@ func TestExecutorClearsWarnings(t *testing.T) {
 	require.Empty(t, session.Warnings)
 }
 
+// TestServingKeyspaces tests that dual queries are routed to the correct keyspace, chosen from the list of serving keyspaces.
+func TestServingKeyspaces(t *testing.T) {
+	executor, sbc1, _, sbclookup := createExecutorEnv()
+	executor.pv = querypb.ExecuteOptions_Gen4
+	gw, ok := executor.resolver.resolver.GetGateway().(*TabletGateway)
+	require.True(t, ok)
+	hc := gw.hc.(*discovery.FakeHealthCheck)
+
+	// We broadcast twice because we want to ensure the keyspace event watcher has processed all the healthcheck updates
+	// from the first broadcast. Since we use a channel for broadcasting, it is blocking and hence the second call ensures
+	// all the updates (specifically the last one) have been processed by the keyspace-event-watcher.
+	hc.BroadcastAll()
+	hc.BroadcastAll()
+
+	sbc1.SetResults([]*sqltypes.Result{
+		sqltypes.MakeTestResult(sqltypes.MakeTestFields("keyspace", "varchar"), "TestExecutor"),
+	})
+	sbclookup.SetResults([]*sqltypes.Result{
+		sqltypes.MakeTestResult(sqltypes.MakeTestFields("keyspace", "varchar"), "TestUnsharded"),
+	})
+
+	require.ElementsMatch(t, []string{"TestExecutor", "TestUnsharded"}, gw.GetServingKeyspaces())
+	result, err := executor.Execute(ctx, nil, "TestServingKeyspaces", NewSafeSession(&vtgatepb.Session{}), "select keyspace_name from dual", nil)
+	require.NoError(t, err)
+	require.Equal(t, `[[VARCHAR("TestExecutor")]]`, fmt.Sprintf("%v", result.Rows))
+
+	for _, tablet := range hc.GetAllTablets() {
+		if tablet.Keyspace == "TestExecutor" {
+			hc.SetServing(tablet, false)
+		}
+	}
+	// Broadcast twice again, for the same reason as above, so the non-serving state has been processed.
+	hc.BroadcastAll()
+	hc.BroadcastAll()
+
+	// Clear the plan cache to force re-planning of the query against the updated serving keyspaces.
+	executor.plans.Clear()
+	require.ElementsMatch(t, []string{"TestUnsharded"}, gw.GetServingKeyspaces())
+	result, err = executor.Execute(ctx, nil, "TestServingKeyspaces", NewSafeSession(&vtgatepb.Session{}), "select keyspace_name from dual", nil)
+	require.NoError(t, err)
+	require.Equal(t, `[[VARCHAR("TestUnsharded")]]`, fmt.Sprintf("%v", result.Rows))
+}
+
 func TestExecutorOtherRead(t *testing.T) {
 	executor, sbc1, sbc2, sbclookup := createExecutorEnv()
 
diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go
index d50e8936ae9..0b2132c172e 100644
--- a/go/vt/vtgate/tabletgateway.go
+++ b/go/vt/vtgate/tabletgateway.go
@@ -175,6 +175,14 @@ func (gw *TabletGateway) QueryServiceByAlias(alias *topodatapb.TabletAlias, targ
 	return queryservice.Wrap(qs, gw.withShardError), NewShardError(err, target)
 }
 
+// GetServingKeyspaces returns the serving keyspaces reported by gw.kev, or nil when gw.kev is nil.
+func (gw *TabletGateway) GetServingKeyspaces() []string {
+	if gw.kev == nil {
+		return nil
+	}
+	return gw.kev.GetServingKeyspaces()
+}
+
 // RegisterStats registers the stats to export the lag since the last refresh
 // and the checksum of the topology
 func (gw *TabletGateway) RegisterStats() {
diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go
index d758d250243..4b48285d997 100644
--- a/go/vt/vtgate/vcursor_impl.go
+++ b/go/vt/vtgate/vcursor_impl.go
@@ -342,13 +342,7 @@ func (vc *vcursorImpl) AnyKeyspace() (*vindexes.Keyspace, error) {
 		return nil, errNoDbAvailable
 	}
 
-	var keyspaces = make([]*vindexes.Keyspace, 0, len(vc.vschema.Keyspaces))
-	for _, ks := range vc.vschema.Keyspaces {
-		keyspaces = append(keyspaces, ks.Keyspace)
-	}
-	sort.Slice(keyspaces, func(i, j int) bool {
-		return keyspaces[i].Name < keyspaces[j].Name
-	})
+	keyspaces := vc.getSortedServingKeyspaces()
 
 	// Look for any sharded keyspace if present, otherwise take the first keyspace,
 	// sorted alphabetically
@@ -360,18 +354,38 @@ func (vc *vcursorImpl) AnyKeyspace() (*vindexes.Keyspace, error) {
 	return keyspaces[0], nil
 }
 
+// getSortedServingKeyspaces returns, sorted by name, the vschema keyspaces the gateway reports as serving, or every vschema keyspace when that set is empty.
+func (vc *vcursorImpl) getSortedServingKeyspaces() []*vindexes.Keyspace {
+	var keyspaces []*vindexes.Keyspace
+
+	if vc.resolver != nil && vc.resolver.GetGateway() != nil {
+		keyspaceNames := vc.resolver.GetGateway().GetServingKeyspaces()
+		for _, ksName := range keyspaceNames {
+			ks, exists := vc.vschema.Keyspaces[ksName]
+			if exists { // serving keyspaces that are absent from the vschema are skipped
+				keyspaces = append(keyspaces, ks.Keyspace)
+			}
+		}
+	}
+
+	if len(keyspaces) == 0 { // nothing usable from the gateway: fall back to all vschema keyspaces
+		for _, ks := range vc.vschema.Keyspaces {
+			keyspaces = append(keyspaces, ks.Keyspace)
+		}
+	}
+	sort.Slice(keyspaces, func(i, j int) bool {
+		return keyspaces[i].Name < keyspaces[j].Name
+	})
+	return keyspaces
+}
+
 func (vc *vcursorImpl) FirstSortedKeyspace() (*vindexes.Keyspace, error) {
 	if len(vc.vschema.Keyspaces) == 0 {
 		return nil, errNoDbAvailable
 	}
-	kss := vc.vschema.Keyspaces
-	keys := make([]string, 0, len(kss))
-	for ks := range kss {
-		keys = append(keys, ks)
-	}
-	sort.Strings(keys)
+	keyspaces := vc.getSortedServingKeyspaces()
 
-	return kss[keys[0]].Keyspace, nil
+	return keyspaces[0], nil
 }
 
 // SysVarSetEnabled implements the ContextVSchema interface
diff --git a/go/vt/vttablet/sandboxconn/sandboxconn.go b/go/vt/vttablet/sandboxconn/sandboxconn.go
index 63847a4ed31..d5f6e497339 100644
--- a/go/vt/vttablet/sandboxconn/sandboxconn.go
+++ b/go/vt/vttablet/sandboxconn/sandboxconn.go
@@ -45,6 +45,9 @@ type SandboxConn struct {
 	// These errors work for all functions.
 	MustFailCodes map[vtrpcpb.Code]int
 
+	// ServingKeyspaces is the list of keyspace names that GetServingKeyspaces returns.
+	ServingKeyspaces []string
+
 	// These errors are triggered only for specific functions.
 	// For now these are just for the 2PC functions.
 	MustFailPrepare int
@@ -509,6 +512,11 @@ func (sbc *SandboxConn) QueryServiceByAlias(_ *topodatapb.TabletAlias, _ *queryp
 	return sbc, nil
 }
 
+// GetServingKeyspaces returns the ServingKeyspaces field as-is (nil when it has not been set).
+func (sbc *SandboxConn) GetServingKeyspaces() []string {
+	return sbc.ServingKeyspaces
+}
+
 // HandlePanic is part of the QueryService interface.
 func (sbc *SandboxConn) HandlePanic(err *error) {
 }
diff --git a/go/vt/vttablet/tabletconntest/fakequeryservice.go b/go/vt/vttablet/tabletconntest/fakequeryservice.go
index 8bfb40bceee..ea44680df38 100644
--- a/go/vt/vttablet/tabletconntest/fakequeryservice.go
+++ b/go/vt/vttablet/tabletconntest/fakequeryservice.go
@@ -707,6 +707,11 @@ func (f *FakeQueryService) QueryServiceByAlias(_ *topodatapb.TabletAlias, _ *que
 	panic("not implemented")
 }
 
+// GetServingKeyspaces is not implemented by this fake; calling it always panics.
+func (f *FakeQueryService) GetServingKeyspaces() []string {
+	panic("not implemented")
+}
+
 // ReserveBeginExecute satisfies the Gateway interface
 func (f *FakeQueryService) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (queryservice.ReservedTransactionState, *sqltypes.Result, error) {
 	panic("implement me")