Skip to content

Commit

Permalink
Reorganize pvc creation logic
Browse files Browse the repository at this point in the history
  • Loading branch information
falfaroc committed Aug 23, 2024
1 parent cad09a4 commit 5cad19e
Showing 1 changed file with 56 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
reconcile "sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
Expand Down Expand Up @@ -381,7 +382,7 @@ func (r *ReplicationGroupReconciler) processSnapshotEvent(ctx context.Context, g
// create default snapshot class if it does not exist
// example driver class: csi-vxflexos.dellemc.com
// example default snapshot class: default-csi-vxflexos
snClass := group.Annotations[r.Domain+"/snapshotClass"]
snClass := group.Annotations[controller.SnapshotClass]
driverClass := group.Labels[controller.DriverName]
if snClass == "" {
part := strings.Split(driverClass, ".")[0]
Expand All @@ -392,13 +393,17 @@ func (r *ReplicationGroupReconciler) processSnapshotEvent(ctx context.Context, g
return err
}
}

sc, err := remoteClient.GetSnapshotClass(ctx, snClass)
if err != nil {
log.V(common.ErrorLevel).Error(err, "failing to retrieve snapshot class, creating a default class")
sc = makeSnapshotClass(driverClass, snClass)
if !errors.IsNotFound(err) {
return fmt.Errorf("error getting snapshot class: %s", err.Error())
}

log.V(common.InfoLevel).Info("Snapshotclass %s not found, creating a default class", snClass)
sc = makeSnapshotClassRef(driverClass, snClass)
if err = remoteClient.CreateSnapshotClass(ctx, sc); err != nil {
log.V(common.ErrorLevel).Error(err, "unable to create default snapshot class")
return err
return fmt.Errorf("unable to create default snapshot class: %s", err.Error())
}
}

Expand Down Expand Up @@ -438,34 +443,38 @@ func (r *ReplicationGroupReconciler) processSnapshotEvent(ctx context.Context, g
}

func (r *ReplicationGroupReconciler) createPVCsFromSnapshots(ctx context.Context, group *repv1.DellCSIReplicationGroup, remoteClient connection.RemoteClusterClient, log logr.Logger, snClass, storageClass string) error {
if group == nil {
return fmt.Errorf("group is nil")
}
if remoteClient == nil {
return fmt.Errorf("remoteClient is nil")
// Check to see if the storage class has replication enabled. Error if so.
if sc, err := remoteClient.GetStorageClass(ctx, storageClass); err == nil {
if val, ok := sc.Annotations[controller.StorageClassReplicationParam]; ok && val == "true" {
return fmt.Errorf("storage class %s has replication enabled", storageClass)
}
}

log.Info("starting create pvcs from snapshot")
rgName := group.Name

pvcList, err := remoteClient.ListPersistentVolumeClaim(ctx, client.MatchingLabels{r.Domain + "/replicationGroupName": rgName})
// Retrieve the list of pvcs in the source cluster.
var pvcList v1.PersistentVolumeClaimList
err := r.List(ctx, &pvcList, client.MatchingLabels{controller.ReplicationGroup: group.Name})
if err != nil {
log.Error(err, "error getting pvcs: %v")
return err
}

log.V(common.InfoLevel).Info("Found %d pvcs", len(pvcList.Items))
log.V(common.InfoLevel).Info(fmt.Sprintf("Found %d pvcs", len(pvcList.Items)))
for _, pvc := range pvcList.Items {
// step 1: retrieve the latest snapshot content from pvc
pvName := pvc.Spec.VolumeName
pv, err := remoteClient.GetPersistentVolume(ctx, pvName)
var pv v1.PersistentVolume
err = r.Get(ctx, types.NamespacedName{Name: pvName}, &pv)
if err != nil {
log.Error(err, "error getting pv: %v")
return fmt.Errorf("error getting pv %s: %s", pvName, err.Error())
}
pvHandle := pv.Spec.CSI.VolumeHandle
snContentList, err := remoteClient.ListSnapshotContent(ctx, client.MatchingLabels{"pv-handle": pvHandle})

snContentList, err := remoteClient.ListSnapshotContent(ctx, client.MatchingLabels{"pv-handle": pv.Spec.CSI.VolumeHandle})
if err != nil {
log.Error(err, "error getting snapshot contents: %v")
return fmt.Errorf("error listing snapshot content for pv %s: %s", pv.Spec.CSI.VolumeHandle, err.Error())
}

// return error if list is empty
if len(snContentList.Items) == 0 {
return fmt.Errorf("no snapshot contents found for volume %s", pvName) // cannot be changed
Expand All @@ -490,14 +499,16 @@ func (r *ReplicationGroupReconciler) createPVCsFromSnapshots(ctx context.Context

err = remoteClient.CreateNamespace(ctx, nsRef)
if err != nil {
msg := "unable to create the desired namespace" + newNamespace
log.V(common.ErrorLevel).Error(err, msg)
return err
return fmt.Errorf("unable to create the desired namespace %s: %s", newNamespace, err.Error())
}
}

snContent, err := remoteClient.GetSnapshotContent(ctx, snContentLatestName)
if err != nil {
return fmt.Errorf("error getting snapshot content %s: %s", snContentLatestName, err.Error())
}

clonedSnapshotContentName := "cloned-" + snContentLatestName
snContent, _ := remoteClient.GetSnapshotContent(ctx, snContentLatestName)
snName := snContent.Spec.VolumeSnapshotRef.Name

clonedSnapshotContent := &s1.VolumeSnapshotContent{
Expand All @@ -524,45 +535,14 @@ func (r *ReplicationGroupReconciler) createPVCsFromSnapshots(ctx context.Context
return err
}

// step 3: create new snapshot
newSnapshot := &s1.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: snName,
Namespace: newNamespace,
},
Spec: s1.VolumeSnapshotSpec{
Source: s1.VolumeSnapshotSource{
VolumeSnapshotContentName: &clonedSnapshotContentName,
},
VolumeSnapshotClassName: &snClass,
},
}

newSnapshot := makeSnapshotObject(snName, clonedSnapshotContentName, snClass, newNamespace)
err = remoteClient.CreateSnapshotObject(ctx, newSnapshot)
if err != nil {
log.V(common.ErrorLevel).Error(err, "error creating new Snapshot in namespace "+newNamespace)
return err
}

// step 4: create pvc from snapshot
pvcName := pvc.Name
newPVC := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: pvcName,
Namespace: newNamespace,
},
Spec: v1.PersistentVolumeClaimSpec{
StorageClassName: pointer.String(storageClass),
AccessModes: pvc.Spec.AccessModes,
Resources: pvc.Spec.Resources,
DataSource: &v1.TypedLocalObjectReference{
APIGroup: pointer.String("snapshot.storage.k8s.io"),
Kind: "VolumeSnapshot",
Name: snName,
},
},
}

newPVC := makePersistentVolumeClaimFromSnapshot(pvc.Name, newNamespace, snName, storageClass, pvc.Spec)
err = remoteClient.CreatePersistentVolumeClaim(ctx, newPVC)
if err != nil {
log.V(common.ErrorLevel).Error(err, "error creating PVC in namespace "+newNamespace)
Expand Down Expand Up @@ -608,7 +588,7 @@ func makeSnapshotObject(snapName, contentName, className, namespace string) *s1.
return volsnap
}

func makeSnapshotClass(driver, snapClass string) *s1.VolumeSnapshotClass {
func makeSnapshotClassRef(driver, snapClass string) *s1.VolumeSnapshotClass {
return &s1.VolumeSnapshotClass{
Driver: driver,
DeletionPolicy: "Delete",
Expand All @@ -619,12 +599,10 @@ func makeSnapshotClass(driver, snapClass string) *s1.VolumeSnapshotClass {
}

func makeVolSnapContent(snapName, volumeName string, snapRef v1.ObjectReference, sc *s1.VolumeSnapshotClass) *s1.VolumeSnapshotContent {
matchingLabels := make(map[string]string)
matchingLabels["pv-handle"] = volumeName
volsnapcontent := &s1.VolumeSnapshotContent{
ObjectMeta: metav1.ObjectMeta{
Name: "volume-" + volumeName + "-" + strconv.FormatInt(time.Now().Unix(), 10),
Labels: matchingLabels,
Labels: map[string]string{"pv-handle": volumeName},
},
Spec: s1.VolumeSnapshotContentSpec{
VolumeSnapshotRef: snapRef,
Expand All @@ -639,6 +617,25 @@ func makeVolSnapContent(snapName, volumeName string, snapRef v1.ObjectReference,
return volsnapcontent
}

func makePersistentVolumeClaimFromSnapshot(name, namespace, snName, storageClass string, pvcSpec v1.PersistentVolumeClaimSpec) *v1.PersistentVolumeClaim {
return &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: v1.PersistentVolumeClaimSpec{
StorageClassName: &storageClass,
AccessModes: pvcSpec.AccessModes,
Resources: pvcSpec.Resources,
DataSource: &v1.TypedLocalObjectReference{
APIGroup: pointer.String("snapshot.storage.k8s.io"),
Kind: "VolumeSnapshot",
Name: snName,
},
},
}
}

// SetupWithManager start using reconciler by creating new controller managed by provided manager
func (r *ReplicationGroupReconciler) SetupWithManager(mgr ctrl.Manager, limiter ratelimiter.RateLimiter, maxReconcilers int) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down

0 comments on commit 5cad19e

Please sign in to comment.