Skip to content

Commit

Permalink
Fix possible nil ptr in snapshot/pvc clone controllers
Browse files Browse the repository at this point in the history
We slap on the clone-without-source index key regardless of source.PVC/source.Snapshot so
it was possible for a DV with PVC source to get queued for the snapshot controller
(and hence the nil ptr).

The test addition demonstrates a real use case where this would happen.

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>
  • Loading branch information
akalenyu committed Sep 18, 2023
1 parent f7f95c5 commit c512296
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 11 deletions.
9 changes: 3 additions & 6 deletions pkg/controller/datavolume/clone-controller-base.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func isCrossNamespaceClone(dv *cdiv1.DataVolume) bool {
}

// addCloneWithoutSourceWatch reconciles clones created without source once the matching PVC is created
func addCloneWithoutSourceWatch(mgr manager.Manager, datavolumeController controller.Controller, typeToWatch client.Object, indexingKey string) error {
func addCloneWithoutSourceWatch(mgr manager.Manager, datavolumeController controller.Controller, typeToWatch client.Object, indexingKey string, op dataVolumeOp) error {
getKey := func(namespace, name string) string {
return namespace + "/" + name
}
Expand All @@ -533,7 +533,7 @@ func addCloneWithoutSourceWatch(mgr manager.Manager, datavolumeController contro
dv := obj.(*cdiv1.DataVolume)
if source := dv.Spec.Source; source != nil {
_, sourceName, sourceNamespace := cc.GetCloneSourceInfo(dv)
if sourceName != "" {
if getDataVolumeOp(mgr.GetLogger(), dv, mgr.GetClient()) == op && sourceName != "" {
ns := cc.GetNamespace(sourceNamespace, obj.GetNamespace())
return []string{getKey(ns, sourceName)}
}
Expand All @@ -552,10 +552,7 @@ func addCloneWithoutSourceWatch(mgr manager.Manager, datavolumeController contro
return
}
for _, dv := range dvList.Items {
op := getDataVolumeOp(mgr.GetLogger(), &dv, mgr.GetClient())
if op == dataVolumePvcClone || op == dataVolumeSnapshotClone {
reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: dv.Namespace, Name: dv.Name}})
}
reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: dv.Namespace, Name: dv.Name}})
}
return
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/datavolume/pvc-clone-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (r *PvcCloneReconciler) addDataVolumeCloneControllerWatches(mgr manager.Man
}

// Watch to reconcile clones created without source
if err := addCloneWithoutSourceWatch(mgr, datavolumeController, &corev1.PersistentVolumeClaim{}, "spec.source.pvc"); err != nil {
if err := addCloneWithoutSourceWatch(mgr, datavolumeController, &corev1.PersistentVolumeClaim{}, "spec.source.pvc", dataVolumePvcClone); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/datavolume/snapshot-clone-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (r *SnapshotCloneReconciler) addDataVolumeSnapshotCloneControllerWatches(mg
}
}

if err := addCloneWithoutSourceWatch(mgr, datavolumeController, &snapshotv1.VolumeSnapshot{}, "spec.source.snapshot"); err != nil {
if err := addCloneWithoutSourceWatch(mgr, datavolumeController, &snapshotv1.VolumeSnapshot{}, "spec.source.snapshot", dataVolumeSnapshotClone); err != nil {
return err
}

Expand Down
12 changes: 9 additions & 3 deletions tests/dataimportcron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,11 +670,12 @@ var _ = Describe("DataImportCron", func() {
if !f.IsSnapshotStorageClassAvailable() {
Skip("Volumesnapshot support needed to test DataImportCron with Volumesnapshot sources")
}
size := "1Gi"

configureStorageProfileResultingFormat(cdiv1.DataImportCronSourceFormatPvc)

By(fmt.Sprintf("Create new DataImportCron %s, url %s", cronName, *reg.URL))
cron = utils.NewDataImportCronWithStorageSpec(cronName, "1Gi", scheduleOnceAYear, dataSourceName, importsToKeep, *reg)
cron = utils.NewDataImportCronWithStorageSpec(cronName, size, scheduleOnceAYear, dataSourceName, importsToKeep, *reg)
retentionPolicy := cdiv1.DataImportCronRetainNone
cron.Spec.RetentionPolicy = &retentionPolicy

Expand All @@ -691,6 +692,12 @@ var _ = Describe("DataImportCron", func() {
err = f.CrClient.List(context.TODO(), snapshots, &client.ListOptions{Namespace: ns})
Expect(err).ToNot(HaveOccurred())
Expect(snapshots.Items).To(BeEmpty())
// Ensure existing PVC clones from this source don't mess up future ones
cloneDV := utils.NewDataVolumeForImageCloningAndStorageSpec("target-dv-from-pvc", size, ns, currentImportDv, nil, nil)
cloneDV, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, cloneDV)
Expect(err).ToNot(HaveOccurred())
f.ForceBindPvcIfDvIsWaitForFirstConsumer(cloneDV)
err = utils.WaitForDataVolumePhase(f, cloneDV.Namespace, cdiv1.Succeeded, cloneDV.Name)

// Now simulate an upgrade, where a new CDI version has identified
// more storage types that scale better with snapshots
Expand All @@ -713,8 +720,7 @@ var _ = Describe("DataImportCron", func() {
}
Expect(dataSource.Spec.Source).To(Equal(expectedSource))
// Verify content
size := "1Gi"
targetDV := utils.NewDataVolumeWithSourceRefAndStorageAPI("target-dv", &size, dataSource.Namespace, dataSource.Name)
targetDV := utils.NewDataVolumeWithSourceRefAndStorageAPI("target-dv-from-snap", &size, dataSource.Namespace, dataSource.Name)
By(fmt.Sprintf("Create new target datavolume %s", targetDV.Name))
targetDataVolume, err := utils.CreateDataVolumeFromDefinition(f.CdiClient, ns, targetDV)
Expect(err).ToNot(HaveOccurred())
Expand Down

0 comments on commit c512296

Please sign in to comment.