Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Random selection of keyspace based on available tablet #13359

Merged
merged 5 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions go/vt/discovery/fake_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,18 @@ func (fhc *FakeHealthCheck) GetAllTablets() map[string]*topodatapb.Tablet {
return res
}

// BroadcastAll broadcasts all the tablets' healthchecks
func (fhc *FakeHealthCheck) BroadcastAll() {
if fhc.ch == nil {
return
}
fhc.mu.Lock()
defer fhc.mu.Unlock()
for _, item := range fhc.items {
fhc.ch <- simpleCopy(item.ts)
}
}

func simpleCopy(th *TabletHealth) *TabletHealth {
return &TabletHealth{
Conn: th.Conn,
Expand Down
26 changes: 26 additions & 0 deletions go/vt/discovery/keyspace_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,18 @@ func (kss *keyspaceState) onSrvKeyspace(newKeyspace *topodatapb.SrvKeyspace, new
return true
}

// isServing returns whether a keyspace has at least one serving shard or not.
func (kss *keyspaceState) isServing() bool {
kss.mu.Lock()
defer kss.mu.Unlock()
for _, state := range kss.shards {
if state.serving {
return true
}
}
return false
}

// newKeyspaceState allocates the internal state required to keep track of availability incidents
// in this keyspace, and starts up a SrvKeyspace watcher on our topology server which will update
// our keyspaceState with any topology changes in real time.
Expand Down Expand Up @@ -471,3 +483,17 @@ func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(target *query.Target) (*top
}
return nil, false
}

// GetServingKeyspaces gets the serving keyspaces from the keyspace event watcher.
func (kew *KeyspaceEventWatcher) GetServingKeyspaces() []string {
kew.mu.Lock()
defer kew.mu.Unlock()

var servingKeyspaces []string
for ksName, state := range kew.keyspaces {
if state.isServing() {
servingKeyspaces = append(servingKeyspaces, ksName)
}
}
return servingKeyspaces
}
6 changes: 4 additions & 2 deletions go/vt/srvtopo/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ limitations under the License.
package srvtopo

import (
"context"
"sort"

"vitess.io/vitess/go/sqltypes"

"context"

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/key"
Expand All @@ -43,6 +42,9 @@ type Gateway interface {

// QueryServiceByAlias returns a QueryService
QueryServiceByAlias(alias *topodatapb.TabletAlias, target *querypb.Target) (queryservice.QueryService, error)

// GetServingKeyspaces returns list of serving keyspaces.
GetServingKeyspaces() []string
}

// A Resolver can resolve keyspace ids and key ranges into ResolvedShard*
Expand Down
3 changes: 1 addition & 2 deletions go/vt/vtgate/executor_framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func init() {

func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn.SandboxConn) {
cell := "aa"
hc := discovery.NewFakeHealthCheck(nil)
hc := discovery.NewFakeHealthCheck(make(chan *discovery.TabletHealth))
s := createSandbox(KsTestSharded)
s.VSchema = executorVSchema
serv := newSandboxForCells([]string{cell})
Expand Down Expand Up @@ -162,7 +162,6 @@ func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn
_ = hc.AddTestTablet(cell, "c0-e0", 1, "TestExecutor", "c0-e0", topodatapb.TabletType_PRIMARY, true, 1, nil)
_ = hc.AddTestTablet(cell, "e0-", 1, "TestExecutor", "e0-", topodatapb.TabletType_PRIMARY, true, 1, nil)
// Below is needed so that SendAnyWherePlan doesn't fail
_ = hc.AddTestTablet(cell, "random", 1, "TestXBadVSchema", "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)

createSandbox(KsTestUnsharded)
sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil)
Expand Down
46 changes: 45 additions & 1 deletion go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/discovery"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
Expand Down Expand Up @@ -832,7 +833,7 @@ func TestExecutorShow(t *testing.T) {
Fields: buildVarCharFields("Cell", "Keyspace", "Shard", "TabletType", "State", "Alias", "Hostname", "PrimaryTermStartTime"),
Rows: [][]sqltypes.Value{
buildVarCharRow("aa", "TestExecutor", "-20", "PRIMARY", "SERVING", "aa-0000000001", "-20", "1970-01-01T00:00:01Z"),
buildVarCharRow("aa", "TestXBadVSchema", "-20", "PRIMARY", "SERVING", "aa-0000000009", "random", "1970-01-01T00:00:01Z"),
buildVarCharRow("aa", "TestUnsharded", "0", "REPLICA", "SERVING", "aa-0000000010", "2", "1970-01-01T00:00:01Z"),
},
}
utils.MustMatch(t, wantqr, qr, query)
Expand Down Expand Up @@ -2063,6 +2064,49 @@ func TestExecutorClearsWarnings(t *testing.T) {
require.Empty(t, session.Warnings)
}

// TestServingKeyspaces tests that the dual queries are routed to the correct keyspaces from the list of serving keyspaces.
func TestServingKeyspaces(t *testing.T) {
executor, sbc1, _, sbclookup := createExecutorEnv()
executor.pv = querypb.ExecuteOptions_Gen4
gw, ok := executor.resolver.resolver.GetGateway().(*TabletGateway)
require.True(t, ok)
hc := gw.hc.(*discovery.FakeHealthCheck)

// We broadcast twice because we want to ensure the keyspace event watcher has processed all the healthcheck updates
// from the first broadcast. Since we use a channel for broadcasting, it is blocking and hence the second call ensures
// all the updates (specifically the last one) has been processed by the keyspace-event-watcher.
hc.BroadcastAll()
hc.BroadcastAll()

sbc1.SetResults([]*sqltypes.Result{
sqltypes.MakeTestResult(sqltypes.MakeTestFields("keyspace", "varchar"), "TestExecutor"),
})
sbclookup.SetResults([]*sqltypes.Result{
sqltypes.MakeTestResult(sqltypes.MakeTestFields("keyspace", "varchar"), "TestUnsharded"),
})

require.ElementsMatch(t, []string{"TestExecutor", "TestUnsharded"}, gw.GetServingKeyspaces())
result, err := executor.Execute(ctx, nil, "TestServingKeyspaces", NewSafeSession(&vtgatepb.Session{}), "select keyspace_name from dual", nil)
require.NoError(t, err)
require.Equal(t, `[[VARCHAR("TestExecutor")]]`, fmt.Sprintf("%v", result.Rows))

for _, tablet := range hc.GetAllTablets() {
if tablet.Keyspace == "TestExecutor" {
hc.SetServing(tablet, false)
}
}
// Two broadcast calls for the same reason as above.
hc.BroadcastAll()
hc.BroadcastAll()

// Clear plan cache, to force re-planning of the query.
executor.plans.Clear()
require.ElementsMatch(t, []string{"TestUnsharded"}, gw.GetServingKeyspaces())
result, err = executor.Execute(ctx, nil, "TestServingKeyspaces", NewSafeSession(&vtgatepb.Session{}), "select keyspace_name from dual", nil)
require.NoError(t, err)
require.Equal(t, `[[VARCHAR("TestUnsharded")]]`, fmt.Sprintf("%v", result.Rows))
}

func TestExecutorOtherRead(t *testing.T) {
executor, sbc1, sbc2, sbclookup := createExecutorEnv()

Expand Down
8 changes: 8 additions & 0 deletions go/vt/vtgate/tabletgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,14 @@ func (gw *TabletGateway) QueryServiceByAlias(alias *topodatapb.TabletAlias, targ
return queryservice.Wrap(qs, gw.withShardError), NewShardError(err, target)
}

// GetServingKeyspaces returns list of serving keyspaces.
func (gw *TabletGateway) GetServingKeyspaces() []string {
if gw.kev == nil {
return nil
}
return gw.kev.GetServingKeyspaces()
}

// RegisterStats registers the stats to export the lag since the last refresh
// and the checksum of the topology
func (gw *TabletGateway) RegisterStats() {
Expand Down
42 changes: 28 additions & 14 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,13 +342,7 @@ func (vc *vcursorImpl) AnyKeyspace() (*vindexes.Keyspace, error) {
return nil, errNoDbAvailable
}

var keyspaces = make([]*vindexes.Keyspace, 0, len(vc.vschema.Keyspaces))
for _, ks := range vc.vschema.Keyspaces {
keyspaces = append(keyspaces, ks.Keyspace)
}
sort.Slice(keyspaces, func(i, j int) bool {
return keyspaces[i].Name < keyspaces[j].Name
})
keyspaces := vc.getSortedServingKeyspaces()

// Look for any sharded keyspace if present, otherwise take the first keyspace,
// sorted alphabetically
Expand All @@ -360,18 +354,38 @@ func (vc *vcursorImpl) AnyKeyspace() (*vindexes.Keyspace, error) {
return keyspaces[0], nil
}

// getSortedServingKeyspaces gets the sorted serving keyspaces
func (vc *vcursorImpl) getSortedServingKeyspaces() []*vindexes.Keyspace {
var keyspaces []*vindexes.Keyspace

if vc.resolver != nil && vc.resolver.GetGateway() != nil {
keyspaceNames := vc.resolver.GetGateway().GetServingKeyspaces()
for _, ksName := range keyspaceNames {
ks, exists := vc.vschema.Keyspaces[ksName]
if exists {
keyspaces = append(keyspaces, ks.Keyspace)
}
}
}

if len(keyspaces) == 0 {
for _, ks := range vc.vschema.Keyspaces {
keyspaces = append(keyspaces, ks.Keyspace)
}
}
sort.Slice(keyspaces, func(i, j int) bool {
return keyspaces[i].Name < keyspaces[j].Name
})
return keyspaces
}

func (vc *vcursorImpl) FirstSortedKeyspace() (*vindexes.Keyspace, error) {
if len(vc.vschema.Keyspaces) == 0 {
return nil, errNoDbAvailable
}
kss := vc.vschema.Keyspaces
keys := make([]string, 0, len(kss))
for ks := range kss {
keys = append(keys, ks)
}
sort.Strings(keys)
keyspaces := vc.getSortedServingKeyspaces()

return kss[keys[0]].Keyspace, nil
return keyspaces[0], nil
}

// SysVarSetEnabled implements the ContextVSchema interface
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vttablet/sandboxconn/sandboxconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type SandboxConn struct {
// These errors work for all functions.
MustFailCodes map[vtrpcpb.Code]int

// ServingKeyspaces is a list of serving keyspaces
ServingKeyspaces []string

// These errors are triggered only for specific functions.
// For now these are just for the 2PC functions.
MustFailPrepare int
Expand Down Expand Up @@ -509,6 +512,11 @@ func (sbc *SandboxConn) QueryServiceByAlias(_ *topodatapb.TabletAlias, _ *queryp
return sbc, nil
}

// GetServingKeyspaces returns list of serving keyspaces.
func (sbc *SandboxConn) GetServingKeyspaces() []string {
return sbc.ServingKeyspaces
}

// HandlePanic is part of the QueryService interface.
func (sbc *SandboxConn) HandlePanic(err *error) {
}
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletconntest/fakequeryservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,11 @@ func (f *FakeQueryService) QueryServiceByAlias(_ *topodatapb.TabletAlias, _ *que
panic("not implemented")
}

// GetServingKeyspaces returns list of serving keyspaces.
func (f *FakeQueryService) GetServingKeyspaces() []string {
panic("not implemented")
}

// ReserveBeginExecute satisfies the Gateway interface
func (f *FakeQueryService) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (queryservice.ReservedTransactionState, *sqltypes.Result, error) {
panic("implement me")
Expand Down
Loading