diff --git a/controllers/drplacementcontrol.go b/controllers/drplacementcontrol.go index 83b42bd0e..463b7c2a1 100644 --- a/controllers/drplacementcontrol.go +++ b/controllers/drplacementcontrol.go @@ -11,14 +11,11 @@ import ( "github.com/go-logr/logr" clrapiv1beta1 "github.com/open-cluster-management-io/api/cluster/v1beta1" - ocmworkv1 "github.com/open-cluster-management/api/work/v1" errorswrapper "github.com/pkg/errors" plrv1 "github.com/stolostron/multicloud-operators-placementrule/pkg/apis/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/yaml" rmn "github.com/ramendr/ramen/api/v1alpha1" rmnutil "github.com/ramendr/ramen/controllers/util" @@ -31,6 +28,9 @@ const ( DRPCNameAnnotation = "drplacementcontrol.ramendr.openshift.io/drpc-name" DRPCNamespaceAnnotation = "drplacementcontrol.ramendr.openshift.io/drpc-namespace" + // Annotation that stores the UID of DRPC that created the resource on the managed cluster using a ManifestWork + DRPCUIDAnnotation = "drplacementcontrol.ramendr.openshift.io/drpc-uid" + // Annotation for the last cluster on which the application was running LastAppDeploymentCluster = "drplacementcontrol.ramendr.openshift.io/last-app-deployment-cluster" @@ -865,9 +865,7 @@ func (d *DRPCInstance) ensureCleanupAndVolSyncReplicationSetup(srcCluster string srcCluster, ok)) } - clusterToSkip := srcCluster - - err = d.EnsureCleanup(clusterToSkip) + err = d.EnsureCleanup(srcCluster) if err != nil { return err } @@ -1261,7 +1259,7 @@ func (d *DRPCInstance) getVRGFromManifestWork(clusterName string) (*rmn.VolumeRe return nil, fmt.Errorf("%w", err) } - vrg, err := d.extractVRGFromManifestWork(mw) + vrg, err := rmnutil.ExtractVRGFromManifestWork(mw) if err != nil { return nil, err } @@ -1285,9 +1283,7 @@ func (d *DRPCInstance) vrgExistsAndPrimary(targetCluster string) bool { } func (d *DRPCInstance) mwExistsAndPlacementUpdated(targetCluster string) (bool, error) { - vrgMWName := d.mwu.BuildManifestWorkName(rmnutil.MWTypeVRG) - - _, err := d.mwu.FindManifestWork(vrgMWName, targetCluster) + _, err := d.mwu.FindManifestWorkByType(rmnutil.MWTypeVRG, targetCluster) if err != nil { if errors.IsNotFound(err) { return false, nil @@ -1525,6 +1521,7 @@ func (d *DRPCInstance) generateVRG(dstCluster string, repState rmn.ReplicationSt Annotations: map[string]string{ DestinationClusterAnnotationKey: dstCluster, DoNotDeletePVCAnnotation: d.instance.GetAnnotations()[DoNotDeletePVCAnnotation], + DRPCUIDAnnotation: string(d.instance.UID), }, }, Spec: rmn.VolumeReplicationGroupSpec{ @@ -1792,10 +1789,7 @@ func (d *DRPCInstance) ensureVRGManifestWorkOnClusterDeleted(clusterName string) const done = true - mwName := d.mwu.BuildManifestWorkName(rmnutil.MWTypeVRG) - mw := &ocmworkv1.ManifestWork{} - - err := d.reconciler.Get(d.ctx, types.NamespacedName{Name: mwName, Namespace: clusterName}, mw) + mw, err := d.mwu.FindManifestWorkByType(rmnutil.MWTypeVRG, clusterName) if err != nil { if errors.IsNotFound(err) { return done, nil @@ -2082,22 +2076,6 @@ func (d *DRPCInstance) updateVRGToRunFinalSync(clusterName string) error { return nil } -func (d *DRPCInstance) extractVRGFromManifestWork(mw *ocmworkv1.ManifestWork) (*rmn.VolumeReplicationGroup, error) { - if len(mw.Spec.Workload.Manifests) == 0 { - return nil, fmt.Errorf("invalid VRG ManifestWork for type: %s", mw.Name) - } - - vrgClientManifest := &mw.Spec.Workload.Manifests[0] - vrg := &rmn.VolumeReplicationGroup{} - - err := 
yaml.Unmarshal(vrgClientManifest.RawExtension.Raw, &vrg) - if err != nil { - return nil, fmt.Errorf("unable to unmarshal VRG object (%w)", err) - } - - return vrg, nil -} - func (d *DRPCInstance) updateManifestWork(clusterName string, vrg *rmn.VolumeReplicationGroup) error { mw, err := d.mwu.FindManifestWorkByType(rmnutil.MWTypeVRG, clusterName) if err != nil { diff --git a/controllers/drplacementcontrol_controller.go b/controllers/drplacementcontrol_controller.go index e06b57b0e..ec317c80c 100644 --- a/controllers/drplacementcontrol_controller.go +++ b/controllers/drplacementcontrol_controller.go @@ -953,6 +953,12 @@ func (r *DRPlacementControlReconciler) reconcileDRPCInstance(d *DRPCInstance, lo beforeProcessing = *d.instance.Status.LastUpdateTime } + if !ensureVRGsManagedByDRPC(d.log, d.mwu, d.vrgs, d.instance, d.vrgNamespace) { + log.Info("Requeing... VRG adoption in progress") + + return ctrl.Result{Requeue: true}, nil + } + requeue := d.startProcessing() log.Info("Finished processing", "Requeue?", requeue) @@ -1158,14 +1164,6 @@ func (r *DRPlacementControlReconciler) finalizeDRPC(ctx context.Context, drpc *r return fmt.Errorf("failed to get DRPolicy while finalizing DRPC (%w)", err) } - // delete manifestworks (VRGs) - for _, drClusterName := range rmnutil.DrpolicyClusterNames(drPolicy) { - err := mwu.DeleteManifestWorksForCluster(drClusterName) - if err != nil { - return fmt.Errorf("%w", err) - } - } - drClusters, err := getDRClusters(ctx, r.Client, drPolicy) if err != nil { return fmt.Errorf("failed to get drclusters. Error (%w)", err) @@ -1177,6 +1175,18 @@ func (r *DRPlacementControlReconciler) finalizeDRPC(ctx context.Context, drpc *r return fmt.Errorf("failed to retrieve VRGs. We'll retry later. Error (%w)", err) } + if !ensureVRGsManagedByDRPC(r.Log, mwu, vrgs, drpc, vrgNamespace) { + return fmt.Errorf("VRG adoption in progress") + } + + // delete manifestworks (VRGs) + for _, drClusterName := range rmnutil.DrpolicyClusterNames(drPolicy) { + err := mwu.DeleteManifestWorksForCluster(drClusterName) + if err != nil { + return fmt.Errorf("%w", err) + } + } + if len(vrgs) != 0 { return fmt.Errorf("waiting for VRGs count to go to zero") } @@ -2286,6 +2296,21 @@ func (r *DRPlacementControlReconciler) determineDRPCState( return Stop, "", err } + mwu := rmnutil.MWUtil{ + Client: r.Client, + APIReader: r.APIReader, + Ctx: ctx, + Log: log, + InstName: drpc.Name, + TargetNamespace: vrgNamespace, + } + + if !ensureVRGsManagedByDRPC(log, mwu, vrgs, drpc, vrgNamespace) { + msg := "VRG adoption in progress" + + return Stop, msg, nil + } + // IF 2 clusters queried, and both queries failed, then STOP if successfullyQueriedClusterCount == 0 { msg := "Stop - Number of clusters queried is 0" @@ -2426,3 +2451,183 @@ func (r *DRPlacementControlReconciler) determineDRPCState( return AllowFailover, msg, nil } + +// ensureVRGsManagedByDRPC ensures that VRGs reported by ManagedClusterView are managed by the current instance of +// DRPC. This is done using the DRPC UID annotation on the viewed VRG matching the current DRPC UID and if not +// creating or updating the exisiting ManifestWork for the VRG. 
+// Returns a bool indicating true if VRGs are managed by the current DRPC resource +func ensureVRGsManagedByDRPC( + log logr.Logger, + mwu rmnutil.MWUtil, + vrgs map[string]*rmn.VolumeReplicationGroup, + drpc *rmn.DRPlacementControl, + vrgNamespace string, +) bool { + ensured := true + + for cluster, viewVRG := range vrgs { + if rmnutil.ResourceIsDeleted(viewVRG) { + log.Info("VRG reported by view undergoing deletion, during adoption", + "cluster", cluster, "namespace", viewVRG.Namespace, "name", viewVRG.Name) + + continue + } + + if viewVRG.GetAnnotations() != nil { + if v, ok := viewVRG.Annotations[DRPCUIDAnnotation]; ok && v == string(drpc.UID) { + continue + } + } + + adopted := adoptVRG(log, mwu, viewVRG, cluster, drpc, vrgNamespace) + + ensured = ensured && adopted + } + + return ensured +} + +// adoptVRG creates or updates the VRG ManifestWork to ensure that the current DRPC is managing the VRG resource +// Returns a bool indicating if adoption was completed (which is mostly false except when VRG MW is deleted) +func adoptVRG( + log logr.Logger, + mwu rmnutil.MWUtil, + viewVRG *rmn.VolumeReplicationGroup, + cluster string, + drpc *rmn.DRPlacementControl, + vrgNamespace string, +) bool { + adopted := true + + mw, err := mwu.FindManifestWorkByType(rmnutil.MWTypeVRG, cluster) + if err != nil { + if !errors.IsNotFound(err) { + log.Info("error fetching VRG ManifestWork during adoption", "error", err, "cluster", cluster) + + return !adopted + } + + adoptOrphanVRG(log, mwu, viewVRG, cluster, drpc, vrgNamespace) + + return !adopted + } + + if rmnutil.ResourceIsDeleted(mw) { + log.Info("VRG ManifestWork found deleted during adoption", "cluster", cluster) + + return adopted + } + + vrg, err := rmnutil.ExtractVRGFromManifestWork(mw) + if err != nil { + log.Info("error extracting VRG from ManifestWork during adoption", "error", err, "cluster", cluster) + + return !adopted + } + + // NOTE: upgrade use case, to add DRPC UID for existing VRG MW + adoptExistingVRGManifestWork(log, mwu, vrg, cluster, drpc, vrgNamespace) + + return !adopted +} + +// adoptExistingVRGManifestWork updates an existing VRG ManifestWork as managed by the current DRPC resource +func adoptExistingVRGManifestWork( + log logr.Logger, + mwu rmnutil.MWUtil, + vrg *rmn.VolumeReplicationGroup, + cluster string, + drpc *rmn.DRPlacementControl, + vrgNamespace string, +) { + log.Info("adopting existing VRG ManifestWork", "cluster", cluster, "namespace", vrg.Namespace, "name", vrg.Name) + + if vrg.GetAnnotations() == nil { + vrg.Annotations = make(map[string]string) + } + + if v, ok := vrg.Annotations[DRPCUIDAnnotation]; ok && v == string(drpc.UID) { + // Annotation may already be set but not reflected on the resource view yet + log.Info("detected VRGs DRPC UID annotation as existing", + "cluster", cluster, "namespace", vrg.Namespace, "name", vrg.Name) + + return + } + + vrg.Annotations[DRPCUIDAnnotation] = string(drpc.UID) + + annotations := make(map[string]string) + annotations[DRPCNameAnnotation] = drpc.Name + annotations[DRPCNamespaceAnnotation] = drpc.Namespace + + err := mwu.CreateOrUpdateVRGManifestWork(drpc.Name, vrgNamespace, cluster, *vrg, annotations) + if err != nil { + log.Info("error updating VRG via ManifestWork during adoption", "error", err, "cluster", cluster) + } +} + +// adoptOpphanVRG creates a missing ManifestWork for a VRG found via a ManagedClusterView +func adoptOrphanVRG( + log logr.Logger, + mwu rmnutil.MWUtil, + viewVRG *rmn.VolumeReplicationGroup, + cluster string, + drpc *rmn.DRPlacementControl, + 
vrgNamespace string, +) { + log.Info("adopting orphaned VRG ManifestWork", + "cluster", cluster, "namespace", viewVRG.Namespace, "name", viewVRG.Name) + + annotations := make(map[string]string) + annotations[DRPCNameAnnotation] = drpc.Name + annotations[DRPCNamespaceAnnotation] = drpc.Namespace + + // Adopt the namespace as well + err := mwu.CreateOrUpdateNamespaceManifest(drpc.Name, vrgNamespace, cluster, annotations) + if err != nil { + log.Info("error creating namespace via ManifestWork during adoption", "error", err, "cluster", cluster) + + return + } + + vrg := constructVRGFromView(viewVRG) + if vrg.GetAnnotations() == nil { + vrg.Annotations = make(map[string]string) + } + + vrg.Annotations[DRPCUIDAnnotation] = string(drpc.UID) + + if err := mwu.CreateOrUpdateVRGManifestWork( + drpc.Name, vrgNamespace, + cluster, *vrg, annotations); err != nil { + log.Info("error creating VRG via ManifestWork during adoption", "error", err, "cluster", cluster) + } +} + +// constructVRGFromView selectively constructs a VRG from a view, using its spec and only those annotations that +// would be set by the hub on the ManifestWork +func constructVRGFromView(viewVRG *rmn.VolumeReplicationGroup) *rmn.VolumeReplicationGroup { + vrg := &rmn.VolumeReplicationGroup{ + TypeMeta: metav1.TypeMeta{Kind: "VolumeReplicationGroup", APIVersion: "ramendr.openshift.io/v1alpha1"}, + ObjectMeta: metav1.ObjectMeta{ + Name: viewVRG.Name, + Namespace: viewVRG.Namespace, + }, + } + + viewVRG.Spec.DeepCopyInto(&vrg.Spec) + + for k, v := range viewVRG.GetAnnotations() { + switch k { + case DestinationClusterAnnotationKey: + fallthrough + case DoNotDeletePVCAnnotation: + fallthrough + case DRPCUIDAnnotation: + rmnutil.AddAnnotation(vrg, k, v) + default: + } + } + + return vrg +} diff --git a/controllers/drplacementcontrol_controller_test.go b/controllers/drplacementcontrol_controller_test.go index 605bf6ffd..308741815 100644 --- a/controllers/drplacementcontrol_controller_test.go +++ b/controllers/drplacementcontrol_controller_test.go @@ -388,6 +388,16 @@ func resetClusterDown() { ClusterIsDown = "" } +var ToggleUIDChecks bool // default false + +func setToggleUIDChecks() { + ToggleUIDChecks = true +} + +func resetToggleUIDChecks() { + ToggleUIDChecks = false +} + //nolint:funlen,cyclop,gocognit func (f FakeMCVGetter) GetVRGFromManagedCluster(resourceName, resourceNamespace, managedCluster string, annnotations map[string]string, @@ -465,52 +475,57 @@ func doGetFakeVRGsFromManagedClusters(managedCluster string, vrgNamespace string } vrgFromMW, err := getVRGFromManifestWork(managedCluster, vrgNamespace) - if err != nil { - if errors.IsNotFound(err) { - if getFunctionNameAtIndex(4) == "getVRGs" { // Called only from DRCluster reconciler, at present - return fakeVRGWithMModesProtectedPVC(vrgNamespace) - } + if err != nil && !errors.IsNotFound(err) { + return nil, err + } + + if errors.IsNotFound(err) { + if getFunctionNameAtIndex(4) == "getVRGs" { // Called only from DRCluster reconciler, at present + return fakeVRGWithMModesProtectedPVC(vrgNamespace) + } + + if getFunctionNameAtIndex(4) == "determineDRPCState" && ToggleUIDChecks { + // Fake it, no UID + return getDefaultVRG(vrgNamespace), nil } return nil, err } - if vrgFromMW != nil { - vrgFromMW.Generation = 1 - vrgFromMW.Status = vrgStatus - vrgFromMW.Status.Conditions = append(vrgFromMW.Status.Conditions, metav1.Condition{ - Type: controllers.VRGConditionTypeClusterDataReady, - Reason: controllers.VRGConditionReasonClusterDataRestored, - Status: metav1.ConditionTrue, - 
Message: "Cluster Data Ready", - LastTransitionTime: metav1.Now(), - ObservedGeneration: vrgFromMW.Generation, - }) - vrgFromMW.Status.Conditions = append(vrgFromMW.Status.Conditions, metav1.Condition{ - Type: controllers.VRGConditionTypeClusterDataProtected, - Reason: controllers.VRGConditionReasonClusterDataRestored, - Status: metav1.ConditionTrue, - Message: "Cluster Data Protected", - LastTransitionTime: metav1.Now(), - ObservedGeneration: vrgFromMW.Generation, - }) - vrgFromMW.Status.Conditions = append(vrgFromMW.Status.Conditions, metav1.Condition{ - Type: controllers.VRGConditionTypeDataProtected, - Reason: controllers.VRGConditionReasonDataProtected, - Status: metav1.ConditionTrue, - Message: "Data Protected", - LastTransitionTime: metav1.Now(), - ObservedGeneration: vrgFromMW.Generation, - }) + vrgFromMW.Generation = 1 + vrgFromMW.Status = vrgStatus + vrgFromMW.Status.Conditions = append(vrgFromMW.Status.Conditions, metav1.Condition{ + Type: controllers.VRGConditionTypeClusterDataReady, + Reason: controllers.VRGConditionReasonClusterDataRestored, + Status: metav1.ConditionTrue, + Message: "Cluster Data Ready", + LastTransitionTime: metav1.Now(), + ObservedGeneration: vrgFromMW.Generation, + }) + vrgFromMW.Status.Conditions = append(vrgFromMW.Status.Conditions, metav1.Condition{ + Type: controllers.VRGConditionTypeClusterDataProtected, + Reason: controllers.VRGConditionReasonClusterDataRestored, + Status: metav1.ConditionTrue, + Message: "Cluster Data Protected", + LastTransitionTime: metav1.Now(), + ObservedGeneration: vrgFromMW.Generation, + }) + vrgFromMW.Status.Conditions = append(vrgFromMW.Status.Conditions, metav1.Condition{ + Type: controllers.VRGConditionTypeDataProtected, + Reason: controllers.VRGConditionReasonDataProtected, + Status: metav1.ConditionTrue, + Message: "Data Protected", + LastTransitionTime: metav1.Now(), + ObservedGeneration: vrgFromMW.Generation, + }) - protectedPVC := &rmn.ProtectedPVC{} - protectedPVC.Name = "random name" - protectedPVC.StorageIdentifiers.ReplicationID.ID = MModeReplicationID - protectedPVC.StorageIdentifiers.StorageProvisioner = MModeCSIProvisioner - protectedPVC.StorageIdentifiers.ReplicationID.Modes = []rmn.MMode{rmn.MModeFailover} + protectedPVC := &rmn.ProtectedPVC{} + protectedPVC.Name = "random name" + protectedPVC.StorageIdentifiers.ReplicationID.ID = MModeReplicationID + protectedPVC.StorageIdentifiers.StorageProvisioner = MModeCSIProvisioner + protectedPVC.StorageIdentifiers.ReplicationID.Modes = []rmn.MMode{rmn.MModeFailover} - vrgFromMW.Status.ProtectedPVCs = append(vrgFromMW.Status.ProtectedPVCs, *protectedPVC) - } + vrgFromMW.Status.ProtectedPVCs = append(vrgFromMW.Status.ProtectedPVCs, *protectedPVC) return vrgFromMW, nil } @@ -974,6 +989,23 @@ func moveVRGToSecondary(clusterNamespace, resourceNamespace, mwType string, prot return vrg, err } +// createVRGMW creates a basic (always Primary) ManifestWork for a VRG, used to fake existing VRG MW +// to test upgrade cases for DRPC based UID adoption +func createVRGMW(name, namespace, homeCluster string) { + vrg := getDefaultVRG(namespace) + + mwu := rmnutil.MWUtil{ + Client: k8sClient, + APIReader: k8sClient, + Ctx: context.TODO(), + Log: logr.Logger{}, + InstName: name, + TargetNamespace: namespace, + } + + Expect(mwu.CreateOrUpdateVRGManifestWork(name, namespace, homeCluster, *vrg, nil)).To(Succeed()) +} + func updateVRGMW(manifestLookupKey types.NamespacedName, dataProtected bool) (*rmn.VolumeReplicationGroup, error) { mw := &ocmworkv1.ManifestWork{} @@ -1467,6 +1499,7 @@ 
func waitForCompletion(expectedState string) { fmt.Sprintf("failed waiting for state to match. expecting: %s, found %s", expectedState, drstate)) } +//nolint:unparam func waitForDRPCPhaseAndProgression(namespace string, drState rmn.DRState) { Eventually(func() bool { drpc := getLatestDRPC(namespace) @@ -2490,6 +2523,63 @@ var _ = Describe("DRPlacementControl Reconciler", func() { deleteDRClustersAsync() }) }) + + Context("DRPlacementControl Reconciler HubRecovery VRG Adoption (Subscription)", func() { + var userPlacementRule1 *plrv1.PlacementRule + var drpc1 *rmn.DRPlacementControl + + Specify("DRClusters", func() { + populateDRClusters() + }) + + When("Application deployed for the first time", func() { + It("Should deploy drpc", func() { + createNamespacesAsync(getNamespaceObj(DefaultDRPCNamespace)) + createManagedClusters(asyncClusters) + createDRClustersAsync() + createDRPolicyAsync() + setToggleUIDChecks() + + // Create an existing VRG MW on East, to simulate upgrade cases (West1 will report an + // orphan VRG for orphan cases) + createVRGMW(DRPCCommonName, DefaultDRPCNamespace, East1ManagedCluster) + + var placementObj client.Object + placementObj, drpc1 = CreatePlacementAndDRPC( + DefaultDRPCNamespace, UserPlacementRuleName, East1ManagedCluster, UsePlacementRule) + userPlacementRule1 = placementObj.(*plrv1.PlacementRule) + Expect(userPlacementRule1).NotTo(BeNil()) + waitForDRPCPhaseAndProgression(DefaultDRPCNamespace, rmn.Deployed) + uploadVRGtoS3Store(DRPCCommonName, DefaultDRPCNamespace, East1ManagedCluster, rmn.VRGAction("")) + resetToggleUIDChecks() + }) + }) + + When("Deleting DRPolicy with DRPC references", func() { + It("Should retain the deleted DRPolicy in the API server", func() { + By("\n\n*** DELETE drpolicy ***\n\n") + deleteDRPolicyAsync() + }) + }) + + When("Deleting user PlacementRule", func() { + It("Should cleanup DRPC", func() { + By("\n\n*** DELETE User PlacementRule ***\n\n") + deleteUserPlacementRule(UserPlacementRuleName, DefaultDRPCNamespace) + }) + }) + + When("Deleting DRPC", func() { + It("Should delete all VRGs", func() { + Expect(k8sClient.Delete(context.TODO(), drpc1)).Should(Succeed()) + deleteNamespaceMWsFromAllClusters(DefaultDRPCNamespace) + }) + }) + + Specify("delete drclusters", func() { + deleteDRClustersAsync() + }) + }) }) func verifyDRPCStateAndProgression(expectedAction rmn.DRAction, expectedPhase rmn.DRState, @@ -2536,6 +2626,7 @@ func checkConditionAllowFailover(namespace string) { Expect(drpc.Status.Phase).To(Equal(rmn.WaitForUser)) } +//nolint:unparam func uploadVRGtoS3Store(name, namespace, dstCluster string, action rmn.VRGAction) { vrg := buildVRG(name, namespace, dstCluster, action) s3ProfileNames := []string{s3Profiles[0].S3ProfileName, s3Profiles[1].S3ProfileName} diff --git a/controllers/drplacementcontrolvolsync.go b/controllers/drplacementcontrolvolsync.go index 689668d7e..abd1d2694 100644 --- a/controllers/drplacementcontrolvolsync.go +++ b/controllers/drplacementcontrolvolsync.go @@ -248,7 +248,7 @@ func (d *DRPCInstance) updateVRGSpec(clusterName string, tgtVRG *rmn.VolumeRepli d.log.Info(fmt.Sprintf("Updating VRG ownedby MW %s for cluster %s", mw.Name, clusterName)) - vrg, err := d.extractVRGFromManifestWork(mw) + vrg, err := rmnutil.ExtractVRGFromManifestWork(mw) if err != nil { d.log.Error(err, "failed to update VRG state") @@ -345,7 +345,7 @@ func (d *DRPCInstance) ResetVolSyncRDOnPrimary(clusterName string) error { d.log.Info(fmt.Sprintf("Resetting RD VRG ownedby MW %s for cluster %s", mw.Name, clusterName)) - vrg, err := 
d.extractVRGFromManifestWork(mw) + vrg, err := rmnutil.ExtractVRGFromManifestWork(mw) if err != nil { d.log.Error(err, "failed to extract VRG state") diff --git a/controllers/util/misc.go b/controllers/util/misc.go index 255c8b1d7..b4ffda446 100644 --- a/controllers/util/misc.go +++ b/controllers/util/misc.go @@ -115,3 +115,8 @@ func UpdateStringMap(dst *map[string]string, src map[string]string) { (*dst)[key] = val } } + +// OptionalEqual returns True if optional field values are equal, or one of them is unset. +func OptionalEqual(a, b string) bool { + return a == "" || b == "" || a == b +} diff --git a/controllers/util/mw_util.go b/controllers/util/mw_util.go index 924d31622..a6949af4c 100644 --- a/controllers/util/mw_util.go +++ b/controllers/util/mw_util.go @@ -22,9 +22,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/client" - - dto "github.com/prometheus/client_model/go" - "sigs.k8s.io/controller-runtime/pkg/metrics" + "sigs.k8s.io/yaml" csiaddonsv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/apis/csiaddons/v1alpha1" rmn "github.com/ramendr/ramen/api/v1alpha1" @@ -564,66 +562,18 @@ func (mwu *MWUtil) DeleteManifestWork(mwName, mwNamespace string) error { return nil } -func GetMetricValueSingle(name string, mfType dto.MetricType) (float64, error) { - mf, err := getMetricFamilyFromRegistry(name) - if err != nil { - return 0.0, fmt.Errorf("GetMetricValueSingle returned error finding MetricFamily: %w", err) +func ExtractVRGFromManifestWork(mw *ocmworkv1.ManifestWork) (*rmn.VolumeReplicationGroup, error) { + if len(mw.Spec.Workload.Manifests) == 0 { + return nil, fmt.Errorf("invalid VRG ManifestWork for type: %s", mw.Name) } - val, err := getMetricValueFromMetricFamilyByType(mf, mfType) - if err != nil { - return 0.0, fmt.Errorf("GetMetricValueSingle returned error finding Value: %w", err) - } + vrgClientManifest := &mw.Spec.Workload.Manifests[0] + vrg := &rmn.VolumeReplicationGroup{} - return val, nil -} - -func getMetricFamilyFromRegistry(name string) (*dto.MetricFamily, error) { - metricsFamilies, err := metrics.Registry.Gather() // TODO: see if this can be made more generic + err := yaml.Unmarshal(vrgClientManifest.RawExtension.Raw, &vrg) if err != nil { - return nil, fmt.Errorf("found error during Gather step of getMetricFamilyFromRegistry: %w", err) - } - - if len(metricsFamilies) == 0 { - return nil, fmt.Errorf("couldn't get metricsFamilies from Prometheus Registry") - } - - // TODO: find out if there's a better way to search than linear scan - for i := 0; i < len(metricsFamilies); i++ { - if *metricsFamilies[i].Name == name { - return metricsFamilies[i], nil - } + return nil, fmt.Errorf("unable to unmarshal VRG object (%w)", err) } - return nil, fmt.Errorf(fmt.Sprint("couldn't find MetricFamily with name", name)) -} - -func getMetricValueFromMetricFamilyByType(mf *dto.MetricFamily, mfType dto.MetricType) (float64, error) { - if *mf.Type != mfType { - return 0.0, fmt.Errorf("getMetricValueFromMetricFamilyByType passed invalid type. 
Wanted %s, got %s", - string(mfType), string(*mf.Type)) - } - - if len(mf.Metric) != 1 { - return 0.0, fmt.Errorf("getMetricValueFromMetricFamilyByType only supports Metric length=1") - } - - switch mfType { - case dto.MetricType_COUNTER: - return *mf.Metric[0].Counter.Value, nil - case dto.MetricType_GAUGE: - return *mf.Metric[0].Gauge.Value, nil - case dto.MetricType_HISTOGRAM: - // Count is more useful for testing over Sum; get Sum elsewhere if needed - return float64(*mf.Metric[0].Histogram.SampleCount), nil - case dto.MetricType_GAUGE_HISTOGRAM: - fallthrough - case dto.MetricType_SUMMARY: - fallthrough - case dto.MetricType_UNTYPED: - fallthrough - default: - return 0.0, fmt.Errorf("getMetricValueFromMetricFamilyByType doesn't support type %s yet. Implement this", - string(mfType)) - } + return vrg, nil } diff --git a/controllers/vrg_volrep.go b/controllers/vrg_volrep.go index cf4db6129..e14922acf 100644 --- a/controllers/vrg_volrep.go +++ b/controllers/vrg_volrep.go @@ -2162,7 +2162,7 @@ func (v *VRGInstance) pvMatches(x, y *corev1.PersistentVolume) bool { "y", y.Spec.PersistentVolumeSource.CSI.FSType) return false - case x.Spec.ClaimRef.Kind != y.Spec.ClaimRef.Kind: + case !rmnutil.OptionalEqual(x.Spec.ClaimRef.Kind, y.Spec.ClaimRef.Kind): v.log.Info("PVs ClaimRef.Kind mismatch", "x", x.Spec.ClaimRef.Kind, "y", y.Spec.ClaimRef.Kind) return false diff --git a/docs/devel-quick-start.md b/docs/devel-quick-start.md index 212db6fdf..004a46c40 100644 --- a/docs/devel-quick-start.md +++ b/docs/devel-quick-start.md @@ -72,6 +72,22 @@ enough resources: link `venv` for activating the environment. To activate the environment use: +1. Install Go 1.20 + + Ramen requires now Go 1.20 due to backward incompatible changes in Go + 1.21 and later. If your system Go is newer and you don't want to + downgrade it, you can install Go 1.20 according to + [Managing Go installations](https://go.dev/doc/manage-install). + + To use Go 1.20 from the ~/sdk, change the PATH in the shell used to + build ramen: + + ``` + $ export PATH="/home/username/sdk/go1.20.14/bin:$PATH" + $ go version + go version go1.20.14 linux/amd64 + ``` + That's all! You are ready to submit your first pull request! ## Running the tests diff --git a/go.mod b/go.mod index cc103c23a..358e5a604 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,6 @@ require ( github.com/operator-framework/api v0.17.6 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.16.0 - github.com/prometheus/client_model v0.4.0 github.com/ramendr/ramen/api v0.0.0-20240117171503-e11c56eac24d github.com/ramendr/recipe v0.0.0-20230817160432-729dc7fd8932 github.com/stolostron/multicloud-operators-foundation v0.0.0-20220824091202-e9cd9710d009 @@ -69,6 +68,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect github.com/sirupsen/logrus v1.9.3 // indirect