diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index c9537d3851e..167708c5e2d 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -17,7 +17,9 @@ limitations under the License. package discovery import ( + "context" "fmt" + "io" "math/rand" "sort" "strings" @@ -25,20 +27,16 @@ import ( "time" "vitess.io/vitess/go/stats" - + "vitess.io/vitess/go/vt/grpcclient" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" - - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletconn" - "vitess.io/vitess/go/vt/log" - - "context" - + querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/vterrors" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) type TabletPickerCellPreference int @@ -291,13 +289,12 @@ func (tp *TabletPicker) orderByTabletType(candidates []*topo.TabletInfo) []*topo return candidates } -// PickForStreaming picks an available tablet. +// PickForStreaming picks a tablet that is healthy and serving. // Selection is based on CellPreference. // See prioritizeTablets for prioritization logic. func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) { - rand.Seed(time.Now().UnixNano()) - // keep trying at intervals (tabletPickerRetryDelay) until a tablet is found - // or the context is canceled + // Keep trying at intervals (tabletPickerRetryDelay) until a healthy + // serving tablet is found or the context is cancelled. for { select { case <-ctx.Done(): @@ -330,15 +327,15 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table } else if tp.inOrder { candidates = tp.orderByTabletType(candidates) } else { - // Randomize candidates + // Randomize candidates. rand.Shuffle(len(candidates), func(i, j int) { candidates[i], candidates[j] = candidates[j], candidates[i] }) } if len(candidates) == 0 { - // if no candidates were found, sleep and try again + // If no viable candidates were found, sleep and try again. tp.incNoTabletFoundStat() - log.Infof("No tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds", + log.Infof("No healthy serving tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds.", tp.keyspace, tp.shard, tp.cells, tp.tabletTypes, float64(GetTabletPickerRetryDelay().Milliseconds())/1000.0) timer := time.NewTimer(GetTabletPickerRetryDelay()) select { @@ -349,34 +346,24 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table } continue } - for _, ti := range candidates { - // try to connect to tablet - if conn, err := tabletconn.GetDialer()(ti.Tablet, true); err == nil { - // OK to use ctx here because it is not actually used by the underlying Close implementation - _ = conn.Close(ctx) - log.Infof("tablet picker found tablet %s", ti.Tablet.String()) - return ti.Tablet, nil - } - // err found - log.Warningf("unable to connect to tablet for alias %v", ti.Alias) - } - // Got here? Means we iterated all tablets and did not find a healthy one - tp.incNoTabletFoundStat() + log.Infof("Tablet picker found a healthy serving tablet for streaming: %s", candidates[0].Tablet.String()) + return candidates[0].Tablet, nil } } -// GetMatchingTablets returns a list of TabletInfo for tablets -// that match the cells, keyspace, shard and tabletTypes for this TabletPicker +// GetMatchingTablets returns a list of TabletInfo for healthy +// serving tablets that match the cells, keyspace, shard and +// tabletTypes for this TabletPicker. func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletInfo { - // Special handling for PRIMARY tablet type - // Since there is only one primary, we ignore cell and find the primary + // Special handling for PRIMARY tablet type: since there is only + // one primary per shard, we ignore cell and find the primary. aliases := make([]*topodatapb.TabletAlias, 0) if len(tp.tabletTypes) == 1 && tp.tabletTypes[0] == topodatapb.TabletType_PRIMARY { shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() si, err := tp.ts.GetShard(shortCtx, tp.keyspace, tp.shard) if err != nil { - log.Errorf("error getting shard %s/%s: %s", tp.keyspace, tp.shard, err.Error()) + log.Errorf("Error getting shard %s/%s: %v", tp.keyspace, tp.shard, err) return nil } if _, ignore := tp.ignoreTablets[si.PrimaryAlias.String()]; !ignore { @@ -385,24 +372,25 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn } else { actualCells := make([]string, 0) for _, cell := range tp.cells { - // check if cell is actually an alias - // non-blocking read so that this is fast + // Check if cell is actually an alias; using a + // non-blocking read so that this is fast. shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() _, err := tp.ts.GetCellInfo(shortCtx, cell, false) if err != nil { - // not a valid cell, check whether it is a cell alias + // Not a valid cell, check whether it is a cell alias... shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() alias, err := tp.ts.GetCellsAlias(shortCtx, cell, false) - // if we get an error, either cellAlias doesn't exist or it isn't a cell alias at all. Ignore and continue + // If we get an error, either cellAlias doesn't exist or + // it isn't a cell alias at all; ignore and continue. if err == nil { actualCells = append(actualCells, alias.Cells...) } else { log.Infof("Unable to resolve cell %s, ignoring", cell) } } else { - // valid cell, add it to our list + // Valid cell, add it to our list. actualCells = append(actualCells, cell) } } @@ -410,12 +398,11 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn for _, cell := range actualCells { shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() - // match cell, keyspace and shard + // Match cell, keyspace, and shard. sri, err := tp.ts.GetShardReplication(shortCtx, cell, tp.keyspace, tp.shard) if err != nil { continue } - for _, node := range sri.Nodes { if _, ignore := tp.ignoreTablets[node.TabletAlias.String()]; !ignore { aliases = append(aliases, node.TabletAlias) @@ -427,33 +414,47 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn if len(aliases) == 0 { return nil } + shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases) if err != nil { - log.Warningf("error fetching tablets from topo: %v", err) - // If we get a partial result we can still use it, otherwise return + log.Warningf("Error fetching tablets from topo: %v", err) + // If we get a partial result we can still use it, otherwise return. if len(tabletMap) == 0 { return nil } } + tablets := make([]*topo.TabletInfo, 0, len(aliases)) for _, tabletAlias := range aliases { tabletInfo, ok := tabletMap[topoproto.TabletAliasString(tabletAlias)] if !ok { - // Either tablet disappeared on us, or we got a partial result (GetTabletMap ignores - // topo.ErrNoNode). Just log a warning - log.Warningf("failed to load tablet %v", tabletAlias) + // Either tablet disappeared on us, or we got a partial result + // (GetTabletMap ignores topo.ErrNoNode); just log a warning. + log.Warningf("Tablet picker failed to load tablet %v", tabletAlias) } else if topoproto.IsTypeInList(tabletInfo.Type, tp.tabletTypes) { - tablets = append(tablets, tabletInfo) + // Try to connect to the tablet and confirm that it's usable. + if conn, err := tabletconn.GetDialer()(tabletInfo.Tablet, grpcclient.FailFast(true)); err == nil { + // Ensure that the tablet is healthy and serving. + shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) + defer cancel() + if err := conn.StreamHealth(shortCtx, func(shr *querypb.StreamHealthResponse) error { + if shr != nil && shr.Serving && shr.RealtimeStats != nil && shr.RealtimeStats.HealthError == "" { + return io.EOF // End the stream + } + return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving") + }); err == nil || err == io.EOF { + tablets = append(tablets, tabletInfo) + } + _ = conn.Close(ctx) + } } } return tablets } func init() { - // TODO(sougou): consolidate this call to be once per process. - rand.Seed(time.Now().UnixNano()) globalTPStats = newTabletPickerStats() } diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index fd2c1635359..91b936303df 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -22,10 +22,11 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" - querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" + + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) func TestPickPrimary(t *testing.T) { @@ -503,6 +504,45 @@ func TestPickErrorOnlySpecified(t *testing.T) { require.Greater(t, globalTPStats.noTabletFoundError.Counts()["cell.ks.0.replica"], int64(0)) } +// TestPickFallbackType tests that when providing a list of tablet types to +// pick from, with the list in preference order, that when the primary/first +// type has no available healthy serving tablets that we select a healthy +// serving tablet from the secondary/second type. +func TestPickFallbackType(t *testing.T) { + cells := []string{"cell1", "cell2"} + localCell := cells[0] + tabletTypes := "replica,primary" + options := TabletPickerOptions{ + TabletOrder: "InOrder", + } + te := newPickerTestEnv(t, cells) + + // This one should be selected even though it's the secondary type + // as it is healthy and serving. + primaryTablet := addTablet(te, 100, topodatapb.TabletType_PRIMARY, localCell, true, true) + defer deleteTablet(t, te, primaryTablet) + + // Replica tablet should not be selected as it is unhealthy. + replicaTablet := addTablet(te, 200, topodatapb.TabletType_REPLICA, localCell, false, false) + defer deleteTablet(t, te, replicaTablet) + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + _, err := te.topoServ.UpdateShardFields(ctx, te.keyspace, te.shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = primaryTablet.Alias + return nil + }) + require.NoError(t, err) + + tp, err := NewTabletPicker(context.Background(), te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options) + require.NoError(t, err) + ctx2, cancel2 := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel2() + tablet, err := tp.PickForStreaming(ctx2) + require.NoError(t, err) + assert.True(t, proto.Equal(primaryTablet, tablet), "Pick: %v, want %v", tablet, primaryTablet) +} + type pickerTestEnv struct { t *testing.T keyspace string @@ -551,18 +591,21 @@ func addTablet(te *pickerTestEnv, id int, tabletType topodatapb.TabletType, cell err := te.topoServ.CreateTablet(context.Background(), tablet) require.NoError(te.t, err) + shr := &querypb.StreamHealthResponse{ + Serving: serving, + Target: &querypb.Target{ + Keyspace: te.keyspace, + Shard: te.shard, + TabletType: tabletType, + }, + RealtimeStats: &querypb.RealtimeStats{HealthError: "tablet is unhealthy"}, + } if healthy { - _ = createFixedHealthConn(tablet, &querypb.StreamHealthResponse{ - Serving: serving, - Target: &querypb.Target{ - Keyspace: te.keyspace, - Shard: te.shard, - TabletType: tabletType, - }, - RealtimeStats: &querypb.RealtimeStats{HealthError: ""}, - }) + shr.RealtimeStats.HealthError = "" } + _ = createFixedHealthConn(tablet, shr) + return tablet } diff --git a/go/vt/vttablet/sandboxconn/sandboxconn.go b/go/vt/vttablet/sandboxconn/sandboxconn.go index 05c6b0485df..6afc81217f5 100644 --- a/go/vt/vttablet/sandboxconn/sandboxconn.go +++ b/go/vt/vttablet/sandboxconn/sandboxconn.go @@ -415,14 +415,8 @@ func (sbc *SandboxConn) SetStreamHealthResponse(res *querypb.StreamHealthRespons sbc.streamHealthResponse = res } -// StreamHealth always mocks a "healthy" result by default. If you want to override this behavior you -// can call SetStreamHealthResponse. +// StreamHealth always mocks a "healthy" result. func (sbc *SandboxConn) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error { - sbc.mapMu.Lock() - defer sbc.mapMu.Unlock() - if sbc.streamHealthResponse != nil { - return callback(sbc.streamHealthResponse) - } return nil } diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 0eed7de71ee..8e9f880b028 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -20,6 +20,7 @@ import ( "fmt" "strconv" "strings" + "sync/atomic" "time" "google.golang.org/protobuf/encoding/prototext" @@ -29,7 +30,6 @@ import ( "context" - "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/tb" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/log" @@ -40,6 +40,13 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) +const ( + // How many times to retry tablet selection before we + // give up and return an error message that the user + // can see and act upon if needed. + tabletPickerRetries = 5 +) + // controller is created by Engine. Members are initialized upfront. // There is no mutex within a controller becaust its members are // either read-only or self-synchronized. @@ -59,7 +66,7 @@ type controller struct { done chan struct{} // The following fields are updated after start. So, they need synchronization. - sourceTablet sync2.AtomicString + sourceTablet atomic.Value lastWorkflowError *lastError } @@ -79,6 +86,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor done: make(chan struct{}), source: &binlogdatapb.BinlogSource{}, } + ct.sourceTablet.Store(&topodatapb.TabletAlias{}) log.Infof("creating controller with cell: %v, tabletTypes: %v, and params: %v", cell, tabletTypesStr, params) // id @@ -173,7 +181,7 @@ func (ct *controller) run(ctx context.Context) { func (ct *controller) runBlp(ctx context.Context) (err error) { defer func() { - ct.sourceTablet.Set("") + ct.sourceTablet.Store(&topodatapb.TabletAlias{}) if x := recover(); x != nil { log.Errorf("stream %v: caught panic: %v\n%s", ct.id, x, tb.Stack(4)) err = fmt.Errorf("panic: %v", x) @@ -203,23 +211,11 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { } defer dbClient.Close() - var tablet *topodatapb.Tablet - if ct.source.GetExternalMysql() == "" { - log.Infof("trying to find a tablet eligible for vreplication. stream id: %v", ct.id) - tablet, err = ct.tabletPicker.PickForStreaming(ctx) - if err != nil { - select { - case <-ctx.Done(): - default: - ct.blpStats.ErrorCounts.Add([]string{"No Source Tablet Found"}, 1) - ct.setMessage(dbClient, fmt.Sprintf("Error picking tablet: %s", err.Error())) - } - return err - } - ct.setMessage(dbClient, fmt.Sprintf("Picked source tablet: %s", tablet.Alias.String())) - log.Infof("found a tablet eligible for vreplication. stream id: %v tablet: %s", ct.id, tablet.Alias.String()) - ct.sourceTablet.Set(tablet.Alias.String()) + tablet, err := ct.pickSourceTablet(ctx, dbClient) + if err != nil { + return err } + switch { case len(ct.source.Tables) > 0: // Table names can have search patterns. Resolve them against the schema. @@ -268,8 +264,10 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { vr := newVReplicator(ct.id, ct.source, vsClient, ct.blpStats, dbClient, ct.mysqld, ct.vre) err = vr.Replicate(ctx) ct.lastWorkflowError.record(err) + // If this is a mysql error that we know needs manual intervention OR - // we cannot identify this as non-recoverable, but it has persisted beyond the retry limit (maxTimeToRetryError) + // we cannot identify this as non-recoverable, but it has persisted + // beyond the retry limit (maxTimeToRetryError). if isUnrecoverableError(err) || !ct.lastWorkflowError.shouldRetry() { log.Errorf("vreplication stream %d going into error state due to %+v", ct.id, err) if errSetState := vr.setState(binlogplayer.BlpError, err.Error()); errSetState != nil { @@ -294,6 +292,35 @@ func (ct *controller) setMessage(dbClient binlogplayer.DBClient, message string) } return nil } + +// pickSourceTablet picks a healthy serving tablet to source for +// the vreplication stream. If the source is marked as external, it +// returns nil. +func (ct *controller) pickSourceTablet(ctx context.Context, dbClient binlogplayer.DBClient) (*topodatapb.Tablet, error) { + if ct.source.GetExternalMysql() != "" { + return nil, nil + } + log.Infof("Trying to find an eligible source tablet for vreplication stream id %d for workflow: %s", + ct.id, ct.workflow) + tpCtx, tpCancel := context.WithTimeout(ctx, discovery.GetTabletPickerRetryDelay()*tabletPickerRetries) + defer tpCancel() + tablet, err := ct.tabletPicker.PickForStreaming(tpCtx) + if err != nil { + select { + case <-ctx.Done(): + default: + ct.blpStats.ErrorCounts.Add([]string{"No Source Tablet Found"}, 1) + ct.setMessage(dbClient, fmt.Sprintf("Error picking tablet: %s", err.Error())) + } + return tablet, err + } + ct.setMessage(dbClient, fmt.Sprintf("Picked source tablet: %s", tablet.Alias.String())) + log.Infof("Found eligible source tablet %s for vreplication stream id %d for workflow %s", + tablet.Alias.String(), ct.id, ct.workflow) + ct.sourceTablet.Store(tablet.Alias) + return tablet, err +} + func (ct *controller) Stop() { ct.cancel() <-ct.done diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index afce87aa630..ef0d1376857 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -739,7 +739,7 @@ func (vre *Engine) WaitForPos(ctx context.Context, id int, pos string) error { return fmt.Errorf("unexpected result: %v", qr) } - // When err is not nil then we got a retryable error and will loop again + // When err is not nil then we got a retryable error and will loop again. if err == nil { current, dcerr := binlogplayer.DecodePosition(qr.Rows[0][0].ToString()) if dcerr != nil { diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats.go b/go/vt/vttablet/tabletmanager/vreplication/stats.go index c8c242bab05..727d32b9d8d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats.go @@ -27,6 +27,8 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/servenv" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) var ( @@ -142,7 +144,10 @@ func (st *vrStats) register() { defer st.mu.Unlock() result := make(map[string]string, len(st.controllers)) for _, ct := range st.controllers { - result[fmt.Sprintf("%v", ct.id)] = ct.sourceTablet.Get() + ta := ct.sourceTablet.Load() + if ta != nil { + result[fmt.Sprintf("%v", ct.id)] = ta.(*topodatapb.TabletAlias).String() + } } return result })) @@ -394,8 +399,7 @@ func (st *vrStats) status() *EngineStatus { ReplicationLagSeconds: ct.blpStats.ReplicationLagSeconds.Get(), Counts: ct.blpStats.Timings.Counts(), Rates: ct.blpStats.Rates.Get(), - State: ct.blpStats.State.Get(), - SourceTablet: ct.sourceTablet.Get(), + SourceTablet: ct.sourceTablet.Load().(*topodatapb.TabletAlias), Messages: ct.blpStats.MessageHistory(), QueryCounts: ct.blpStats.QueryCount.Counts(), PhaseTimings: ct.blpStats.PhaseTimings.Counts(), @@ -427,7 +431,7 @@ type ControllerStatus struct { Counts map[string]int64 Rates map[string][]float64 State string - SourceTablet string + SourceTablet *topodatapb.TabletAlias Messages []string QueryCounts map[string]int64 PhaseTimings map[string]int64 diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go index 2accc3cfa24..b63583e57ee 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go @@ -28,6 +28,8 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/proto/binlogdata" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) var wantOut = ` @@ -107,8 +109,14 @@ func TestStatusHtml(t *testing.T) { done: make(chan struct{}), }, } - testStats.controllers[1].sourceTablet.Set("src1") - testStats.controllers[2].sourceTablet.Set("src2") + testStats.controllers[1].sourceTablet.Store(&topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 01, + }) + testStats.controllers[2].sourceTablet.Store(&topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 02, + }) close(testStats.controllers[2].done) tpl := template.Must(template.New("test").Parse(vreplicationTemplate)) @@ -135,7 +143,10 @@ func TestVReplicationStats(t *testing.T) { done: make(chan struct{}), }, } - testStats.controllers[1].sourceTablet.Set("src1") + testStats.controllers[1].sourceTablet.Store(&topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 01, + }) sleepTime := 1 * time.Millisecond record := func(phase string) { diff --git a/go/vt/wrangler/fake_tablet_test.go b/go/vt/wrangler/fake_tablet_test.go index 9fdb6e616a1..254e1813d8d 100644 --- a/go/vt/wrangler/fake_tablet_test.go +++ b/go/vt/wrangler/fake_tablet_test.go @@ -32,6 +32,8 @@ import ( "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vttablet/grpctmserver" + "vitess.io/vitess/go/vt/vttablet/queryservice" + "vitess.io/vitess/go/vt/vttablet/queryservice/fakes" "vitess.io/vitess/go/vt/vttablet/tabletconntest" "vitess.io/vitess/go/vt/vttablet/tabletmanager" "vitess.io/vitess/go/vt/vttablet/tabletservermock" @@ -48,6 +50,12 @@ import ( _ "vitess.io/vitess/go/vt/vttablet/grpctabletconn" ) +func init() { + // Ensure we will use the right protocol (gRPC) in all unit tests. + tabletconntest.SetProtocol("go.vt.wrangler.fake_tablet_test", "grpc") + tmclienttest.SetProtocol("go.vt.wrangler.fake_tablet_test", "grpc") +} + // This file was copied from testlib. All tests from testlib should be moved // to the current directory. In order to move tests from there, we have to // remove the circular dependency it causes (through vtctl dependence). @@ -81,6 +89,8 @@ type fakeTablet struct { StartHTTPServer bool HTTPListener net.Listener HTTPServer *http.Server + + queryservice.QueryService } // TabletOption is an interface for changing tablet parameters. @@ -141,6 +151,7 @@ func newFakeTablet(t *testing.T, wr *Wrangler, cell string, uid uint32, tabletTy Tablet: tablet, FakeMysqlDaemon: fakeMysqlDaemon, RPCServer: grpc.NewServer(), + QueryService: fakes.ErrorQueryService, } } @@ -238,8 +249,14 @@ func (ft *fakeTablet) Target() querypb.Target { } } -func init() { - // enforce we will use the right protocol (gRPC) in all unit tests - tabletconntest.SetProtocol("go.vt.wrangler.fake_tablet_test", "grpc") - tmclienttest.SetProtocol("go.vt.wrangler.fake_tablet_test", "grpc") +func (ft *fakeTablet) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error { + return callback(&querypb.StreamHealthResponse{ + Serving: true, + Target: &querypb.Target{ + Keyspace: ft.Tablet.Keyspace, + Shard: ft.Tablet.Shard, + TabletType: ft.Tablet.Type, + }, + RealtimeStats: &querypb.RealtimeStats{}, + }) } diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index 02eb5afb377..e34d0af6182 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -18,6 +18,8 @@ package wrangler import ( "fmt" + "math/rand" + "sync" "testing" "time" @@ -30,6 +32,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/logutil" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -39,6 +42,9 @@ import ( "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/topotools" + "vitess.io/vitess/go/vt/vttablet/queryservice" + "vitess.io/vitess/go/vt/vttablet/tabletconn" + "vitess.io/vitess/go/vt/vttablet/tabletconntest" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" "vitess.io/vitess/go/vt/vttablet/tmclient" ) @@ -74,6 +80,7 @@ type testMigraterEnv struct { sourceKeyRanges []*topodatapb.KeyRange targetKeyRanges []*topodatapb.KeyRange tmeDB *fakesqldb.DB + mu sync.Mutex } // testShardMigraterEnv has some convenience functions for adding expected queries. @@ -135,6 +142,19 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards, tme.targetKeyRanges = append(tme.targetKeyRanges, targetKeyRange) } + dialerName := fmt.Sprintf("TrafficSwitcherTest-%s-%d", t.Name(), rand.Intn(1000000000)) + tabletconn.RegisterDialer(dialerName, func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { + tme.mu.Lock() + defer tme.mu.Unlock() + for _, ft := range append(tme.sourcePrimaries, tme.targetPrimaries...) { + if ft.Tablet.Alias.Uid == tablet.Alias.Uid { + return ft, nil + } + } + return nil, nil + }) + tabletconntest.SetProtocol("go.vt.wrangler.traffic_switcher_env_test", dialerName) + vs := &vschemapb.Keyspace{ Sharded: true, Vindexes: map[string]*vschemapb.Vindex{ @@ -296,6 +316,19 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe tme.targetKeyRanges = append(tme.targetKeyRanges, targetKeyRange) } + dialerName := fmt.Sprintf("TrafficSwitcherTest-%s-%d", t.Name(), rand.Intn(1000000000)) + tabletconn.RegisterDialer(dialerName, func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { + tme.mu.Lock() + defer tme.mu.Unlock() + for _, ft := range append(tme.sourcePrimaries, tme.targetPrimaries...) { + if ft.Tablet.Alias.Uid == tablet.Alias.Uid { + return ft, nil + } + } + return nil, nil + }) + tabletconntest.SetProtocol("go.vt.wrangler.traffic_switcher_env_test", dialerName) + vs := &vschemapb.Keyspace{ Sharded: true, Vindexes: map[string]*vschema.Vindex{