From 00c7907a085debe35aa03831d42fb6b3aeb5de56 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Mon, 29 May 2023 17:21:39 +0530 Subject: [PATCH 1/8] test: add a failing test Signed-off-by: Manan Gupta --- go/test/endtoend/vtorc/general/vtorc_test.go | 2 +- .../vtorc/primaryfailure/main_test.go | 2 +- .../primaryfailure/primary_failure_test.go | 55 +++++++++++++++++++ go/test/endtoend/vtorc/utils/utils.go | 36 +++++------- 4 files changed, 71 insertions(+), 24 deletions(-) diff --git a/go/test/endtoend/vtorc/general/vtorc_test.go b/go/test/endtoend/vtorc/general/vtorc_test.go index 4254606dd94..91cfc1d91ac 100644 --- a/go/test/endtoend/vtorc/general/vtorc_test.go +++ b/go/test/endtoend/vtorc/general/vtorc_test.go @@ -421,7 +421,7 @@ func TestDurabilityPolicySetLater(t *testing.T) { time.Sleep(30 * time.Second) // Now set the correct durability policy - out, err := newCluster.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspace.Name, "--durability-policy=semi_sync") + out, err := newCluster.ClusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspace.Name, "--durability-policy=semi_sync") require.NoError(t, err, out) // VTOrc should promote a new primary after seeing the durability policy change diff --git a/go/test/endtoend/vtorc/primaryfailure/main_test.go b/go/test/endtoend/vtorc/primaryfailure/main_test.go index 7d9c57b6b22..a3e50bd0cc9 100644 --- a/go/test/endtoend/vtorc/primaryfailure/main_test.go +++ b/go/test/endtoend/vtorc/primaryfailure/main_test.go @@ -32,7 +32,7 @@ func TestMain(m *testing.M) { var cellInfos []*utils.CellInfo cellInfos = append(cellInfos, &utils.CellInfo{ CellName: utils.Cell1, - NumReplicas: 12, + NumReplicas: 13, NumRdonly: 3, UIDBase: 100, }) diff --git a/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go b/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go index 9f1dd52d1d5..3072aa7b57b 100644 --- 
a/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go +++ b/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go @@ -97,6 +97,61 @@ func TestDownPrimary(t *testing.T) { utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.RecoverDeadPrimaryRecoveryName, 1) } +// bring down primary before VTOrc has started, let vtorc repair. +func TestDownPrimaryBeforeVTOrc(t *testing.T) { + defer cluster.PanicHandler(t) + utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 2, 1, nil, cluster.VTOrcConfiguration{}, 0, "none") + keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] + shard0 := &keyspace.Shards[0] + curPrimary := shard0.Vttablets[0] + + // Promote the first tablet as the primary + err := clusterInfo.ClusterInstance.VtctlclientProcess.InitializeShard(keyspace.Name, shard0.Name, clusterInfo.ClusterInstance.Cell, curPrimary.TabletUID) + require.NoError(t, err) + + // find the replica and rdonly tablets + var replica, rdonly *cluster.Vttablet + for _, tablet := range shard0.Vttablets { + // we know we have only two replica tablets, so the one not the primary must be the other replica + if tablet.Alias != curPrimary.Alias && tablet.Type == "replica" { + replica = tablet + } + if tablet.Type == "rdonly" { + rdonly = tablet + } + } + assert.NotNil(t, replica, "could not find replica tablet") + assert.NotNil(t, rdonly, "could not find rdonly tablet") + + // check that the replication is setup correctly before we failover + utils.CheckReplication(t, clusterInfo, curPrimary, []*cluster.Vttablet{rdonly, replica}, 10*time.Second) + + // Make the current primary vttablet unavailable. 
+ err = curPrimary.VttabletProcess.TearDown() + require.NoError(t, err) + err = curPrimary.MysqlctlProcess.Stop() + require.NoError(t, err) + + // Start a VTOrc instance + utils.StartVTOrcs(t, clusterInfo, []string{"--remote_operation_timeout=10s"}, cluster.VTOrcConfiguration{ + PreventCrossDataCenterPrimaryFailover: true, + }, 1) + + vtOrcProcess := clusterInfo.ClusterInstance.VTOrcProcesses[0] + + defer func() { + // we remove the tablet from our global list + utils.PermanentlyRemoveVttablet(clusterInfo, curPrimary) + }() + + // check that the replica gets promoted + utils.CheckPrimaryTablet(t, clusterInfo, replica, true) + + // also check that the replication is working correctly after failover + utils.VerifyWritesSucceed(t, clusterInfo, replica, []*cluster.Vttablet{rdonly}, 10*time.Second) + utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.RecoverDeadPrimaryRecoveryName, 1) +} + // TestDeadPrimaryRecoversImmediately test Vtorc ability to recover immediately if primary is dead. // Reason is, unlike other recoveries, in DeadPrimary we don't call DiscoverInstance since we know // that primary is unreachable. This help us save few seconds depending on value of `RemoteOperationTimeout` flag. diff --git a/go/test/endtoend/vtorc/utils/utils.go b/go/test/endtoend/vtorc/utils/utils.go index 17babdecc10..4d7352fd2a8 100644 --- a/go/test/endtoend/vtorc/utils/utils.go +++ b/go/test/endtoend/vtorc/utils/utils.go @@ -67,11 +67,10 @@ type CellInfo struct { // VTOrcClusterInfo stores the information for a cluster. This is supposed to be used only for VTOrc tests. 
type VTOrcClusterInfo struct { - ClusterInstance *cluster.LocalProcessCluster - Ts *topo.Server - CellInfos []*CellInfo - VtctldClientProcess *cluster.VtctldClientProcess - lastUsedValue int + ClusterInstance *cluster.LocalProcessCluster + Ts *topo.Server + CellInfos []*CellInfo + lastUsedValue int } // CreateClusterAndStartTopo starts the cluster and topology service @@ -100,17 +99,13 @@ func CreateClusterAndStartTopo(cellInfos []*CellInfo) (*VTOrcClusterInfo, error) return nil, err } - // store the vtctldclient process - vtctldClientProcess := cluster.VtctldClientProcessInstance("localhost", clusterInstance.VtctldProcess.GrpcPort, clusterInstance.TmpDirectory) - // create topo server connection ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot) return &VTOrcClusterInfo{ - ClusterInstance: clusterInstance, - Ts: ts, - CellInfos: cellInfos, - lastUsedValue: 100, - VtctldClientProcess: vtctldClientProcess, + ClusterInstance: clusterInstance, + Ts: ts, + CellInfos: cellInfos, + lastUsedValue: 100, }, err } @@ -307,7 +302,7 @@ func SetupVttabletsAndVTOrcs(t *testing.T, clusterInfo *VTOrcClusterInfo, numRep if durability == "" { durability = "none" } - out, err := clusterInfo.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, fmt.Sprintf("--durability-policy=%s", durability)) + out, err := clusterInfo.ClusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, fmt.Sprintf("--durability-policy=%s", durability)) require.NoError(t, err, out) // start vtorc @@ -829,20 +824,17 @@ func SetupNewClusterSemiSync(t *testing.T) *VTOrcClusterInfo { require.NoError(t, err) } - vtctldClientProcess := cluster.VtctldClientProcessInstance("localhost", clusterInstance.VtctldProcess.GrpcPort, clusterInstance.TmpDirectory) - - out, err := 
vtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync") + out, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync") require.NoError(t, err, out) // create topo server connection ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot) require.NoError(t, err) clusterInfo := &VTOrcClusterInfo{ - ClusterInstance: clusterInstance, - Ts: ts, - CellInfos: nil, - lastUsedValue: 100, - VtctldClientProcess: vtctldClientProcess, + ClusterInstance: clusterInstance, + Ts: ts, + CellInfos: nil, + lastUsedValue: 100, } return clusterInfo } From 25573d2685d71a84558b8d18d8efc49b6b4700c5 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Mon, 29 May 2023 21:32:22 +0530 Subject: [PATCH 2/8] feat: fix the problem Signed-off-by: Manan Gupta --- .../primaryfailure/primary_failure_test.go | 3 +- go/vt/vtorc/inst/analysis.go | 1 + go/vt/vtorc/inst/analysis_dao.go | 38 +++++++++++- go/vt/vtorc/inst/analysis_dao_test.go | 60 +++++++++++++++++++ 4 files changed, 97 insertions(+), 5 deletions(-) diff --git a/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go b/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go index 3072aa7b57b..dc13ba8ef16 100644 --- a/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go +++ b/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go @@ -127,8 +127,7 @@ func TestDownPrimaryBeforeVTOrc(t *testing.T) { utils.CheckReplication(t, clusterInfo, curPrimary, []*cluster.Vttablet{rdonly, replica}, 10*time.Second) // Make the current primary vttablet unavailable. 
- err = curPrimary.VttabletProcess.TearDown() - require.NoError(t, err) + _ = curPrimary.VttabletProcess.Kill() err = curPrimary.MysqlctlProcess.Stop() require.NoError(t, err) diff --git a/go/vt/vtorc/inst/analysis.go b/go/vt/vtorc/inst/analysis.go index 777a02fb3c5..bd186ca894d 100644 --- a/go/vt/vtorc/inst/analysis.go +++ b/go/vt/vtorc/inst/analysis.go @@ -31,6 +31,7 @@ type StructureAnalysisCode string const ( NoProblem AnalysisCode = "NoProblem" ClusterHasNoPrimary AnalysisCode = "ClusterHasNoPrimary" + InvalidPrimary AnalysisCode = "InvalidPrimary" DeadPrimaryWithoutReplicas AnalysisCode = "DeadPrimaryWithoutReplicas" DeadPrimary AnalysisCode = "DeadPrimary" DeadPrimaryAndReplicas AnalysisCode = "DeadPrimaryAndReplicas" diff --git a/go/vt/vtorc/inst/analysis_dao.go b/go/vt/vtorc/inst/analysis_dao.go index 0a88bc9095d..72dadf8e49a 100644 --- a/go/vt/vtorc/inst/analysis_dao.go +++ b/go/vt/vtorc/inst/analysis_dao.go @@ -56,6 +56,7 @@ func initializeAnalysisDaoPostConfiguration() { type clusterAnalysis struct { hasClusterwideAction bool + totalTablets int primaryAlias string durability reparentutil.Durabler } @@ -406,6 +407,8 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna } // ca has clusterwide info ca := clusters[keyspaceShard] + // Increment the total amount of tablets. + ca.totalTablets += 1 if ca.hasClusterwideAction { // We can only take one cluster level action at a time. 
return nil @@ -415,10 +418,12 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna return nil } isInvalid := m.GetBool("is_invalid") - if isInvalid { + if a.IsClusterPrimary && isInvalid { + a.Analysis = InvalidPrimary + a.Description = "VTOrc hasn't been able to reach the primary even once" + } else if isInvalid { return nil - } - if a.IsClusterPrimary && !a.LastCheckValid && a.CountReplicas == 0 { + } else if a.IsClusterPrimary && !a.LastCheckValid && a.CountReplicas == 0 { a.Analysis = DeadPrimaryWithoutReplicas a.Description = "Primary cannot be reached by vtorc and has no replica" ca.hasClusterwideAction = true @@ -590,6 +595,33 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna return nil }) + for idx, analysis := range result { + if analysis.Analysis == InvalidPrimary { + keyspaceName := analysis.ClusterDetails.Keyspace + shardName := analysis.ClusterDetails.Shard + keyspaceShard := getKeyspaceShardName(keyspaceName, shardName) + totalReplicas := clusters[keyspaceShard].totalTablets - 1 + var notReplicatingReplicas []int + for i := idx + 1; i < len(result); i++ { + replicaAnalysis := result[i] + if replicaAnalysis.ClusterDetails.Keyspace == keyspaceName && + replicaAnalysis.ClusterDetails.Shard == shardName && + topo.IsReplicaType(replicaAnalysis.TabletType) && + !replicaAnalysis.IsPrimary && replicaAnalysis.ReplicationStopped { + notReplicatingReplicas = append(notReplicatingReplicas, i) + } + } + if len(notReplicatingReplicas) == totalReplicas && totalReplicas > 0 { + analysis.Analysis = DeadPrimary + result[idx] = analysis + for i := len(notReplicatingReplicas) - 1; i >= 0; i-- { + idxToRemove := notReplicatingReplicas[i] + result = append(result[0:idxToRemove], result[idxToRemove+1:]...) 
+ } + } + } + } + if err != nil { log.Error(err) } diff --git a/go/vt/vtorc/inst/analysis_dao_test.go b/go/vt/vtorc/inst/analysis_dao_test.go index 8bce5049ca8..fa8971fc0bc 100644 --- a/go/vt/vtorc/inst/analysis_dao_test.go +++ b/go/vt/vtorc/inst/analysis_dao_test.go @@ -567,6 +567,66 @@ func TestGetReplicationAnalysisDecision(t *testing.T) { keyspaceWanted: "ks", shardWanted: "0", codeWanted: NoProblem, + }, { + name: "DeadPrimary when VTOrc is starting up", + info: []*test.InfoForRecoveryAnalysis{{ + TabletInfo: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 101}, + Hostname: "localhost", + Keyspace: "ks", + Shard: "0", + Type: topodatapb.TabletType_PRIMARY, + MysqlHostname: "localhost", + MysqlPort: 6708, + }, + DurabilityPolicy: "none", + IsInvalid: 1, + }, { + TabletInfo: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 100}, + Hostname: "localhost", + Keyspace: "ks", + Shard: "0", + Type: topodatapb.TabletType_REPLICA, + MysqlHostname: "localhost", + MysqlPort: 6709, + }, + LastCheckValid: 1, + ReplicationStopped: 1, + }, { + TabletInfo: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 103}, + Hostname: "localhost", + Keyspace: "ks", + Shard: "0", + Type: topodatapb.TabletType_REPLICA, + MysqlHostname: "localhost", + MysqlPort: 6710, + }, + LastCheckValid: 1, + ReplicationStopped: 1, + }}, + keyspaceWanted: "ks", + shardWanted: "0", + codeWanted: DeadPrimary, + }, { + name: "Invalid Primary", + info: []*test.InfoForRecoveryAnalysis{{ + TabletInfo: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 101}, + Hostname: "localhost", + Keyspace: "ks", + Shard: "0", + Type: topodatapb.TabletType_PRIMARY, + MysqlHostname: "localhost", + MysqlPort: 6708, + }, + DurabilityPolicy: "none", + IsInvalid: 1, + }}, + keyspaceWanted: "ks", + shardWanted: "0", + codeWanted: InvalidPrimary, }, } for _, tt := range tests { From 426f7df58751b2517734bff96ed06cde95ec7208 Mon Sep 17 00:00:00 
2001 From: Manan Gupta Date: Tue, 20 Jun 2023 12:09:38 +0530 Subject: [PATCH 3/8] feat: read vttablet records for instances that have no mysql port too Signed-off-by: Manan Gupta --- .../endtoend/vtorc/primaryfailure/primary_failure_test.go | 2 +- go/vt/vtorc/logic/tablet_discovery.go | 5 ----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go b/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go index dc13ba8ef16..d7a54fb421f 100644 --- a/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go +++ b/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go @@ -127,7 +127,7 @@ func TestDownPrimaryBeforeVTOrc(t *testing.T) { utils.CheckReplication(t, clusterInfo, curPrimary, []*cluster.Vttablet{rdonly, replica}, 10*time.Second) // Make the current primary vttablet unavailable. - _ = curPrimary.VttabletProcess.Kill() + _ = curPrimary.VttabletProcess.TearDown() err = curPrimary.MysqlctlProcess.Stop() require.NoError(t, err) diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 30827036044..0abdf0fff44 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -195,8 +195,6 @@ func refreshTabletsInKeyspaceShard(ctx context.Context, keyspace, shard string, func refreshTablets(tablets map[string]*topo.TabletInfo, query string, args []any, loader func(tabletAlias string), forceRefresh bool, tabletsToIgnore []string) { // Discover new tablets. - // TODO(sougou): enhance this to work with multi-schema, - // where each instanceKey can have multiple tablets. 
latestInstances := make(map[string]bool) var wg sync.WaitGroup for _, tabletInfo := range tablets { @@ -206,9 +204,6 @@ func refreshTablets(tablets map[string]*topo.TabletInfo, query string, args []an } tabletAliasString := topoproto.TabletAliasString(tablet.Alias) latestInstances[tabletAliasString] = true - if tablet.MysqlHostname == "" { - continue - } old, err := inst.ReadTablet(tabletAliasString) if err != nil && err != inst.ErrTabletAliasNil { log.Error(err) From 0da9abf79fe79e34a93bfa3fb16e190d6a14e3f7 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Tue, 20 Jun 2023 12:36:16 +0530 Subject: [PATCH 4/8] feat: refactor the code Signed-off-by: Manan Gupta --- go/vt/vtorc/inst/analysis_dao.go | 91 ++++++++++++--------- go/vt/vtorc/inst/analysis_dao_test.go | 7 +- go/vt/vtorc/logic/topology_recovery.go | 42 +++++----- go/vt/vtorc/logic/topology_recovery_test.go | 4 +- 4 files changed, 81 insertions(+), 63 deletions(-) diff --git a/go/vt/vtorc/inst/analysis_dao.go b/go/vt/vtorc/inst/analysis_dao.go index 72dadf8e49a..8409142c378 100644 --- a/go/vt/vtorc/inst/analysis_dao.go +++ b/go/vt/vtorc/inst/analysis_dao.go @@ -62,8 +62,14 @@ type clusterAnalysis struct { } // GetReplicationAnalysis will check for replication problems (dead primary; unreachable primary; etc) -func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAnalysisHints) ([]ReplicationAnalysis, error) { - result := []ReplicationAnalysis{} +func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAnalysisHints) ([]*ReplicationAnalysis, error) { + var result []*ReplicationAnalysis + appendAnalysis := func(analysis *ReplicationAnalysis) { + if analysis.Analysis == NoProblem && len(analysis.StructureAnalysis) == 0 { + return + } + result = append(result, analysis) + } // TODO(sougou); deprecate ReduceReplicationAnalysisCount args := sqlutils.Args(config.Config.ReasonableReplicationLagSeconds, ValidSecondsFromSeenToLastAttemptedCheck(), 
config.Config.ReasonableReplicationLagSeconds, keyspace, shard) @@ -287,7 +293,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna clusters := make(map[string]*clusterAnalysis) err := db.Db.QueryVTOrc(query, args, func(m sqlutils.RowMap) error { - a := ReplicationAnalysis{ + a := &ReplicationAnalysis{ Analysis: NoProblem, ProcessingNodeHostname: process.ThisHostname, ProcessingNodeToken: util.ProcessToken.Hash, @@ -537,13 +543,6 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna // a.Description = "Primary has no replicas" // } - appendAnalysis := func(analysis *ReplicationAnalysis) { - if a.Analysis == NoProblem && len(a.StructureAnalysis) == 0 { - return - } - result = append(result, a) - } - { // Moving on to structure analysis // We also do structural checks. See if there's potential danger in promotions @@ -584,7 +583,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna a.StructureAnalysis = append(a.StructureAnalysis, NotEnoughValidSemiSyncReplicasStructureWarning) } } - appendAnalysis(&a) + appendAnalysis(a) if a.CountReplicas > 0 && hints.AuditAnalysis { // Interesting enough for analysis @@ -595,32 +594,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna return nil }) - for idx, analysis := range result { - if analysis.Analysis == InvalidPrimary { - keyspaceName := analysis.ClusterDetails.Keyspace - shardName := analysis.ClusterDetails.Shard - keyspaceShard := getKeyspaceShardName(keyspaceName, shardName) - totalReplicas := clusters[keyspaceShard].totalTablets - 1 - var notReplicatingReplicas []int - for i := idx + 1; i < len(result); i++ { - replicaAnalysis := result[i] - if replicaAnalysis.ClusterDetails.Keyspace == keyspaceName && - replicaAnalysis.ClusterDetails.Shard == shardName && - topo.IsReplicaType(replicaAnalysis.TabletType) && - !replicaAnalysis.IsPrimary && replicaAnalysis.ReplicationStopped { - 
notReplicatingReplicas = append(notReplicatingReplicas, i) - } - } - if len(notReplicatingReplicas) == totalReplicas && totalReplicas > 0 { - analysis.Analysis = DeadPrimary - result[idx] = analysis - for i := len(notReplicatingReplicas) - 1; i >= 0; i-- { - idxToRemove := notReplicatingReplicas[i] - result = append(result[0:idxToRemove], result[idxToRemove+1:]...) - } - } - } - } + result = postProcessAnalyses(result, clusters) if err != nil { log.Error(err) @@ -629,6 +603,49 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna return result, err } +// postProcessAnalyses is used to update different analyses based on the information gleaned from looking at all the analyses together instead of individual data. +func postProcessAnalyses(result []*ReplicationAnalysis, clusters map[string]*clusterAnalysis) []*ReplicationAnalysis { + for { + // Store whether we have changed the result of replication analysis or not. + resultChanged := false + + // Go over all the analysis. + for _, analysis := range result { + // If one of them is an InvalidPrimary, then we see if all the other tablets in this keyspace shard are + // unable to replicate or not. + if analysis.Analysis == InvalidPrimary { + keyspaceName := analysis.ClusterDetails.Keyspace + shardName := analysis.ClusterDetails.Shard + keyspaceShard := getKeyspaceShardName(keyspaceName, shardName) + totalReplicas := clusters[keyspaceShard].totalTablets - 1 + var notReplicatingReplicas []int + for idx, replicaAnalysis := range result { + if replicaAnalysis.ClusterDetails.Keyspace == keyspaceName && + replicaAnalysis.ClusterDetails.Shard == shardName && + topo.IsReplicaType(replicaAnalysis.TabletType) && replicaAnalysis.ReplicationStopped { + notReplicatingReplicas = append(notReplicatingReplicas, idx) + } + } + // If none of the other tablets are able to replicate, then we conclude that this primary is not just Invalid, but also Dead. 
+ // In this case, we update the analysis for the primary tablet and remove all the analyses of the replicas. + if totalReplicas > 0 && len(notReplicatingReplicas) == totalReplicas { + resultChanged = true + analysis.Analysis = DeadPrimary + for i := len(notReplicatingReplicas) - 1; i >= 0; i-- { + idxToRemove := notReplicatingReplicas[i] + result = append(result[0:idxToRemove], result[idxToRemove+1:]...) + } + break + } + } + } + if !resultChanged { + break + } + } + return result +} + // auditInstanceAnalysisInChangelog will write down an instance's analysis in the database_instance_analysis_changelog table. // To not repeat recurring analysis code, the database_instance_last_analysis table is used, so that only changes to // analysis codes are written. diff --git a/go/vt/vtorc/inst/analysis_dao_test.go b/go/vt/vtorc/inst/analysis_dao_test.go index fa8971fc0bc..a3ee02023a4 100644 --- a/go/vt/vtorc/inst/analysis_dao_test.go +++ b/go/vt/vtorc/inst/analysis_dao_test.go @@ -695,9 +695,10 @@ func TestGetReplicationAnalysis(t *testing.T) { }, // As long as we have the vitess record stating that this tablet is the primary // It would be incorrect to run a PRS. - // This situation only happens when we haven't been able to read the MySQL information even once for this tablet. - // So it is likely a new tablet. - codeWanted: NoProblem, + // We should still flag this tablet as Invalid. 
+ codeWanted: InvalidPrimary, + keyspaceWanted: "ks", + shardWanted: "0", }, { name: "Removing Replica Tablet's MySQL record", sql: []string{ diff --git a/go/vt/vtorc/logic/topology_recovery.go b/go/vt/vtorc/logic/topology_recovery.go index 611636c6e20..7fca7c67909 100644 --- a/go/vt/vtorc/logic/topology_recovery.go +++ b/go/vt/vtorc/logic/topology_recovery.go @@ -195,8 +195,8 @@ func resolveRecovery(topologyRecovery *TopologyRecovery, successorInstance *inst } // recoverPrimaryHasPrimary resets the replication on the primary instance -func recoverPrimaryHasPrimary(ctx context.Context, analysisEntry inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { - topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, false, true) +func recoverPrimaryHasPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { + topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, false, true) if topologyRecovery == nil { _ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another fixPrimaryHasPrimary.", analysisEntry.AnalyzedInstanceAlias)) return false, nil, err @@ -218,7 +218,7 @@ func recoverPrimaryHasPrimary(ctx context.Context, analysisEntry inst.Replicatio // recoverDeadPrimary checks a given analysis, decides whether to take action, and possibly takes action // Returns true when action was taken. 
-func recoverDeadPrimary(ctx context.Context, analysisEntry inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { +func recoverDeadPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { if !analysisEntry.ClusterDetails.HasAutomatedPrimaryRecovery { return false, nil, nil } @@ -229,7 +229,7 @@ func recoverDeadPrimary(ctx context.Context, analysisEntry inst.ReplicationAnaly return false, nil, err } - topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, true, true) + topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, true, true) if topologyRecovery == nil { _ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another RecoverDeadPrimary.", analysisEntry.AnalyzedInstanceAlias)) return false, nil, err @@ -275,7 +275,7 @@ func recoverDeadPrimary(ctx context.Context, analysisEntry inst.ReplicationAnaly return true, topologyRecovery, err } -func postErsCompletion(topologyRecovery *TopologyRecovery, analysisEntry inst.ReplicationAnalysis, promotedReplica *inst.Instance) { +func postErsCompletion(topologyRecovery *TopologyRecovery, analysisEntry *inst.ReplicationAnalysis, promotedReplica *inst.Instance) { if promotedReplica != nil { message := fmt.Sprintf("promoted replica: %+v", promotedReplica.InstanceAlias) _ = AuditTopologyRecovery(topologyRecovery, message) @@ -285,12 +285,12 @@ func postErsCompletion(topologyRecovery *TopologyRecovery, analysisEntry inst.Re } // checkAndRecoverGenericProblem is a general-purpose recovery function -func checkAndRecoverLockedSemiSyncPrimary(ctx context.Context, analysisEntry inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { +func checkAndRecoverLockedSemiSyncPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, 
topologyRecovery *TopologyRecovery, err error) { return false, nil, nil } // checkAndRecoverGenericProblem is a general-purpose recovery function -func checkAndRecoverGenericProblem(ctx context.Context, analysisEntry inst.ReplicationAnalysis) (bool, *TopologyRecovery, error) { +func checkAndRecoverGenericProblem(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (bool, *TopologyRecovery, error) { return false, nil, nil } @@ -370,8 +370,8 @@ func emergentlyRecordStaleBinlogCoordinates(tabletAlias string, binlogCoordinate // checkAndExecuteFailureDetectionProcesses tries to register for failure detection and potentially executes // failure-detection processes. -func checkAndExecuteFailureDetectionProcesses(analysisEntry inst.ReplicationAnalysis) (detectionRegistrationSuccess bool, processesExecutionAttempted bool, err error) { - if ok, _ := AttemptFailureDetectionRegistration(&analysisEntry); !ok { +func checkAndExecuteFailureDetectionProcesses(analysisEntry *inst.ReplicationAnalysis) (detectionRegistrationSuccess bool, processesExecutionAttempted bool, err error) { + if ok, _ := AttemptFailureDetectionRegistration(analysisEntry); !ok { if util.ClearToLog("checkAndExecuteFailureDetectionProcesses", analysisEntry.AnalyzedInstanceAlias) { log.Infof("checkAndExecuteFailureDetectionProcesses: could not register %+v detection on %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias) } @@ -455,7 +455,7 @@ func hasActionableRecovery(recoveryFunctionCode recoveryFunction) bool { // getCheckAndRecoverFunction gets the recovery function for the given code. 
func getCheckAndRecoverFunction(recoveryFunctionCode recoveryFunction) ( - checkAndRecoverFunction func(ctx context.Context, analysisEntry inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error), + checkAndRecoverFunction func(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error), ) { switch recoveryFunctionCode { case noRecoveryFunc: @@ -515,7 +515,7 @@ func isClusterWideRecovery(recoveryFunctionCode recoveryFunction) bool { } // analysisEntriesHaveSameRecovery tells whether the two analysis entries have the same recovery function or not -func analysisEntriesHaveSameRecovery(prevAnalysis, newAnalysis inst.ReplicationAnalysis) bool { +func analysisEntriesHaveSameRecovery(prevAnalysis, newAnalysis *inst.ReplicationAnalysis) bool { prevRecoveryFunctionCode := getCheckAndRecoverFunctionCode(prevAnalysis.Analysis, prevAnalysis.AnalyzedInstanceAlias) newRecoveryFunctionCode := getCheckAndRecoverFunctionCode(newAnalysis.Analysis, newAnalysis.AnalyzedInstanceAlias) return prevRecoveryFunctionCode == newRecoveryFunctionCode @@ -542,14 +542,14 @@ func runEmergentOperations(analysisEntry *inst.ReplicationAnalysis) { // executeCheckAndRecoverFunction will choose the correct check & recovery function based on analysis. 
// It executes the function synchronuously -func executeCheckAndRecoverFunction(analysisEntry inst.ReplicationAnalysis) (err error) { +func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (err error) { countPendingRecoveries.Add(1) defer countPendingRecoveries.Add(-1) checkAndRecoverFunctionCode := getCheckAndRecoverFunctionCode(analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias) isActionableRecovery := hasActionableRecovery(checkAndRecoverFunctionCode) analysisEntry.IsActionableRecovery = isActionableRecovery - runEmergentOperations(&analysisEntry) + runEmergentOperations(analysisEntry) if checkAndRecoverFunctionCode == noRecoveryFunc { // Unhandled problem type @@ -702,7 +702,7 @@ func executeCheckAndRecoverFunction(analysisEntry inst.ReplicationAnalysis) (err } // checkIfAlreadyFixed checks whether the problem that the analysis entry represents has already been fixed by another agent or not -func checkIfAlreadyFixed(analysisEntry inst.ReplicationAnalysis) (bool, error) { +func checkIfAlreadyFixed(analysisEntry *inst.ReplicationAnalysis) (bool, error) { // Run a replication analysis again. 
We will check if the problem persisted analysisEntries, err := inst.GetReplicationAnalysis(analysisEntry.ClusterDetails.Keyspace, analysisEntry.ClusterDetails.Shard, &inst.ReplicationAnalysisHints{}) if err != nil { @@ -742,7 +742,7 @@ func CheckAndRecover() { } } -func postPrsCompletion(topologyRecovery *TopologyRecovery, analysisEntry inst.ReplicationAnalysis, promotedReplica *inst.Instance) { +func postPrsCompletion(topologyRecovery *TopologyRecovery, analysisEntry *inst.ReplicationAnalysis, promotedReplica *inst.Instance) { if promotedReplica != nil { message := fmt.Sprintf("promoted replica: %+v", promotedReplica.InstanceAlias) _ = AuditTopologyRecovery(topologyRecovery, message) @@ -752,8 +752,8 @@ func postPrsCompletion(topologyRecovery *TopologyRecovery, analysisEntry inst.Re } // electNewPrimary elects a new primary while none were present before. -func electNewPrimary(ctx context.Context, analysisEntry inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { - topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, false /*failIfFailedInstanceInActiveRecovery*/, true /*failIfClusterInActiveRecovery*/) +func electNewPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { + topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, false /*failIfFailedInstanceInActiveRecovery*/, true /*failIfClusterInActiveRecovery*/) if topologyRecovery == nil || err != nil { _ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another electNewPrimary.", analysisEntry.AnalyzedInstanceAlias)) return false, nil, err @@ -800,8 +800,8 @@ func electNewPrimary(ctx context.Context, analysisEntry inst.ReplicationAnalysis } // fixPrimary sets the primary as read-write. 
-func fixPrimary(ctx context.Context, analysisEntry inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { - topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, false, true) +func fixPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { + topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, false, true) if topologyRecovery == nil { _ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another fixPrimary.", analysisEntry.AnalyzedInstanceAlias)) return false, nil, err @@ -831,8 +831,8 @@ func fixPrimary(ctx context.Context, analysisEntry inst.ReplicationAnalysis) (re } // fixReplica sets the replica as read-only and points it at the current primary. -func fixReplica(ctx context.Context, analysisEntry inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { - topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, false, true) +func fixReplica(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { + topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, false, true) if topologyRecovery == nil { _ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. 
Will not issue another fixReplica.", analysisEntry.AnalyzedInstanceAlias)) return false, nil, err diff --git a/go/vt/vtorc/logic/topology_recovery_test.go b/go/vt/vtorc/logic/topology_recovery_test.go index 73fa3929eec..9abf34a2add 100644 --- a/go/vt/vtorc/logic/topology_recovery_test.go +++ b/go/vt/vtorc/logic/topology_recovery_test.go @@ -87,7 +87,7 @@ func TestAnalysisEntriesHaveSameRecovery(t *testing.T) { t.Parallel() for _, tt := range tests { t.Run(string(tt.prevAnalysisCode)+","+string(tt.newAnalysisCode), func(t *testing.T) { - res := analysisEntriesHaveSameRecovery(inst.ReplicationAnalysis{Analysis: tt.prevAnalysisCode}, inst.ReplicationAnalysis{Analysis: tt.newAnalysisCode}) + res := analysisEntriesHaveSameRecovery(&inst.ReplicationAnalysis{Analysis: tt.prevAnalysisCode}, &inst.ReplicationAnalysis{Analysis: tt.newAnalysisCode}) require.Equal(t, tt.shouldBeEqual, res) }) } @@ -117,7 +117,7 @@ func TestElectNewPrimaryPanic(t *testing.T) { } err = inst.SaveTablet(tablet) require.NoError(t, err) - analysisEntry := inst.ReplicationAnalysis{ + analysisEntry := &inst.ReplicationAnalysis{ AnalyzedInstanceAlias: topoproto.TabletAliasString(tablet.Alias), } ts = memorytopo.NewServer("zone1") From 93dd79f7e691fda2adc3bd97708068f560965b08 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Tue, 20 Jun 2023 12:49:39 +0530 Subject: [PATCH 5/8] feat: add tests for the newly introduced function Signed-off-by: Manan Gupta --- go/vt/vtorc/inst/analysis_dao_test.go | 139 +++++++++++++++++++++++++- 1 file changed, 138 insertions(+), 1 deletion(-) diff --git a/go/vt/vtorc/inst/analysis_dao_test.go b/go/vt/vtorc/inst/analysis_dao_test.go index a3ee02023a4..b4d869b601f 100644 --- a/go/vt/vtorc/inst/analysis_dao_test.go +++ b/go/vt/vtorc/inst/analysis_dao_test.go @@ -25,7 +25,6 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/external/golib/sqlutils" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/vtorc/db" 
"vitess.io/vitess/go/vt/vtorc/test" @@ -810,3 +809,141 @@ func TestAuditInstanceAnalysisInChangelog(t *testing.T) { }) } } + +// TestPostProcessAnalyses tests the functionality of the postProcessAnalyses function. +func TestPostProcessAnalyses(t *testing.T) { + ks0 := ClusterInfo{ + Keyspace: "ks", + Shard: "0", + CountInstances: 3, + } + ks80 := ClusterInfo{ + Keyspace: "ks", + Shard: "80-", + CountInstances: 3, + } + clusters := map[string]*clusterAnalysis{ + getKeyspaceShardName(ks0.Keyspace, ks0.Shard): { + totalTablets: int(ks0.CountInstances), + }, + getKeyspaceShardName(ks80.Keyspace, ks80.Shard): { + totalTablets: int(ks80.CountInstances), + }, + } + + tests := []struct { + name string + analyses []*ReplicationAnalysis + want []*ReplicationAnalysis + }{ + { + name: "No processing needed", + analyses: []*ReplicationAnalysis{ + { + Analysis: ReplicationStopped, + TabletType: topodatapb.TabletType_REPLICA, + ClusterDetails: ks0, + }, { + Analysis: ReplicaSemiSyncMustBeSet, + TabletType: topodatapb.TabletType_REPLICA, + ClusterDetails: ks0, + }, { + Analysis: PrimaryHasPrimary, + TabletType: topodatapb.TabletType_REPLICA, + ClusterDetails: ks0, + }, + }, + }, { + name: "Conversion of InvalidPrimary to DeadPrimary", + analyses: []*ReplicationAnalysis{ + { + Analysis: InvalidPrimary, + AnalyzedInstanceAlias: "zone1-100", + TabletType: topodatapb.TabletType_PRIMARY, + ClusterDetails: ks0, + }, { + Analysis: NoProblem, + AnalyzedInstanceAlias: "zone1-202", + TabletType: topodatapb.TabletType_RDONLY, + ClusterDetails: ks80, + }, { + Analysis: ConnectedToWrongPrimary, + AnalyzedInstanceAlias: "zone1-101", + TabletType: topodatapb.TabletType_REPLICA, + ReplicationStopped: true, + ClusterDetails: ks0, + }, { + Analysis: ReplicationStopped, + AnalyzedInstanceAlias: "zone1-102", + TabletType: topodatapb.TabletType_RDONLY, + ReplicationStopped: true, + ClusterDetails: ks0, + }, { + Analysis: NoProblem, + AnalyzedInstanceAlias: "zone1-302", + TabletType: 
topodatapb.TabletType_REPLICA, + ClusterDetails: ks80, + }, + }, + want: []*ReplicationAnalysis{ + { + Analysis: DeadPrimary, + AnalyzedInstanceAlias: "zone1-100", + TabletType: topodatapb.TabletType_PRIMARY, + ClusterDetails: ks0, + }, { + Analysis: NoProblem, + AnalyzedInstanceAlias: "zone1-202", + TabletType: topodatapb.TabletType_RDONLY, + ClusterDetails: ks80, + }, { + Analysis: NoProblem, + AnalyzedInstanceAlias: "zone1-302", + TabletType: topodatapb.TabletType_REPLICA, + ClusterDetails: ks80, + }, + }, + }, + { + name: "Unable to convert InvalidPrimary to DeadPrimary", + analyses: []*ReplicationAnalysis{ + { + Analysis: InvalidPrimary, + AnalyzedInstanceAlias: "zone1-100", + TabletType: topodatapb.TabletType_PRIMARY, + ClusterDetails: ks0, + }, { + Analysis: NoProblem, + AnalyzedInstanceAlias: "zone1-202", + TabletType: topodatapb.TabletType_RDONLY, + ClusterDetails: ks80, + }, { + Analysis: NoProblem, + AnalyzedInstanceAlias: "zone1-101", + TabletType: topodatapb.TabletType_REPLICA, + ClusterDetails: ks0, + }, { + Analysis: ReplicationStopped, + AnalyzedInstanceAlias: "zone1-102", + TabletType: topodatapb.TabletType_RDONLY, + ReplicationStopped: true, + ClusterDetails: ks0, + }, { + Analysis: NoProblem, + AnalyzedInstanceAlias: "zone1-302", + TabletType: topodatapb.TabletType_REPLICA, + ClusterDetails: ks80, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.want == nil { + tt.want = tt.analyses + } + result := postProcessAnalyses(tt.analyses, clusters) + require.ElementsMatch(t, tt.want, result) + }) + } +} From 42d8993d1b17185b07266f128112e7d51b47e6b5 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Tue, 20 Jun 2023 14:30:02 +0530 Subject: [PATCH 6/8] test: fix test expectations Signed-off-by: Manan Gupta --- go/test/endtoend/vtorc/api/api_test.go | 6 +++--- go/vt/vtorc/logic/tablet_discovery_test.go | 16 +++++++++++----- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git 
a/go/test/endtoend/vtorc/api/api_test.go b/go/test/endtoend/vtorc/api/api_test.go index ebb39210572..065124b4e39 100644 --- a/go/test/endtoend/vtorc/api/api_test.go +++ b/go/test/endtoend/vtorc/api/api_test.go @@ -92,7 +92,7 @@ func TestAPIEndpoints(t *testing.T) { // Before we disable recoveries, let us wait until VTOrc has fixed all the issues (if any). _, _ = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool { - return response != "[]" + return response != "null" }) t.Run("Disable Recoveries API", func(t *testing.T) { @@ -112,7 +112,7 @@ func TestAPIEndpoints(t *testing.T) { // Wait until VTOrc picks up on this issue and verify // that we see a not null result on the api/replication-analysis page status, resp := utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool { - return response == "[]" + return response == "null" }) assert.Equal(t, 200, status, resp) assert.Contains(t, resp, fmt.Sprintf(`"AnalyzedInstanceAlias": "%s"`, replica.Alias)) @@ -134,7 +134,7 @@ func TestAPIEndpoints(t *testing.T) { status, resp, err = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?keyspace=ks&shard=80-") require.NoError(t, err) assert.Equal(t, 200, status, resp) - assert.Equal(t, "[]", resp) + assert.Equal(t, "null", resp) // Check that filtering using just the shard fails status, resp, err = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?shard=0") diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go index 1166dd2e40d..9a42d80ecc9 100644 --- a/go/vt/vtorc/logic/tablet_discovery_test.go +++ b/go/vt/vtorc/logic/tablet_discovery_test.go @@ -142,22 +142,28 @@ func TestRefreshTabletsInKeyspaceShard(t *testing.T) { }) t.Run("tablet shutdown removes mysql hostname and port. 
We shouldn't forget the tablet", func(t *testing.T) { + startPort := tab100.MysqlPort + startHostname := tab100.MysqlHostname defer func() { + tab100.MysqlPort = startPort + tab100.MysqlHostname = startHostname _, err = ts.UpdateTabletFields(context.Background(), tab100.Alias, func(tablet *topodatapb.Tablet) error { - tablet.MysqlHostname = hostname - tablet.MysqlPort = 100 + tablet.MysqlHostname = startHostname + tablet.MysqlPort = startPort return nil }) }() - // Let's assume tab100 shutdown. This would clear its tablet hostname and port + // Let's assume tab100 shutdown. This would clear its tablet hostname and port. + tab100.MysqlPort = 0 + tab100.MysqlHostname = "" _, err = ts.UpdateTabletFields(context.Background(), tab100.Alias, func(tablet *topodatapb.Tablet) error { tablet.MysqlHostname = "" tablet.MysqlPort = 0 return nil }) require.NoError(t, err) - // We expect no tablets to be refreshed. Also, tab100 shouldn't be forgotten - verifyRefreshTabletsInKeyspaceShard(t, false, 0, tablets, nil) + // tab100 shouldn't be forgotten + verifyRefreshTabletsInKeyspaceShard(t, false, 1, tablets, nil) }) t.Run("change a tablet and call refreshTabletsInKeyspaceShard again", func(t *testing.T) { From 529ac24579952ea53c41242c70d0608b4aada4ca Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Wed, 21 Jun 2023 16:46:43 +0530 Subject: [PATCH 7/8] feat: fix flakiness in tests Signed-off-by: Manan Gupta --- go/vt/vtorc/inst/analysis.go | 1 + go/vt/vtorc/inst/analysis_dao.go | 19 ++++++++++++++----- go/vt/vtorc/inst/analysis_dao_test.go | 27 ++++++++++++++++++++++++--- 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/go/vt/vtorc/inst/analysis.go b/go/vt/vtorc/inst/analysis.go index bd186ca894d..e586ac33fc8 100644 --- a/go/vt/vtorc/inst/analysis.go +++ b/go/vt/vtorc/inst/analysis.go @@ -32,6 +32,7 @@ const ( NoProblem AnalysisCode = "NoProblem" ClusterHasNoPrimary AnalysisCode = "ClusterHasNoPrimary" InvalidPrimary AnalysisCode = "InvalidPrimary" + InvalidReplica 
AnalysisCode = "InvalidReplica" DeadPrimaryWithoutReplicas AnalysisCode = "DeadPrimaryWithoutReplicas" DeadPrimary AnalysisCode = "DeadPrimary" DeadPrimaryAndReplicas AnalysisCode = "DeadPrimaryAndReplicas" diff --git a/go/vt/vtorc/inst/analysis_dao.go b/go/vt/vtorc/inst/analysis_dao.go index 8409142c378..1c77d70db4e 100644 --- a/go/vt/vtorc/inst/analysis_dao.go +++ b/go/vt/vtorc/inst/analysis_dao.go @@ -269,6 +269,8 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna ) LEFT JOIN database_instance primary_instance ON ( vitess_tablet.alias = primary_instance.alias + AND vitess_tablet.hostname = primary_instance.hostname + AND vitess_tablet.port = primary_instance.port ) LEFT JOIN vitess_tablet primary_tablet ON ( primary_tablet.hostname = primary_instance.source_host @@ -426,9 +428,10 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna isInvalid := m.GetBool("is_invalid") if a.IsClusterPrimary && isInvalid { a.Analysis = InvalidPrimary - a.Description = "VTOrc hasn't been able to reach the primary even once" + a.Description = "VTOrc hasn't been able to reach the primary even once since restart/shutdown" } else if isInvalid { - return nil + a.Analysis = InvalidReplica + a.Description = "VTOrc hasn't been able to reach the replica even once since restart/shutdown" } else if a.IsClusterPrimary && !a.LastCheckValid && a.CountReplicas == 0 { a.Analysis = DeadPrimaryWithoutReplicas a.Description = "Primary cannot be reached by vtorc and has no replica" @@ -594,6 +597,10 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna return nil }) + for _, analysis := range result { + log.Errorf("Analysis - Instance - %v, Code - %v, LastCheckValid - %v, ReplStopped - %v", analysis.AnalyzedInstanceAlias, analysis.Analysis, analysis.LastCheckValid, analysis.ReplicationStopped) + } + result = postProcessAnalyses(result, clusters) if err != nil { @@ -621,9 +628,11 @@ func 
postProcessAnalyses(result []*ReplicationAnalysis, clusters map[string]*clu var notReplicatingReplicas []int for idx, replicaAnalysis := range result { if replicaAnalysis.ClusterDetails.Keyspace == keyspaceName && - replicaAnalysis.ClusterDetails.Shard == shardName && - topo.IsReplicaType(replicaAnalysis.TabletType) && replicaAnalysis.ReplicationStopped { - notReplicatingReplicas = append(notReplicatingReplicas, idx) + replicaAnalysis.ClusterDetails.Shard == shardName && topo.IsReplicaType(replicaAnalysis.TabletType) { + // If the replica's last check is invalid or its replication is stopped, then we consider it as not replicating. + if !replicaAnalysis.LastCheckValid || replicaAnalysis.ReplicationStopped { + notReplicatingReplicas = append(notReplicatingReplicas, idx) + } } } // If none of the other tablets are able to replicate, then we conclude that this primary is not just Invalid, but also Dead. diff --git a/go/vt/vtorc/inst/analysis_dao_test.go b/go/vt/vtorc/inst/analysis_dao_test.go index b4d869b601f..cabea7bfccd 100644 --- a/go/vt/vtorc/inst/analysis_dao_test.go +++ b/go/vt/vtorc/inst/analysis_dao_test.go @@ -565,7 +565,7 @@ func TestGetReplicationAnalysisDecision(t *testing.T) { }}, keyspaceWanted: "ks", shardWanted: "0", - codeWanted: NoProblem, + codeWanted: InvalidReplica, }, { name: "DeadPrimary when VTOrc is starting up", info: []*test.InfoForRecoveryAnalysis{{ @@ -708,7 +708,7 @@ func TestGetReplicationAnalysis(t *testing.T) { // We should wait for the MySQL information to be refreshed once. // This situation only happens when we haven't been able to read the MySQL information even once for this tablet. // So it is likely a new tablet.
- codeWanted: NoProblem, + codeWanted: InvalidReplica, + keyspaceWanted: "ks", + shardWanted: "0", }, } @@ -815,7 +817,7 @@ func TestPostProcessAnalyses(t *testing.T) { ks0 := ClusterInfo{ Keyspace: "ks", Shard: "0", - CountInstances: 3, + CountInstances: 4, } ks80 := ClusterInfo{ Keyspace: "ks", @@ -842,13 +844,16 @@ func TestPostProcessAnalyses(t *testing.T) { { Analysis: ReplicationStopped, TabletType: topodatapb.TabletType_REPLICA, + LastCheckValid: true, ClusterDetails: ks0, }, { Analysis: ReplicaSemiSyncMustBeSet, + LastCheckValid: true, TabletType: topodatapb.TabletType_REPLICA, ClusterDetails: ks0, }, { Analysis: PrimaryHasPrimary, + LastCheckValid: true, TabletType: topodatapb.TabletType_REPLICA, ClusterDetails: ks0, }, @@ -863,24 +868,34 @@ func TestPostProcessAnalyses(t *testing.T) { ClusterDetails: ks0, }, { Analysis: NoProblem, + LastCheckValid: true, AnalyzedInstanceAlias: "zone1-202", TabletType: topodatapb.TabletType_RDONLY, ClusterDetails: ks80, }, { Analysis: ConnectedToWrongPrimary, + LastCheckValid: true, AnalyzedInstanceAlias: "zone1-101", TabletType: topodatapb.TabletType_REPLICA, ReplicationStopped: true, ClusterDetails: ks0, }, { Analysis: ReplicationStopped, + LastCheckValid: true, AnalyzedInstanceAlias: "zone1-102", TabletType: topodatapb.TabletType_RDONLY, ReplicationStopped: true, ClusterDetails: ks0, + }, { + Analysis: InvalidReplica, + AnalyzedInstanceAlias: "zone1-108", + TabletType: topodatapb.TabletType_REPLICA, + LastCheckValid: false, + ClusterDetails: ks0, }, { Analysis: NoProblem, AnalyzedInstanceAlias: "zone1-302", + LastCheckValid: true, TabletType: topodatapb.TabletType_REPLICA, ClusterDetails: ks80, }, @@ -893,11 +908,13 @@ func TestPostProcessAnalyses(t *testing.T) { ClusterDetails: ks0, }, { Analysis: NoProblem, + LastCheckValid: true, AnalyzedInstanceAlias: "zone1-202", TabletType: topodatapb.TabletType_RDONLY, ClusterDetails: ks80, }, { Analysis: NoProblem, + LastCheckValid: true, AnalyzedInstanceAlias: "zone1-302", 
TabletType: topodatapb.TabletType_REPLICA, ClusterDetails: ks80, @@ -915,21 +932,25 @@ func TestPostProcessAnalyses(t *testing.T) { }, { Analysis: NoProblem, AnalyzedInstanceAlias: "zone1-202", + LastCheckValid: true, TabletType: topodatapb.TabletType_RDONLY, ClusterDetails: ks80, }, { Analysis: NoProblem, + LastCheckValid: true, AnalyzedInstanceAlias: "zone1-101", TabletType: topodatapb.TabletType_REPLICA, ClusterDetails: ks0, }, { Analysis: ReplicationStopped, + LastCheckValid: true, AnalyzedInstanceAlias: "zone1-102", TabletType: topodatapb.TabletType_RDONLY, ReplicationStopped: true, ClusterDetails: ks0, }, { Analysis: NoProblem, + LastCheckValid: true, AnalyzedInstanceAlias: "zone1-302", TabletType: topodatapb.TabletType_REPLICA, ClusterDetails: ks80, From d7b699c23becc164321c1b920bbd7f1b96e76f96 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 22 Jun 2023 11:00:16 +0530 Subject: [PATCH 8/8] feat: fix comments Signed-off-by: Manan Gupta --- go/vt/vtorc/inst/analysis_dao.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vtorc/inst/analysis_dao.go b/go/vt/vtorc/inst/analysis_dao.go index 1c77d70db4e..3fded2f814d 100644 --- a/go/vt/vtorc/inst/analysis_dao.go +++ b/go/vt/vtorc/inst/analysis_dao.go @@ -415,7 +415,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna } // ca has clusterwide info ca := clusters[keyspaceShard] - // Increment the total amount of tablets. + // Increment the total number of tablets. ca.totalTablets += 1 if ca.hasClusterwideAction { // We can only take one cluster level action at a time. @@ -616,7 +616,7 @@ func postProcessAnalyses(result []*ReplicationAnalysis, clusters map[string]*clu // Store whether we have changed the result of replication analysis or not. resultChanged := false - // Go over all the analysis. + // Go over all the analyses. 
for _, analysis := range result { // If one of them is an InvalidPrimary, then we see if all the other tablets in this keyspace shard are // unable to replicate or not.