Skip to content

Commit

Permalink
Delete old version DV's with DIC GC
Browse files Browse the repository at this point in the history
Signed-off-by: Ido Aharon <iaharon@redhat.com>
  • Loading branch information
ido106 committed Sep 6, 2023
1 parent b625320 commit 00cfb22
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 5 deletions.
25 changes: 23 additions & 2 deletions pkg/controller/dataimportcron-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,7 @@ func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context,
maxImports = int(*cron.Spec.ImportsToKeep)
}

if err := r.garbageCollectPVCs(ctx, cron.Namespace, selector, maxImports); err != nil {
if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
return err
}
if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
Expand All @@ -814,7 +814,7 @@ func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context,
return nil
}

func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
pvcList := &corev1.PersistentVolumeClaimList{}

if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
Expand All @@ -832,6 +832,27 @@ func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, names
}
}

dvList := &cdiv1.DataVolumeList{}
if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
return err
}

if len(dvList.Items) > maxImports {
for _, dv := range dvList.Items {
pvc := &corev1.PersistentVolumeClaim{}
if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
return err
}

if pvc.Labels[common.DataImportCronLabel] != cronName {
r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
return err
}
}
}
}

return nil
}

Expand Down
58 changes: 58 additions & 0 deletions pkg/controller/dataimportcron-controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,64 @@ var _ = Describe("All DataImportCron Tests", func() {
Entry("has no tag", imageStreamName, 1),
)

It("Should succeed garbage collecting old version DVs", func() {
cron = newDataImportCron(cronName)
importsToKeep := int32(1)
cron.Spec.ImportsToKeep = &importsToKeep
cron.Annotations[AnnSourceDesiredDigest] = testDigest
reconciler = createDataImportCronReconciler(cron)

// Labeled DV and unlabeled PVC
dv1 := cc.NewImportDataVolume("test-dv1")
dv1.Labels = map[string]string{common.DataImportCronLabel: cronName}
err := reconciler.client.Create(context.TODO(), dv1)
Expect(err).ToNot(HaveOccurred())

pvc1 := cc.CreatePvc(dv1.Name, dv1.Namespace, nil, nil)
err = reconciler.client.Create(context.TODO(), pvc1)
Expect(err).ToNot(HaveOccurred())

// Labeled DV and PVC
dv2 := cc.NewImportDataVolume("test-dv2")
dv2.Labels = map[string]string{common.DataImportCronLabel: cronName}
err = reconciler.client.Create(context.TODO(), dv2)
Expect(err).ToNot(HaveOccurred())

pvc2 := cc.CreatePvc(dv2.Name, dv2.Namespace, nil, nil)
pvc2.Labels = map[string]string{common.DataImportCronLabel: cronName}
err = reconciler.client.Create(context.TODO(), pvc2)
Expect(err).ToNot(HaveOccurred())

// Unlabeled DV and PVC
dv3 := cc.NewImportDataVolume("test-dv3")
err = reconciler.client.Create(context.TODO(), dv3)
Expect(err).ToNot(HaveOccurred())

pvc3 := cc.CreatePvc(dv3.Name, dv3.Namespace, nil, nil)
err = reconciler.client.Create(context.TODO(), pvc3)
Expect(err).ToNot(HaveOccurred())

err = reconciler.garbageCollectOldImports(context.TODO(), cron)
Expect(err).ToNot(HaveOccurred())

// Ensure the old version DV is deleted (labeled DV and unlabeled PVC).
// The labeled PVC will not be deleted here, as there is no relevant controller.
err = reconciler.client.Get(context.TODO(), dvKey(dv1.Name), dv1)
Expect(k8serrors.IsNotFound(err)).To(BeTrue())

// Ensure the new version DV is not deleted (labeled DV and labeled PVC).
err = reconciler.client.Get(context.TODO(), dvKey(dv2.Name), dv2)
Expect(err).ToNot(HaveOccurred())
err = reconciler.client.Get(context.TODO(), dvKey(pvc2.Name), pvc2)
Expect(err).ToNot(HaveOccurred())

// Ensure unrelated DVs and PVCs are not deleted (unlabeled DV and PVC)
err = reconciler.client.Get(context.TODO(), dvKey(dv3.Name), dv3)
Expect(err).ToNot(HaveOccurred())
err = reconciler.client.Get(context.TODO(), dvKey(pvc3.Name), pvc3)
Expect(err).ToNot(HaveOccurred())
})

It("should pass through metadata to DataVolume and DataSource", func() {
cron = newDataImportCron(cronName)
cron.Annotations[AnnSourceDesiredDigest] = testDigest
Expand Down
152 changes: 149 additions & 3 deletions tests/dataimportcron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ var _ = Describe("DataImportCron", func() {
ns string
scName string
originalProfileSpec *cdiv1.StorageProfileSpec
dvName = "dv-garbage"
oldDvName = "src-garbage-0"
)

BeforeEach(func() {
Expand Down Expand Up @@ -367,6 +369,120 @@ var _ = Describe("DataImportCron", func() {
Entry("[test_id:8266] succeed deleting error DVs when importing new ones", false, true, 2, cdiv1.DataImportCronSourceFormatPvc),
)

It("[test_id:XXXX] Succeed garbage collecting old version DVs", func() {
reg, err := getDataVolumeSourceRegistry(f)
Expect(err).ToNot(HaveOccurred())

By(fmt.Sprintf("Create labeled DataVolume %s for garbage collection test", dvName))
dv := utils.NewDataVolumeWithRegistryImport(dvName, "5Gi", "")
dv.Spec.Source.Registry = reg
dv.Labels = map[string]string{common.DataImportCronLabel: cronName}
cc.AddAnnotation(dv, cc.AnnDeleteAfterCompletion, "false")
dv, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, ns, dv)
Expect(err).ToNot(HaveOccurred())

By("Wait for import completion")
f.ForceBindPvcIfDvIsWaitForFirstConsumer(dv)
err = utils.WaitForDataVolumePhase(f, ns, cdiv1.Succeeded, dv.Name)
Expect(err).ToNot(HaveOccurred(), "Datavolume not in phase succeeded in time")

By(fmt.Sprintf("Verify pvc was created %s", dv.Name))
pvc, err := utils.WaitForPVC(f.K8sClient, ns, dv.Name)
Expect(err).ToNot(HaveOccurred())
By(fmt.Sprintf("Verify DataImportCronLabel is passed to the pvc: %s", pvc.Labels[common.DataImportCronLabel]))
Expect(pvc.Labels[common.DataImportCronLabel]).To(Equal(cronName))

pvc.Labels[common.DataImportCronLabel] = ""
By("Update DataImportCron label to be empty in the pvc")
_, err = f.K8sClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Update(context.TODO(), pvc, metav1.UpdateOptions{})
Expect(err).ToNot(HaveOccurred())

By(fmt.Sprintf("Create new DataImportCron %s, url %s", cronName, *reg.URL))
cron = utils.NewDataImportCron(cronName, "5Gi", scheduleEveryMinute, dataSourceName, importsToKeep, *reg)
retentionPolicy := cdiv1.DataImportCronRetainNone
cron.Spec.RetentionPolicy = &retentionPolicy
cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Create(context.TODO(), cron, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())

By("Wait for conditions")
waitForConditions(corev1.ConditionFalse, corev1.ConditionTrue)

By("Verify CurrentImports update")
currentImportDvName := cron.Status.CurrentImports[0].DataVolumeName
Expect(currentImportDvName).ToNot(BeEmpty())

dataSource, err := f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), dataSourceName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())

if pvc := dataSource.Spec.Source.PVC; pvc != nil {
By(fmt.Sprintf("Verify pvc was created %s", pvc.Name))
_, err = utils.WaitForPVC(f.K8sClient, ns, pvc.Name)
Expect(err).ToNot(HaveOccurred())

By("Wait for import completion")
err = utils.WaitForDataVolumePhase(f, ns, cdiv1.Succeeded, pvc.Name)
Expect(err).ToNot(HaveOccurred(), "Datavolume not in phase succeeded in time")
} else if snap := dataSource.Spec.Source.Snapshot; snap != nil {
By("Wait for snapshot to be ready")
snapshot := &snapshotv1.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: snap.Name,
Namespace: snap.Namespace,
},
}
snapshot = utils.WaitSnapshotReady(f.CrClient, snapshot)
By("Verify snapshot is ready to be used")
Expect(snapshot.Status.ReadyToUse).ToNot(BeNil())
Expect(*snapshot.Status.ReadyToUse).To(BeTrue())
}

By("Check garbage collection")
Eventually(func() bool {
_, err := f.CdiClient.CdiV1beta1().DataVolumes(ns).Get(context.TODO(), dvName, metav1.GetOptions{})
return err != nil
}, dataImportCronTimeout, pollingInterval).Should(Equal(true), "Garbage collection failed cleaning old imports")

if pvc := dataSource.Spec.Source.PVC; pvc != nil {
By("Check that the PVC is timestamped and not garbage collected")
pvc, err := f.K8sClient.CoreV1().PersistentVolumeClaims(ns).Get(context.TODO(), pvc.Name, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
// timestamp
lastUse := pvc.Annotations[controller.AnnLastUseTime]
Expect(lastUse).ToNot(BeEmpty())
ts, err := time.Parse(time.RFC3339Nano, lastUse)
Expect(err).ToNot(HaveOccurred())
Expect(ts).To(BeTemporally("<", time.Now()))
} else if snap := dataSource.Spec.Source.Snapshot; snap != nil {
By("Check that the snapshot is timestamped and not garbage collected")
snapshot := &snapshotv1.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: snap.Name,
Namespace: snap.Namespace,
},
}
snapshot = utils.WaitSnapshotReady(f.CrClient, snapshot)
lastUse := snapshot.Annotations[controller.AnnLastUseTime]
Expect(lastUse).ToNot(BeEmpty())
ts, err := time.Parse(time.RFC3339Nano, lastUse)
Expect(err).ToNot(HaveOccurred())
Expect(ts).To(BeTemporally("<", time.Now()))
}

By("Delete cron")
err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Delete(context.TODO(), cronName, metav1.DeleteOptions{})
Expect(err).ToNot(HaveOccurred())

By("Verify DataSource deletion")
Eventually(func() bool {
_, err := f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), dataSourceName, metav1.GetOptions{})
return errors.IsNotFound(err)
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "DataSource was not deleted")

By("Verify original PVC deletion")
_, err = f.K8sClient.CoreV1().PersistentVolumeClaims(ns).Get(context.TODO(), dv.Name, metav1.GetOptions{})
Expect(err).To(HaveOccurred())
})

It("[test_id:10040] Should get digest updated by external poller", func() {
By("Create DataImportCron with only initial poller job")
cron = utils.NewDataImportCron(cronName, "5Gi", scheduleOnceAYear, dataSourceName, importsToKeep, *reg)
Expand Down Expand Up @@ -447,7 +563,7 @@ var _ = Describe("DataImportCron", func() {
configureStorageProfileResultingFormat(format)

garbageSources := 3
for i := 0; i < garbageSources; i++ {
for i := 1; i <= garbageSources; i++ {
srcName := fmt.Sprintf("src-garbage-%d", i)
By(fmt.Sprintf("Create %s", srcName))
switch format {
Expand Down Expand Up @@ -478,9 +594,33 @@ var _ = Describe("DataImportCron", func() {

switch format {
case cdiv1.DataImportCronSourceFormatPvc:
By(fmt.Sprintf("Create labeled DataVolume %s for old DVs garbage collection test", oldDvName))
dv := utils.NewDataVolumeWithRegistryImport(oldDvName, "5Gi", "")
dv.Spec.Source.Registry = reg
dv.Labels = map[string]string{common.DataImportCronLabel: cronName}
cc.AddAnnotation(dv, cc.AnnDeleteAfterCompletion, "false")
dv, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, ns, dv)
Expect(err).ToNot(HaveOccurred())

By("Wait for import completion")
f.ForceBindPvcIfDvIsWaitForFirstConsumer(dv)
err = utils.WaitForDataVolumePhase(f, ns, cdiv1.Succeeded, dv.Name)
Expect(err).ToNot(HaveOccurred(), "Datavolume not in phase succeeded in time")

By(fmt.Sprintf("Verify PVC was created %s", dv.Name))
pvc, err := utils.WaitForPVC(f.K8sClient, ns, dv.Name)
Expect(err).ToNot(HaveOccurred())
By(fmt.Sprintf("Verify DataImportCronLabel is passed to the PVC: %s", pvc.Labels[common.DataImportCronLabel]))
Expect(pvc.Labels[common.DataImportCronLabel]).To(Equal(cronName))

pvc.Labels[common.DataImportCronLabel] = ""
By("Update DataImportCron label to be empty in the PVC")
_, err = f.K8sClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Update(context.TODO(), pvc, metav1.UpdateOptions{})
Expect(err).ToNot(HaveOccurred())

pvcList, err := f.K8sClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{})
Expect(err).ToNot(HaveOccurred())
Expect(pvcList.Items).To(HaveLen(garbageSources))
Expect(pvcList.Items).To(HaveLen(garbageSources + 1))
case cdiv1.DataImportCronSourceFormatSnapshot:
snapshots := &snapshotv1.VolumeSnapshotList{}
err := f.CrClient.List(context.TODO(), snapshots, &client.ListOptions{Namespace: ns})
Expand All @@ -506,6 +646,12 @@ var _ = Describe("DataImportCron", func() {
By("Check garbage collection")
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
By("Check old DV garbage collection")
Eventually(func() bool {
_, err := f.CdiClient.CdiV1beta1().DataVolumes(ns).Get(context.TODO(), oldDvName, metav1.GetOptions{})
return err != nil
}, dataImportCronTimeout, pollingInterval).Should(Equal(true), "Garbage collection failed cleaning old DV")

pvcList := &corev1.PersistentVolumeClaimList{}
Eventually(func() int {
pvcList, err = f.K8sClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{})
Expand Down Expand Up @@ -551,7 +697,7 @@ var _ = Describe("DataImportCron", func() {
Expect(found).To(BeTrue())
}
},
Entry("[test_id:7406] with PVC sources", cdiv1.DataImportCronSourceFormatPvc),
Entry("[test_id:7406] with PVC & DV sources", cdiv1.DataImportCronSourceFormatPvc),
Entry("[test_id:10033] with snapshot sources", cdiv1.DataImportCronSourceFormatSnapshot),
)

Expand Down

0 comments on commit 00cfb22

Please sign in to comment.