Skip to content

Commit

Permalink
Refactor VolSync Replication Code for Consistent Final Sync Preparation
Browse files Browse the repository at this point in the history
Refactor final sync preparation and execution for PVCs targeted for VolSync
replication, aligning code and behavior with VolRep.

PR link: #1395

Signed-off-by: Benamar Mekhissi <bmekhiss@ibm.com>
  • Loading branch information
Benamar Mekhissi committed May 14, 2024
1 parent cfdfbc7 commit 55cd381
Show file tree
Hide file tree
Showing 10 changed files with 723 additions and 425 deletions.
3 changes: 0 additions & 3 deletions api/v1alpha1/drplacementcontrol_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,7 @@ const (
ProgressionFailingOverToCluster = ProgressionStatus("FailingOverToCluster")
ProgressionWaitForFencing = ProgressionStatus("WaitForFencing")
ProgressionWaitForStorageMaintenanceActivation = ProgressionStatus("WaitForStorageMaintenanceActivation")
ProgressionPreparingFinalSync = ProgressionStatus("PreparingFinalSync")
ProgressionClearingPlacement = ProgressionStatus("ClearingPlacement")
ProgressionRunningFinalSync = ProgressionStatus("RunningFinalSync")
ProgressionFinalSyncComplete = ProgressionStatus("FinalSyncComplete")
ProgressionEnsuringVolumesAreSecondary = ProgressionStatus("EnsuringVolumesAreSecondary")
ProgressionWaitingForResourceRestore = ProgressionStatus("WaitingForResourceRestore")
ProgressionUpdatedPlacement = ProgressionStatus("UpdatedPlacement")
Expand Down
223 changes: 5 additions & 218 deletions controllers/drplacementcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ func (d *DRPCInstance) RunRelocate() (bool, error) {
}

if curHomeCluster != "" && curHomeCluster != preferredCluster {
result, err := d.quiesceAndRunFinalSync(curHomeCluster)
result, err := d.ensurequiescing(curHomeCluster, rmn.Relocating)
if err != nil {
return !done, err
}
Expand Down Expand Up @@ -884,23 +884,15 @@ func (d *DRPCInstance) ensureCleanupAndVolSyncReplicationSetup(srcCluster string
return nil
}

func (d *DRPCInstance) quiesceAndRunFinalSync(homeCluster string) (bool, error) {
func (d *DRPCInstance) ensurequiescing(homeCluster string, drState rmn.DRState) (bool, error) {
const done = true

result, err := d.prepareForFinalSync(homeCluster)
if err != nil {
return !done, err
}

if !result {
d.setProgression(rmn.ProgressionPreparingFinalSync)

return !done, nil
}
d.log.Info(fmt.Sprintf("Ensuring quiescing for cluster %s", homeCluster))

// We need to clear the placement so that the workload is deleted.
clusterDecision := d.reconciler.getClusterDecision(d.userPlacement)
if clusterDecision.ClusterName != "" {
d.setDRState(rmn.Relocating)
d.setDRState(drState)
addOrUpdateCondition(&d.instance.Status.Conditions, rmn.ConditionAvailable, d.instance.Generation,
d.getConditionStatusForTypeAvailable(), string(d.instance.Status.Phase), "Starting quiescing for relocation")

Expand All @@ -913,80 +905,6 @@ func (d *DRPCInstance) quiesceAndRunFinalSync(homeCluster string) (bool, error)
}
}

// Ensure final sync has been taken
result, err = d.runFinalSync(homeCluster)
if err != nil {
return !done, err
}

if !result {
d.setProgression(rmn.ProgressionRunningFinalSync)

return !done, nil
}

d.setProgression(rmn.ProgressionFinalSyncComplete)

return done, nil
}

func (d *DRPCInstance) prepareForFinalSync(homeCluster string) (bool, error) {
d.log.Info(fmt.Sprintf("Preparing final sync on cluster %s", homeCluster))

const done = true

vrg, ok := d.vrgs[homeCluster]

if !ok {
d.log.Info(fmt.Sprintf("prepareForFinalSync: VRG not available on cluster %s", homeCluster))

return !done, fmt.Errorf("VRG not found on Cluster %s", homeCluster)
}

if !vrg.Status.PrepareForFinalSyncComplete {
err := d.updateVRGToPrepareForFinalSync(homeCluster)
if err != nil {
return !done, err
}

// updated VRG to run final sync. Give it time...
d.log.Info(fmt.Sprintf("Giving enough time to prepare for final sync on cluster %s", homeCluster))

return !done, nil
}

d.log.Info("Preparing for final sync completed", "cluster", homeCluster)

return done, nil
}

func (d *DRPCInstance) runFinalSync(homeCluster string) (bool, error) {
d.log.Info(fmt.Sprintf("Running final sync on cluster %s", homeCluster))

const done = true

vrg, ok := d.vrgs[homeCluster]

if !ok {
d.log.Info(fmt.Sprintf("runFinalSync: VRG not available on cluster %s", homeCluster))

return !done, fmt.Errorf("VRG not found on Cluster %s", homeCluster)
}

if !vrg.Status.FinalSyncComplete {
err := d.updateVRGToRunFinalSync(homeCluster)
if err != nil {
return !done, err
}

// updated VRG to run final sync. Give it time...
d.log.Info(fmt.Sprintf("Giving it enough time to run final sync on cluster %s", homeCluster))

return !done, nil
}

d.log.Info("Running final sync completed", "cluster", homeCluster)

return done, nil
}

Expand Down Expand Up @@ -1989,11 +1907,6 @@ func (d *DRPCInstance) updateVRGState(clusterName string, state rmn.ReplicationS
}

vrg.Spec.ReplicationState = state
if state == rmn.Secondary {
// Turn off the final sync flags
vrg.Spec.PrepareForFinalSync = false
vrg.Spec.RunFinalSync = false
}

d.setVRGAction(vrg)

Expand All @@ -2007,66 +1920,6 @@ func (d *DRPCInstance) updateVRGState(clusterName string, state rmn.ReplicationS
return true, nil
}

func (d *DRPCInstance) updateVRGToPrepareForFinalSync(clusterName string) error {
d.log.Info(fmt.Sprintf("Updating VRG Spec to prepare for final sync on cluster %s", clusterName))

vrg, err := d.getVRGFromManifestWork(clusterName)
if err != nil {
return fmt.Errorf("failed to update VRG state. ClusterName %s (%w)",
clusterName, err)
}

if vrg.Spec.PrepareForFinalSync {
d.log.Info(fmt.Sprintf("VRG %s on cluster %s already has the prepare for final sync flag set",
vrg.Name, clusterName))

return nil
}

vrg.Spec.PrepareForFinalSync = true
vrg.Spec.RunFinalSync = false

err = d.updateManifestWork(clusterName, vrg)
if err != nil {
return err
}

d.log.Info(fmt.Sprintf("Updated VRG %s running in cluster %s to prepare for the final sync",
vrg.Name, clusterName))

return nil
}

func (d *DRPCInstance) updateVRGToRunFinalSync(clusterName string) error {
d.log.Info(fmt.Sprintf("Updating VRG Spec to run final sync on cluster %s", clusterName))

vrg, err := d.getVRGFromManifestWork(clusterName)
if err != nil {
return fmt.Errorf("failed to update VRG state. ClusterName %s (%w)",
clusterName, err)
}

if vrg.Spec.RunFinalSync {
d.log.Info(fmt.Sprintf("VRG %s on cluster %s already has the final sync flag set",
vrg.Name, clusterName))

return nil
}

vrg.Spec.RunFinalSync = true
vrg.Spec.PrepareForFinalSync = false

err = d.updateManifestWork(clusterName, vrg)
if err != nil {
return err
}

d.log.Info(fmt.Sprintf("Updated VRG %s running in cluster %s to run the final sync",
vrg.Name, clusterName))

return nil
}

func (d *DRPCInstance) updateManifestWork(clusterName string, vrg *rmn.VolumeReplicationGroup) error {
mw, err := d.mwu.FindManifestWorkByType(rmnutil.MWTypeVRG, clusterName)
if err != nil {
Expand Down Expand Up @@ -2111,79 +1964,13 @@ func updateDRPCProgression(
return false
}

/*
DRPC Status.Progression has several distinct progressions depending on the action being performed. The following
comment is to help identify which progressions belong to which actions for reference purposes.
deployProgressions are used to indicate progression during initial deployment processing
deployProgressions := {
ProgressionCreatingMW,
ProgressionUpdatingPlRule,
ProgressionEnsuringVolSyncSetup,
ProgressionSettingupVolsyncDest,
ProgressionCompleted,
}
failoverProgressions are used to indicate progression during failover action processing
- preFailoverProgressions indicates Progressions that are noted before creating VRG on the failoverCluster
- postFailoverProgressions indicates Progressions that are noted post creating VRG on the failoverCluster
preFailoverProgressions := {
ProgressionWaitForReadiness,
ProgressionCheckingFailoverPrequisites,
ProgressionWaitForFencing,
ProgressionWaitForStorageMaintenanceActivation,
}
postFailoverProgressions := {
ProgressionFailingOverToCluster,
ProgressionWaitingForResourceRestore,
ProgressionEnsuringVolSyncSetup,
ProgressionSettingupVolsyncDest,
ProgressionUpdatedPlacement,
ProgressionCompleted,
ProgressionCleaningUp,
}
relocateProgressions are used to indicate progression during relocate action processing
- preSwitch indicates Progressions that are noted before creating VRG on the preferredCluster
- postSwitch indicates Progressions that are noted post creating VRG on the preferredCluster
preRelocateProgressions := []rmn.ProgressionStatus{
rmn.ProgressionPreparingFinalSync,
rmn.ProgressionClearingPlacement,
rmn.ProgressionRunningFinalSync,
rmn.ProgressionFinalSyncComplete,
rmn.ProgressionEnsuringVolumesAreSecondary,
}
postRelocateProgressions := {
ProgressionCompleted,
ProgressionCleaningUp,
ProgressionWaitingForResourceRestore,
ProgressionUpdatedPlacement,
ProgressionEnsuringVolSyncSetup,
ProgressionSettingupVolsyncDest,
}
specialProgressions are used to indicate special cases irrespective of action or initial deployment
specialProgressions := {
ProgressionDeleting,
ProgressionActionPaused,
}
*/
func (d *DRPCInstance) setProgression(nextProgression rmn.ProgressionStatus) {
updateDRPCProgression(d.instance, nextProgression, d.log)
}

func IsPreRelocateProgression(status rmn.ProgressionStatus) bool {
preRelocateProgressions := []rmn.ProgressionStatus{
rmn.ProgressionPreparingFinalSync,
rmn.ProgressionClearingPlacement,
rmn.ProgressionRunningFinalSync,
rmn.ProgressionFinalSyncComplete,
rmn.ProgressionEnsuringVolumesAreSecondary,
}

Expand Down
28 changes: 27 additions & 1 deletion controllers/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const (
// VolSync related conditions. These conditions are only applicable
// at individual PVCs and not generic VRG conditions.
VRGConditionTypeVolSyncRepSourceSetup = "ReplicationSourceSetup"
VRGConditionTypeVolSyncFinalSyncInProgress = "FinalSyncInProgress"
VRGConditionTypeVolSyncFinalSync = "FinalSync"
VRGConditionTypeVolSyncRepDestinationSetup = "ReplicationDestinationSetup"
VRGConditionTypeVolSyncPVsRestored = "PVsRestored"
)
Expand Down Expand Up @@ -459,3 +459,29 @@ func setVRGConditionTypeVolSyncPVRestoreError(conditions *[]metav1.Condition, ob
Message: message,
})
}

// sets conditions when Primary VolSync has finished setting up the Replication Destination
func setVRGConditionTypeVolSyncFinalSyncInProgress(conditions *[]metav1.Condition, observedGeneration int64,
message string,
) {
setStatusCondition(conditions, metav1.Condition{
Type: VRGConditionTypeVolSyncFinalSync,
Reason: VRGConditionReasonVolSyncFinalSyncInProgress,
ObservedGeneration: observedGeneration,
Status: metav1.ConditionFalse,
Message: message,
})
}

// sets conditions when Primary VolSync has finished setting up the Replication Destination
func setVRGConditionTypeVolSyncFinalSyncComplete(conditions *[]metav1.Condition, observedGeneration int64,
message string,
) {
setStatusCondition(conditions, metav1.Condition{
Type: VRGConditionTypeVolSyncFinalSync,
Reason: VRGConditionReasonVolSyncFinalSyncComplete,
ObservedGeneration: observedGeneration,
Status: metav1.ConditionTrue,
Message: message,
})
}
2 changes: 2 additions & 0 deletions controllers/util/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
const (
OCMBackupLabelKey string = "cluster.open-cluster-management.io/backup"
OCMBackupLabelValue string = "ramen"

AppPVCNameAnnotation = "ramendr.openshift.io/app-pvc-name"
)

type ResourceUpdater struct {
Expand Down
Loading

0 comments on commit 55cd381

Please sign in to comment.