Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve VTOrc failure detection to be able to better handle dead primary failures #13190

Merged
merged 8 commits into from
Jun 22, 2023
Merged
Prev Previous commit
Next Next commit
feat: refactor the code
Signed-off-by: Manan Gupta <manan@planetscale.com>
GuptaManan100 committed Jun 20, 2023

Verified

This commit was signed with the committer’s verified signature.
GuptaManan100 Manan Gupta
commit 0da9abf79fe79e34a93bfa3fb16e190d6a14e3f7
91 changes: 54 additions & 37 deletions go/vt/vtorc/inst/analysis_dao.go
Original file line number Diff line number Diff line change
@@ -62,8 +62,14 @@ type clusterAnalysis struct {
}

// GetReplicationAnalysis will check for replication problems (dead primary; unreachable primary; etc)
func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAnalysisHints) ([]ReplicationAnalysis, error) {
result := []ReplicationAnalysis{}
func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAnalysisHints) ([]*ReplicationAnalysis, error) {
var result []*ReplicationAnalysis
appendAnalysis := func(analysis *ReplicationAnalysis) {
if analysis.Analysis == NoProblem && len(analysis.StructureAnalysis) == 0 {
return
}
result = append(result, analysis)
}

// TODO(sougou); deprecate ReduceReplicationAnalysisCount
args := sqlutils.Args(config.Config.ReasonableReplicationLagSeconds, ValidSecondsFromSeenToLastAttemptedCheck(), config.Config.ReasonableReplicationLagSeconds, keyspace, shard)
@@ -287,7 +293,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna

clusters := make(map[string]*clusterAnalysis)
err := db.Db.QueryVTOrc(query, args, func(m sqlutils.RowMap) error {
a := ReplicationAnalysis{
a := &ReplicationAnalysis{
Analysis: NoProblem,
ProcessingNodeHostname: process.ThisHostname,
ProcessingNodeToken: util.ProcessToken.Hash,
@@ -537,13 +543,6 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
// a.Description = "Primary has no replicas"
// }

appendAnalysis := func(analysis *ReplicationAnalysis) {
if a.Analysis == NoProblem && len(a.StructureAnalysis) == 0 {
return
}
result = append(result, a)
}

{
// Moving on to structure analysis
// We also do structural checks. See if there's potential danger in promotions
@@ -584,7 +583,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
a.StructureAnalysis = append(a.StructureAnalysis, NotEnoughValidSemiSyncReplicasStructureWarning)
}
}
appendAnalysis(&a)
appendAnalysis(a)

if a.CountReplicas > 0 && hints.AuditAnalysis {
// Interesting enough for analysis
@@ -595,32 +594,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
return nil
})

for idx, analysis := range result {
if analysis.Analysis == InvalidPrimary {
keyspaceName := analysis.ClusterDetails.Keyspace
shardName := analysis.ClusterDetails.Shard
keyspaceShard := getKeyspaceShardName(keyspaceName, shardName)
totalReplicas := clusters[keyspaceShard].totalTablets - 1
var notReplicatingReplicas []int
for i := idx + 1; i < len(result); i++ {
replicaAnalysis := result[i]
if replicaAnalysis.ClusterDetails.Keyspace == keyspaceName &&
replicaAnalysis.ClusterDetails.Shard == shardName &&
topo.IsReplicaType(replicaAnalysis.TabletType) &&
!replicaAnalysis.IsPrimary && replicaAnalysis.ReplicationStopped {
notReplicatingReplicas = append(notReplicatingReplicas, i)
}
}
if len(notReplicatingReplicas) == totalReplicas && totalReplicas > 0 {
analysis.Analysis = DeadPrimary
result[idx] = analysis
for i := len(notReplicatingReplicas) - 1; i >= 0; i-- {
idxToRemove := notReplicatingReplicas[i]
result = append(result[0:idxToRemove], result[idxToRemove+1:]...)
}
}
}
}
result = postProcessAnalyses(result, clusters)

if err != nil {
log.Error(err)
@@ -629,6 +603,49 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
return result, err
}

// postProcessAnalyses is used to update different analyses based on the information gleaned from looking at all the analyses together instead of individual data.
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
func postProcessAnalyses(result []*ReplicationAnalysis, clusters map[string]*clusterAnalysis) []*ReplicationAnalysis {
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
for {
// Store whether we have changed the result of replication analysis or not.
resultChanged := false

// Go over all the analysis.
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
for _, analysis := range result {
// If one of them is an InvalidPrimary, then we see if all the other tablets in this keyspace shard are
// unable to replicate or not.
if analysis.Analysis == InvalidPrimary {
keyspaceName := analysis.ClusterDetails.Keyspace
shardName := analysis.ClusterDetails.Shard
keyspaceShard := getKeyspaceShardName(keyspaceName, shardName)
totalReplicas := clusters[keyspaceShard].totalTablets - 1
var notReplicatingReplicas []int
for idx, replicaAnalysis := range result {
if replicaAnalysis.ClusterDetails.Keyspace == keyspaceName &&
replicaAnalysis.ClusterDetails.Shard == shardName &&
topo.IsReplicaType(replicaAnalysis.TabletType) && replicaAnalysis.ReplicationStopped {
notReplicatingReplicas = append(notReplicatingReplicas, idx)
}
}
// If none of the other tablets are able to replicate, then we conclude that this primary is not just Invalid, but also Dead.
// In this case, we update the analysis for the primary tablet and remove all the analyses of the replicas.
if totalReplicas > 0 && len(notReplicatingReplicas) == totalReplicas {
resultChanged = true
analysis.Analysis = DeadPrimary
for i := len(notReplicatingReplicas) - 1; i >= 0; i-- {
idxToRemove := notReplicatingReplicas[i]
result = append(result[0:idxToRemove], result[idxToRemove+1:]...)
}
break
}
}
}
if !resultChanged {
break
}
}
return result
}

// auditInstanceAnalysisInChangelog will write down an instance's analysis in the database_instance_analysis_changelog table.
// To not repeat recurring analysis code, the database_instance_last_analysis table is used, so that only changes to
// analysis codes are written.
7 changes: 4 additions & 3 deletions go/vt/vtorc/inst/analysis_dao_test.go
Original file line number Diff line number Diff line change
@@ -695,9 +695,10 @@ func TestGetReplicationAnalysis(t *testing.T) {
},
// As long as we have the vitess record stating that this tablet is the primary
// It would be incorrect to run a PRS.
// This situation only happens when we haven't been able to read the MySQL information even once for this tablet.
// So it is likely a new tablet.
codeWanted: NoProblem,
// We should still flag this tablet as Invalid.
codeWanted: InvalidPrimary,
keyspaceWanted: "ks",
shardWanted: "0",
}, {
name: "Removing Replica Tablet's MySQL record",
sql: []string{
42 changes: 21 additions & 21 deletions go/vt/vtorc/logic/topology_recovery.go
Original file line number Diff line number Diff line change
@@ -195,8 +195,8 @@ func resolveRecovery(topologyRecovery *TopologyRecovery, successorInstance *inst
}

// recoverPrimaryHasPrimary resets the replication on the primary instance
func recoverPrimaryHasPrimary(ctx context.Context, analysisEntry inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, false, true)
func recoverPrimaryHasPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, false, true)
if topologyRecovery == nil {
_ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another fixPrimaryHasPrimary.", analysisEntry.AnalyzedInstanceAlias))
return false, nil, err
@@ -218,7 +218,7 @@ func recoverPrimaryHasPrimary(ctx context.Context, analysisEntry inst.Replicatio

// recoverDeadPrimary checks a given analysis, decides whether to take action, and possibly takes action
// Returns true when action was taken.
func recoverDeadPrimary(ctx context.Context, analysisEntry inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
func recoverDeadPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
if !analysisEntry.ClusterDetails.HasAutomatedPrimaryRecovery {
return false, nil, nil
}
@@ -229,7 +229,7 @@ func recoverDeadPrimary(ctx context.Context, analysisEntry inst.ReplicationAnaly
return false, nil, err
}

topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, true, true)
topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, true, true)
if topologyRecovery == nil {
_ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another RecoverDeadPrimary.", analysisEntry.AnalyzedInstanceAlias))
return false, nil, err
@@ -275,7 +275,7 @@ func recoverDeadPrimary(ctx context.Context, analysisEntry inst.ReplicationAnaly
return true, topologyRecovery, err
}

func postErsCompletion(topologyRecovery *TopologyRecovery, analysisEntry inst.ReplicationAnalysis, promotedReplica *inst.Instance) {
func postErsCompletion(topologyRecovery *TopologyRecovery, analysisEntry *inst.ReplicationAnalysis, promotedReplica *inst.Instance) {
if promotedReplica != nil {
message := fmt.Sprintf("promoted replica: %+v", promotedReplica.InstanceAlias)
_ = AuditTopologyRecovery(topologyRecovery, message)
@@ -285,12 +285,12 @@ func postErsCompletion(topologyRecovery *TopologyRecovery, analysisEntry inst.Re
}

// checkAndRecoverGenericProblem is a general-purpose recovery function
func checkAndRecoverLockedSemiSyncPrimary(ctx context.Context, analysisEntry inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
func checkAndRecoverLockedSemiSyncPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
return false, nil, nil
}

// checkAndRecoverGenericProblem is a general-purpose recovery function
func checkAndRecoverGenericProblem(ctx context.Context, analysisEntry inst.ReplicationAnalysis) (bool, *TopologyRecovery, error) {
func checkAndRecoverGenericProblem(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (bool, *TopologyRecovery, error) {
return false, nil, nil
}

@@ -370,8 +370,8 @@ func emergentlyRecordStaleBinlogCoordinates(tabletAlias string, binlogCoordinate

// checkAndExecuteFailureDetectionProcesses tries to register for failure detection and potentially executes
// failure-detection processes.
func checkAndExecuteFailureDetectionProcesses(analysisEntry inst.ReplicationAnalysis) (detectionRegistrationSuccess bool, processesExecutionAttempted bool, err error) {
if ok, _ := AttemptFailureDetectionRegistration(&analysisEntry); !ok {
func checkAndExecuteFailureDetectionProcesses(analysisEntry *inst.ReplicationAnalysis) (detectionRegistrationSuccess bool, processesExecutionAttempted bool, err error) {
if ok, _ := AttemptFailureDetectionRegistration(analysisEntry); !ok {
if util.ClearToLog("checkAndExecuteFailureDetectionProcesses", analysisEntry.AnalyzedInstanceAlias) {
log.Infof("checkAndExecuteFailureDetectionProcesses: could not register %+v detection on %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
}
@@ -455,7 +455,7 @@ func hasActionableRecovery(recoveryFunctionCode recoveryFunction) bool {

// getCheckAndRecoverFunction gets the recovery function for the given code.
func getCheckAndRecoverFunction(recoveryFunctionCode recoveryFunction) (
checkAndRecoverFunction func(ctx context.Context, analysisEntry inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error),
checkAndRecoverFunction func(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error),
) {
switch recoveryFunctionCode {
case noRecoveryFunc:
@@ -515,7 +515,7 @@ func isClusterWideRecovery(recoveryFunctionCode recoveryFunction) bool {
}

// analysisEntriesHaveSameRecovery tells whether the two analysis entries have the same recovery function or not
func analysisEntriesHaveSameRecovery(prevAnalysis, newAnalysis inst.ReplicationAnalysis) bool {
func analysisEntriesHaveSameRecovery(prevAnalysis, newAnalysis *inst.ReplicationAnalysis) bool {
prevRecoveryFunctionCode := getCheckAndRecoverFunctionCode(prevAnalysis.Analysis, prevAnalysis.AnalyzedInstanceAlias)
newRecoveryFunctionCode := getCheckAndRecoverFunctionCode(newAnalysis.Analysis, newAnalysis.AnalyzedInstanceAlias)
return prevRecoveryFunctionCode == newRecoveryFunctionCode
@@ -542,14 +542,14 @@ func runEmergentOperations(analysisEntry *inst.ReplicationAnalysis) {

// executeCheckAndRecoverFunction will choose the correct check & recovery function based on analysis.
// It executes the function synchronuously
func executeCheckAndRecoverFunction(analysisEntry inst.ReplicationAnalysis) (err error) {
func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (err error) {
countPendingRecoveries.Add(1)
defer countPendingRecoveries.Add(-1)

checkAndRecoverFunctionCode := getCheckAndRecoverFunctionCode(analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
isActionableRecovery := hasActionableRecovery(checkAndRecoverFunctionCode)
analysisEntry.IsActionableRecovery = isActionableRecovery
runEmergentOperations(&analysisEntry)
runEmergentOperations(analysisEntry)

if checkAndRecoverFunctionCode == noRecoveryFunc {
// Unhandled problem type
@@ -702,7 +702,7 @@ func executeCheckAndRecoverFunction(analysisEntry inst.ReplicationAnalysis) (err
}

// checkIfAlreadyFixed checks whether the problem that the analysis entry represents has already been fixed by another agent or not
func checkIfAlreadyFixed(analysisEntry inst.ReplicationAnalysis) (bool, error) {
func checkIfAlreadyFixed(analysisEntry *inst.ReplicationAnalysis) (bool, error) {
// Run a replication analysis again. We will check if the problem persisted
analysisEntries, err := inst.GetReplicationAnalysis(analysisEntry.ClusterDetails.Keyspace, analysisEntry.ClusterDetails.Shard, &inst.ReplicationAnalysisHints{})
if err != nil {
@@ -742,7 +742,7 @@ func CheckAndRecover() {
}
}

func postPrsCompletion(topologyRecovery *TopologyRecovery, analysisEntry inst.ReplicationAnalysis, promotedReplica *inst.Instance) {
func postPrsCompletion(topologyRecovery *TopologyRecovery, analysisEntry *inst.ReplicationAnalysis, promotedReplica *inst.Instance) {
if promotedReplica != nil {
message := fmt.Sprintf("promoted replica: %+v", promotedReplica.InstanceAlias)
_ = AuditTopologyRecovery(topologyRecovery, message)
@@ -752,8 +752,8 @@ func postPrsCompletion(topologyRecovery *TopologyRecovery, analysisEntry inst.Re
}

// electNewPrimary elects a new primary while none were present before.
func electNewPrimary(ctx context.Context, analysisEntry inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, false /*failIfFailedInstanceInActiveRecovery*/, true /*failIfClusterInActiveRecovery*/)
func electNewPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, false /*failIfFailedInstanceInActiveRecovery*/, true /*failIfClusterInActiveRecovery*/)
if topologyRecovery == nil || err != nil {
_ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another electNewPrimary.", analysisEntry.AnalyzedInstanceAlias))
return false, nil, err
@@ -800,8 +800,8 @@ func electNewPrimary(ctx context.Context, analysisEntry inst.ReplicationAnalysis
}

// fixPrimary sets the primary as read-write.
func fixPrimary(ctx context.Context, analysisEntry inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, false, true)
func fixPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, false, true)
if topologyRecovery == nil {
_ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another fixPrimary.", analysisEntry.AnalyzedInstanceAlias))
return false, nil, err
@@ -831,8 +831,8 @@ func fixPrimary(ctx context.Context, analysisEntry inst.ReplicationAnalysis) (re
}

// fixReplica sets the replica as read-only and points it at the current primary.
func fixReplica(ctx context.Context, analysisEntry inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, false, true)
func fixReplica(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, false, true)
if topologyRecovery == nil {
_ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another fixReplica.", analysisEntry.AnalyzedInstanceAlias))
return false, nil, err
4 changes: 2 additions & 2 deletions go/vt/vtorc/logic/topology_recovery_test.go
Original file line number Diff line number Diff line change
@@ -87,7 +87,7 @@ func TestAnalysisEntriesHaveSameRecovery(t *testing.T) {
t.Parallel()
for _, tt := range tests {
t.Run(string(tt.prevAnalysisCode)+","+string(tt.newAnalysisCode), func(t *testing.T) {
res := analysisEntriesHaveSameRecovery(inst.ReplicationAnalysis{Analysis: tt.prevAnalysisCode}, inst.ReplicationAnalysis{Analysis: tt.newAnalysisCode})
res := analysisEntriesHaveSameRecovery(&inst.ReplicationAnalysis{Analysis: tt.prevAnalysisCode}, &inst.ReplicationAnalysis{Analysis: tt.newAnalysisCode})
require.Equal(t, tt.shouldBeEqual, res)
})
}
@@ -117,7 +117,7 @@ func TestElectNewPrimaryPanic(t *testing.T) {
}
err = inst.SaveTablet(tablet)
require.NoError(t, err)
analysisEntry := inst.ReplicationAnalysis{
analysisEntry := &inst.ReplicationAnalysis{
AnalyzedInstanceAlias: topoproto.TabletAliasString(tablet.Alias),
}
ts = memorytopo.NewServer("zone1")