Skip to content

Commit

Permalink
Merge pull request RamenDR#201 from red-hat-storage/sync_us--main
Browse files Browse the repository at this point in the history
Syncing latest changes from upstream main for ramen
  • Loading branch information
ShyamsundarR committed Feb 22, 2024
2 parents 5e1a7ed + 0e0db36 commit d8f9a96
Show file tree
Hide file tree
Showing 9 changed files with 385 additions and 140 deletions.
38 changes: 8 additions & 30 deletions controllers/drplacementcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,11 @@ import (

"github.com/go-logr/logr"
clrapiv1beta1 "github.com/open-cluster-management-io/api/cluster/v1beta1"
ocmworkv1 "github.com/open-cluster-management/api/work/v1"
errorswrapper "github.com/pkg/errors"
plrv1 "github.com/stolostron/multicloud-operators-placementrule/pkg/apis/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/yaml"

rmn "github.com/ramendr/ramen/api/v1alpha1"
rmnutil "github.com/ramendr/ramen/controllers/util"
Expand All @@ -31,6 +28,9 @@ const (
DRPCNameAnnotation = "drplacementcontrol.ramendr.openshift.io/drpc-name"
DRPCNamespaceAnnotation = "drplacementcontrol.ramendr.openshift.io/drpc-namespace"

// Annotation that stores the UID of DRPC that created the resource on the managed cluster using a ManifestWork
DRPCUIDAnnotation = "drplacementcontrol.ramendr.openshift.io/drpc-uid"

// Annotation for the last cluster on which the application was running
LastAppDeploymentCluster = "drplacementcontrol.ramendr.openshift.io/last-app-deployment-cluster"

Expand Down Expand Up @@ -865,9 +865,7 @@ func (d *DRPCInstance) ensureCleanupAndVolSyncReplicationSetup(srcCluster string
srcCluster, ok))
}

clusterToSkip := srcCluster

err = d.EnsureCleanup(clusterToSkip)
err = d.EnsureCleanup(srcCluster)
if err != nil {
return err
}
Expand Down Expand Up @@ -1261,7 +1259,7 @@ func (d *DRPCInstance) getVRGFromManifestWork(clusterName string) (*rmn.VolumeRe
return nil, fmt.Errorf("%w", err)
}

vrg, err := d.extractVRGFromManifestWork(mw)
vrg, err := rmnutil.ExtractVRGFromManifestWork(mw)
if err != nil {
return nil, err
}
Expand All @@ -1285,9 +1283,7 @@ func (d *DRPCInstance) vrgExistsAndPrimary(targetCluster string) bool {
}

func (d *DRPCInstance) mwExistsAndPlacementUpdated(targetCluster string) (bool, error) {
vrgMWName := d.mwu.BuildManifestWorkName(rmnutil.MWTypeVRG)

_, err := d.mwu.FindManifestWork(vrgMWName, targetCluster)
_, err := d.mwu.FindManifestWorkByType(rmnutil.MWTypeVRG, targetCluster)
if err != nil {
if errors.IsNotFound(err) {
return false, nil
Expand Down Expand Up @@ -1525,6 +1521,7 @@ func (d *DRPCInstance) generateVRG(dstCluster string, repState rmn.ReplicationSt
Annotations: map[string]string{
DestinationClusterAnnotationKey: dstCluster,
DoNotDeletePVCAnnotation: d.instance.GetAnnotations()[DoNotDeletePVCAnnotation],
DRPCUIDAnnotation: string(d.instance.UID),
},
},
Spec: rmn.VolumeReplicationGroupSpec{
Expand Down Expand Up @@ -1792,10 +1789,7 @@ func (d *DRPCInstance) ensureVRGManifestWorkOnClusterDeleted(clusterName string)

const done = true

mwName := d.mwu.BuildManifestWorkName(rmnutil.MWTypeVRG)
mw := &ocmworkv1.ManifestWork{}

err := d.reconciler.Get(d.ctx, types.NamespacedName{Name: mwName, Namespace: clusterName}, mw)
mw, err := d.mwu.FindManifestWorkByType(rmnutil.MWTypeVRG, clusterName)
if err != nil {
if errors.IsNotFound(err) {
return done, nil
Expand Down Expand Up @@ -2082,22 +2076,6 @@ func (d *DRPCInstance) updateVRGToRunFinalSync(clusterName string) error {
return nil
}

// extractVRGFromManifestWork decodes the VRG carried by a VRG ManifestWork.
// The VRG is expected to be the first manifest in the workload; a ManifestWork
// with no manifests is reported as invalid. Returns the decoded VRG, or an
// error if the ManifestWork is empty or its first manifest cannot be
// unmarshalled.
func (d *DRPCInstance) extractVRGFromManifestWork(mw *ocmworkv1.ManifestWork) (*rmn.VolumeReplicationGroup, error) {
	manifests := mw.Spec.Workload.Manifests
	if len(manifests) == 0 {
		return nil, fmt.Errorf("invalid VRG ManifestWork for type: %s", mw.Name)
	}

	decoded := &rmn.VolumeReplicationGroup{}

	// The address of the pointer is passed on purpose, so the decoder keeps
	// control over the pointer itself (as the code has always done).
	if err := yaml.Unmarshal(manifests[0].RawExtension.Raw, &decoded); err != nil {
		return nil, fmt.Errorf("unable to unmarshal VRG object (%w)", err)
	}

	return decoded, nil
}

func (d *DRPCInstance) updateManifestWork(clusterName string, vrg *rmn.VolumeReplicationGroup) error {
mw, err := d.mwu.FindManifestWorkByType(rmnutil.MWTypeVRG, clusterName)
if err != nil {
Expand Down
221 changes: 213 additions & 8 deletions controllers/drplacementcontrol_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,12 @@ func (r *DRPlacementControlReconciler) reconcileDRPCInstance(d *DRPCInstance, lo
beforeProcessing = *d.instance.Status.LastUpdateTime
}

if !ensureVRGsManagedByDRPC(d.log, d.mwu, d.vrgs, d.instance, d.vrgNamespace) {
log.Info("Requeing... VRG adoption in progress")

return ctrl.Result{Requeue: true}, nil
}

requeue := d.startProcessing()
log.Info("Finished processing", "Requeue?", requeue)

Expand Down Expand Up @@ -1158,14 +1164,6 @@ func (r *DRPlacementControlReconciler) finalizeDRPC(ctx context.Context, drpc *r
return fmt.Errorf("failed to get DRPolicy while finalizing DRPC (%w)", err)
}

// delete manifestworks (VRGs)
for _, drClusterName := range rmnutil.DrpolicyClusterNames(drPolicy) {
err := mwu.DeleteManifestWorksForCluster(drClusterName)
if err != nil {
return fmt.Errorf("%w", err)
}
}

drClusters, err := getDRClusters(ctx, r.Client, drPolicy)
if err != nil {
return fmt.Errorf("failed to get drclusters. Error (%w)", err)
Expand All @@ -1177,6 +1175,18 @@ func (r *DRPlacementControlReconciler) finalizeDRPC(ctx context.Context, drpc *r
return fmt.Errorf("failed to retrieve VRGs. We'll retry later. Error (%w)", err)
}

if !ensureVRGsManagedByDRPC(r.Log, mwu, vrgs, drpc, vrgNamespace) {
return fmt.Errorf("VRG adoption in progress")
}

// delete manifestworks (VRGs)
for _, drClusterName := range rmnutil.DrpolicyClusterNames(drPolicy) {
err := mwu.DeleteManifestWorksForCluster(drClusterName)
if err != nil {
return fmt.Errorf("%w", err)
}
}

if len(vrgs) != 0 {
return fmt.Errorf("waiting for VRGs count to go to zero")
}
Expand Down Expand Up @@ -2286,6 +2296,21 @@ func (r *DRPlacementControlReconciler) determineDRPCState(
return Stop, "", err
}

mwu := rmnutil.MWUtil{
Client: r.Client,
APIReader: r.APIReader,
Ctx: ctx,
Log: log,
InstName: drpc.Name,
TargetNamespace: vrgNamespace,
}

if !ensureVRGsManagedByDRPC(log, mwu, vrgs, drpc, vrgNamespace) {
msg := "VRG adoption in progress"

return Stop, msg, nil
}

// IF 2 clusters queried, and both queries failed, then STOP
if successfullyQueriedClusterCount == 0 {
msg := "Stop - Number of clusters queried is 0"
Expand Down Expand Up @@ -2426,3 +2451,183 @@ func (r *DRPlacementControlReconciler) determineDRPCState(

return AllowFailover, msg, nil
}

// ensureVRGsManagedByDRPC verifies that every VRG reported by the
// ManagedClusterViews is managed by the given DRPC instance. A VRG counts as
// managed when its DRPC UID annotation equals the UID of drpc. For any other
// VRG, adoption is attempted by creating or updating the existing ManifestWork
// for that VRG. VRGs that are already being deleted are logged and skipped.
// Returns true only if no VRG required an adoption that is still in progress.
func ensureVRGsManagedByDRPC(
	log logr.Logger,
	mwu rmnutil.MWUtil,
	vrgs map[string]*rmn.VolumeReplicationGroup,
	drpc *rmn.DRPlacementControl,
	vrgNamespace string,
) bool {
	allManaged := true
	ownerUID := string(drpc.UID)

	for cluster, viewVRG := range vrgs {
		if rmnutil.ResourceIsDeleted(viewVRG) {
			log.Info("VRG reported by view undergoing deletion, during adoption",
				"cluster", cluster, "namespace", viewVRG.Namespace, "name", viewVRG.Name)

			continue
		}

		// Looking up a key in a nil annotation map is safe and reports "not found".
		if uid, found := viewVRG.GetAnnotations()[DRPCUIDAnnotation]; found && uid == ownerUID {
			continue
		}

		// Adoption is attempted for every unmanaged VRG, even after an earlier
		// one was found incomplete.
		if !adoptVRG(log, mwu, viewVRG, cluster, drpc, vrgNamespace) {
			allManaged = false
		}
	}

	return allManaged
}

// adoptVRG starts (or continues) making the given DRPC the manager of the VRG
// seen on cluster, by creating a missing VRG ManifestWork or updating the
// existing one. Returns true only when the VRG ManifestWork is found to be
// deleted; every other path (lookup failure, orphan adoption, extraction
// failure, update of an existing ManifestWork) returns false, as the adoption
// is not treated as complete on those paths.
func adoptVRG(
	log logr.Logger,
	mwu rmnutil.MWUtil,
	viewVRG *rmn.VolumeReplicationGroup,
	cluster string,
	drpc *rmn.DRPlacementControl,
	vrgNamespace string,
) bool {
	const (
		done    = true
		pending = false
	)

	mw, err := mwu.FindManifestWorkByType(rmnutil.MWTypeVRG, cluster)

	switch {
	case err == nil:
		// ManifestWork exists, handled below
	case errors.IsNotFound(err):
		// No ManifestWork for a VRG that exists on the cluster: create one
		adoptOrphanVRG(log, mwu, viewVRG, cluster, drpc, vrgNamespace)

		return pending
	default:
		log.Info("error fetching VRG ManifestWork during adoption", "error", err, "cluster", cluster)

		return pending
	}

	if rmnutil.ResourceIsDeleted(mw) {
		log.Info("VRG ManifestWork found deleted during adoption", "cluster", cluster)

		return done
	}

	mwVRG, err := rmnutil.ExtractVRGFromManifestWork(mw)
	if err != nil {
		log.Info("error extracting VRG from ManifestWork during adoption", "error", err, "cluster", cluster)

		return pending
	}

	// NOTE: upgrade use case, to add DRPC UID for existing VRG MW
	adoptExistingVRGManifestWork(log, mwu, mwVRG, cluster, drpc, vrgNamespace)

	return pending
}

// adoptExistingVRGManifestWork marks the VRG held in an existing ManifestWork
// as managed by the given DRPC, by setting the DRPC UID annotation on the VRG
// and updating the ManifestWork. Nothing is updated when the annotation
// already carries this DRPC's UID. A failure to update is logged, not returned.
func adoptExistingVRGManifestWork(
	log logr.Logger,
	mwu rmnutil.MWUtil,
	vrg *rmn.VolumeReplicationGroup,
	cluster string,
	drpc *rmn.DRPlacementControl,
	vrgNamespace string,
) {
	log.Info("adopting existing VRG ManifestWork", "cluster", cluster, "namespace", vrg.Namespace, "name", vrg.Name)

	ownerUID := string(drpc.UID)

	if current, found := vrg.GetAnnotations()[DRPCUIDAnnotation]; found && current == ownerUID {
		// Annotation may already be set but not reflected on the resource view yet
		log.Info("detected VRGs DRPC UID annotation as existing",
			"cluster", cluster, "namespace", vrg.Namespace, "name", vrg.Name)

		return
	}

	if vrg.Annotations == nil {
		vrg.Annotations = make(map[string]string)
	}

	vrg.Annotations[DRPCUIDAnnotation] = ownerUID

	mwAnnotations := map[string]string{
		DRPCNameAnnotation:      drpc.Name,
		DRPCNamespaceAnnotation: drpc.Namespace,
	}

	if err := mwu.CreateOrUpdateVRGManifestWork(drpc.Name, vrgNamespace, cluster, *vrg, mwAnnotations); err != nil {
		log.Info("error updating VRG via ManifestWork during adoption", "error", err, "cluster", cluster)
	}
}

// adoptOrphanVRG creates the missing ManifestWork for a VRG that was found via
// a ManagedClusterView. The namespace manifest is created or updated first; if
// that fails, the VRG ManifestWork is not attempted. The VRG placed in the
// ManifestWork is built from the view and annotated with the UID of the given
// DRPC. Failures are logged, not returned.
func adoptOrphanVRG(
	log logr.Logger,
	mwu rmnutil.MWUtil,
	viewVRG *rmn.VolumeReplicationGroup,
	cluster string,
	drpc *rmn.DRPlacementControl,
	vrgNamespace string,
) {
	log.Info("adopting orphaned VRG ManifestWork",
		"cluster", cluster, "namespace", viewVRG.Namespace, "name", viewVRG.Name)

	mwAnnotations := map[string]string{
		DRPCNameAnnotation:      drpc.Name,
		DRPCNamespaceAnnotation: drpc.Namespace,
	}

	// Adopt the namespace as well
	if err := mwu.CreateOrUpdateNamespaceManifest(drpc.Name, vrgNamespace, cluster, mwAnnotations); err != nil {
		log.Info("error creating namespace via ManifestWork during adoption", "error", err, "cluster", cluster)

		return
	}

	adoptee := constructVRGFromView(viewVRG)
	if adoptee.GetAnnotations() == nil {
		adoptee.Annotations = make(map[string]string)
	}

	adoptee.Annotations[DRPCUIDAnnotation] = string(drpc.UID)

	err := mwu.CreateOrUpdateVRGManifestWork(drpc.Name, vrgNamespace, cluster, *adoptee, mwAnnotations)
	if err != nil {
		log.Info("error creating VRG via ManifestWork during adoption", "error", err, "cluster", cluster)
	}
}

// constructVRGFromView builds a fresh VRG from one reported by a view. Only
// the name, namespace, a deep copy of the spec, and the annotations that the
// hub would set on the ManifestWork (destination cluster, do-not-delete-PVC and
// DRPC UID) are carried over; all other annotations are left behind.
func constructVRGFromView(viewVRG *rmn.VolumeReplicationGroup) *rmn.VolumeReplicationGroup {
	result := &rmn.VolumeReplicationGroup{
		TypeMeta: metav1.TypeMeta{Kind: "VolumeReplicationGroup", APIVersion: "ramendr.openshift.io/v1alpha1"},
		ObjectMeta: metav1.ObjectMeta{
			Name:      viewVRG.Name,
			Namespace: viewVRG.Namespace,
		},
	}

	viewVRG.Spec.DeepCopyInto(&result.Spec)

	for key, value := range viewVRG.GetAnnotations() {
		switch key {
		case DestinationClusterAnnotationKey, DoNotDeletePVCAnnotation, DRPCUIDAnnotation:
			rmnutil.AddAnnotation(result, key, value)
		}
	}

	return result
}
Loading

0 comments on commit d8f9a96

Please sign in to comment.