Skip to content
This repository has been archived by the owner on Sep 30, 2024. It is now read-only.

Simplified concurrency control in moveReplicasViaGTID() #641

Merged
merged 5 commits into from
Oct 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 52 additions & 37 deletions go/inst/instance_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,22 @@ func ASCIITopology(clusterName string, historyTimestampPattern string, tabulated
return result, nil
}

func shouldPostponeRelocatingReplica(replica *Instance, postponedFunctionsContainer *PostponedFunctionsContainer) bool {
if postponedFunctionsContainer == nil {
return false
}
if config.Config.PostponeReplicaRecoveryOnLagMinutes > 0 &&
replica.SQLDelay > config.Config.PostponeReplicaRecoveryOnLagMinutes*60 {
// This replica is lagging very much, AND
// we're configured to postpone operation on this replica so as not to delay everyone else.
return true
}
if replica.LastDiscoveryLatency > ReasonableDiscoveryLatency {
return true
}
return false
}

// GetInstanceMaster synchronously reaches into the replication topology
// and retrieves master's data
func GetInstanceMaster(instance *Instance) (*Instance, error) {
Expand Down Expand Up @@ -608,7 +624,7 @@ func MoveBelowGTID(instanceKey, otherKey *InstanceKey) (*Instance, error) {

// moveReplicasViaGTID moves a list of replicas under another instance via GTID, returning those replicas
// that could not be moved (do not use GTID)
func moveReplicasViaGTID(replicas [](*Instance), other *Instance) (movedReplicas [](*Instance), unmovedReplicas [](*Instance), err error, errs []error) {
func moveReplicasViaGTID(replicas [](*Instance), other *Instance, postponedFunctionsContainer *PostponedFunctionsContainer) (movedReplicas [](*Instance), unmovedReplicas [](*Instance), err error, errs []error) {
replicas = RemoveNilInstances(replicas)
replicas = RemoveInstance(replicas, &other.Key)
if len(replicas) == 0 {
Expand All @@ -618,38 +634,49 @@ func moveReplicasViaGTID(replicas [](*Instance), other *Instance) (movedReplicas

log.Infof("moveReplicasViaGTID: Will move %+v replicas below %+v via GTID", len(replicas), other.Key)

barrier := make(chan *InstanceKey)
replicaMutex := make(chan bool, 1)
var waitGroup sync.WaitGroup
var replicaMutex sync.Mutex
for _, replica := range replicas {
replica := replica

waitGroup.Add(1)
// Parallelize repoints
go func() {
defer func() { barrier <- &replica.Key }()
ExecuteOnTopology(func() {
defer waitGroup.Done()
moveFunc := func() error {
var replicaErr error
var movedReplica *Instance
if _, _, canMove := canMoveViaGTID(replica, other); canMove {
replica, replicaErr = moveInstanceBelowViaGTID(replica, other)
movedReplica, replicaErr = moveInstanceBelowViaGTID(replica, other)
if movedReplica != nil {
replica = movedReplica
}
} else {
replicaErr = fmt.Errorf("moveReplicasViaGTID: %+v cannot move below %+v via GTID", replica.Key, other.Key)
}
func() {
// Instantaneous mutex.
replicaMutex <- true
defer func() { <-replicaMutex }()
if replicaErr == nil {
movedReplicas = append(movedReplicas, replica)
} else {
unmovedReplicas = append(unmovedReplicas, replica)
errs = append(errs, replicaErr)
}
}()
})

// After having moved replicas, update local shared variables:
replicaMutex.Lock()
defer replicaMutex.Unlock()

if replicaErr == nil {
movedReplicas = append(movedReplicas, replica)
} else {
unmovedReplicas = append(unmovedReplicas, replica)
errs = append(errs, replicaErr)
}
return replicaErr
}
if shouldPostponeRelocatingReplica(replica, postponedFunctionsContainer) {
postponedFunctionsContainer.AddPostponedFunction(moveFunc, fmt.Sprintf("move-replicas-gtid %+v", replica.Key))
// We bail out and trust our invoker to later call upon this postponed function
} else {
ExecuteOnTopology(func() { moveFunc() })
}
}()
}
for range replicas {
<-barrier
}
waitGroup.Wait()

if len(errs) == len(replicas) {
// All returned with error
return movedReplicas, unmovedReplicas, fmt.Errorf("moveReplicasViaGTID: Error on all %+v operations", len(errs)), errs
Expand All @@ -673,7 +700,7 @@ func MoveReplicasGTID(masterKey *InstanceKey, belowKey *InstanceKey, pattern str
return movedReplicas, unmovedReplicas, err, errs
}
replicas = filterInstancesByPattern(replicas, pattern)
movedReplicas, unmovedReplicas, err, errs = moveReplicasViaGTID(replicas, belowInstance)
movedReplicas, unmovedReplicas, err, errs = moveReplicasViaGTID(replicas, belowInstance, nil)
if err != nil {
log.Errore(err)
}
Expand Down Expand Up @@ -1791,19 +1818,7 @@ func MultiMatchBelow(replicas [](*Instance), belowKey *InstanceKey, postponedFun
}
return replicaErr
}
postpone := false
if postponedFunctionsContainer != nil {
if config.Config.PostponeReplicaRecoveryOnLagMinutes > 0 &&
replica.SQLDelay > config.Config.PostponeReplicaRecoveryOnLagMinutes*60 {
// This replica is lagging very much, AND
// we're configured to postpone operation on this replica so as not to delay everyone else.
postpone = true
}
if replica.LastDiscoveryLatency > ReasonableDiscoveryLatency {
postpone = true
}
}
if postpone {
if shouldPostponeRelocatingReplica(replica, postponedFunctionsContainer) {
postponedFunctionsContainer.AddPostponedFunction(matchFunc, fmt.Sprintf("multi-match-below-independent %+v", replica.Key))
// We bail out and trust our invoker to later call upon this postponed function
} else {
Expand Down Expand Up @@ -2360,7 +2375,7 @@ func RegroupReplicasGTID(
replicasToMove := append(equalReplicas, laterReplicas...)
log.Debugf("RegroupReplicasGTID: working on %d replicas", len(replicasToMove))

movedReplicas, unmovedReplicas, err, _ = moveReplicasViaGTID(replicasToMove, candidateReplica)
movedReplicas, unmovedReplicas, err, _ = moveReplicasViaGTID(replicasToMove, candidateReplica, postponedFunctionsContainer)
unmovedReplicas = append(unmovedReplicas, aheadReplicas...)
return log.Errore(err)
}
Expand Down Expand Up @@ -2616,7 +2631,7 @@ func relocateReplicasInternal(replicas [](*Instance), instance, other *Instance)
}
// GTID
{
movedReplicas, unmovedReplicas, err, errs := moveReplicasViaGTID(replicas, other)
movedReplicas, unmovedReplicas, err, errs := moveReplicasViaGTID(replicas, other, nil)

if len(movedReplicas) == len(replicas) {
// Moved (or tried moving) everything via GTID
Expand Down
2 changes: 1 addition & 1 deletion go/logic/topology_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -1125,7 +1125,7 @@ func RecoverDeadCoMaster(topologyRecovery *TopologyRecovery, skipProcesses bool)
switch coMasterRecoveryType {
case MasterRecoveryGTID:
{
lostReplicas, _, cannotReplicateReplicas, promotedReplica, err = inst.RegroupReplicasGTID(failedInstanceKey, true, nil, nil, nil)
lostReplicas, _, cannotReplicateReplicas, promotedReplica, err = inst.RegroupReplicasGTID(failedInstanceKey, true, nil, &topologyRecovery.PostponedFunctionsContainer, nil)
}
case MasterRecoveryPseudoGTID:
{
Expand Down