Skip to content

Commit

Permalink
Enable cross-namespace dataSourceRef population
Browse files Browse the repository at this point in the history
Kubernetes v1.26 just introduced an alpha feature that lets you specify PVCs with dataSourceRefs pointing to different namespace.

This commit modifies the common CDI populator code to support population of PVCs with sources from different namespaces.

Signed-off-by: Alvaro Romero <alromero@redhat.com>
  • Loading branch information
alromeros committed Apr 24, 2023
1 parent d852b3a commit b17f57a
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 31 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,7 @@ func SetRestrictedSecurityContext(podSpec *v1.PodSpec) {
// SetNodeNameIfPopulator sets NodeName in a pod spec when the PVC is being handled by a CDI volume populator
func SetNodeNameIfPopulator(pvc *corev1.PersistentVolumeClaim, podSpec *v1.PodSpec) {
_, isPopulator := pvc.Annotations[AnnPopulatorKind]
nodeName, _ := pvc.Annotations[AnnSelectedNode]
nodeName := pvc.Annotations[AnnSelectedNode]
if isPopulator && nodeName != "" {
podSpec.NodeName = nodeName
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/populators/import-populator.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (r *ImportPopulatorReconciler) getPopulationSource(namespace, name string)

// Import-specific implementation of reconcileTargetPVC
func (r *ImportPopulatorReconciler) reconcileTargetPVC(pvc, pvcPrime *corev1.PersistentVolumeClaim) (reconcile.Result, error) {
phase, _ := pvcPrime.Annotations[cc.AnnPodPhase]
phase := pvcPrime.Annotations[cc.AnnPodPhase]
if err := r.updateImportProgress(phase, pvc, pvcPrime); err != nil {
return reconcile.Result{}, err
}
Expand Down
44 changes: 38 additions & 6 deletions pkg/controller/populators/import-populator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ var _ = Describe("Import populator tests", func() {
AnnDefaultStorageClass: "true",
}, map[string]string{}, "csi-plugin")

getVolumeImportSource := func(preallocation bool) *cdiv1.VolumeImportSource {
getVolumeImportSource := func(preallocation bool, namespace string) *cdiv1.VolumeImportSource {
return &cdiv1.VolumeImportSource{
ObjectMeta: metav1.ObjectMeta{
Name: samplePopulatorName,
Namespace: metav1.NamespaceDefault,
Namespace: namespace,
},
Spec: cdiv1.VolumeImportSourceSpec{
ContentType: cdiv1.DataVolumeKubeVirt,
Expand Down Expand Up @@ -120,12 +120,44 @@ var _ = Describe("Import populator tests", func() {
Kind: cdiv1.VolumeImportSourceRef,
Name: samplePopulatorName,
}
nsName := "test-import"
namespacedDataSourceRef := &corev1.TypedObjectReference{
APIGroup: &apiGroup,
Kind: cdiv1.VolumeImportSourceRef,
Name: samplePopulatorName,
Namespace: &nsName,
}

var _ = Describe("Import populator reconcile", func() {
It("should trigger succeeded event when podPhase is succeeded during population", func() {
targetPvc := CreatePvcInStorageClass(targetPvcName, metav1.NamespaceDefault, &scName, nil, nil, corev1.ClaimPending)
targetPvc.Spec.DataSourceRef = dataSourceRef
volumeImportSource := getVolumeImportSource(true)
volumeImportSource := getVolumeImportSource(true, metav1.NamespaceDefault)
pvcPrime := getPVCPrime(targetPvc, nil)
pvcPrime.Annotations = map[string]string{AnnPodPhase: string(corev1.PodSucceeded)}

By("Reconcile")
reconciler = createImportPopulatorReconciler(targetPvc, pvcPrime, volumeImportSource, sc)
result, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: targetPvcName, Namespace: metav1.NamespaceDefault}})
Expect(err).To(Not(HaveOccurred()))
Expect(result).To(Not(BeNil()))

By("Checking events recorded")
close(reconciler.recorder.(*record.FakeRecorder).Events)
found := false
for event := range reconciler.recorder.(*record.FakeRecorder).Events {
if strings.Contains(event, importSucceeded) {
found = true
}
}
reconciler.recorder = nil
Expect(found).To(BeTrue())
})

It("should succeed with VolumeImportSource from different namespace", func() {
targetPvc := CreatePvcInStorageClass(targetPvcName, metav1.NamespaceDefault, &scName, nil, nil, corev1.ClaimPending)
targetPvc.Spec.DataSourceRef = namespacedDataSourceRef
volumeImportSource := getVolumeImportSource(true, nsName)
pvcPrime := getPVCPrime(targetPvc, nil)
pvcPrime.Annotations = map[string]string{AnnPodPhase: string(corev1.PodSucceeded)}

Expand All @@ -150,7 +182,7 @@ var _ = Describe("Import populator tests", func() {
It("Should trigger failed import event when pod phase is podfailed", func() {
targetPvc := CreatePvcInStorageClass(targetPvcName, metav1.NamespaceDefault, &scName, nil, nil, corev1.ClaimPending)
targetPvc.Spec.DataSourceRef = dataSourceRef
volumeImportSource := getVolumeImportSource(true)
volumeImportSource := getVolumeImportSource(true, metav1.NamespaceDefault)
pvcPrime := getPVCPrime(targetPvc, nil)
pvcPrime.Annotations = map[string]string{AnnPodPhase: string(corev1.PodFailed)}

Expand All @@ -175,7 +207,7 @@ var _ = Describe("Import populator tests", func() {
It("Should retrigger reconcile while import pod is running", func() {
targetPvc := CreatePvcInStorageClass(targetPvcName, metav1.NamespaceDefault, &scName, nil, nil, corev1.ClaimPending)
targetPvc.Spec.DataSourceRef = dataSourceRef
volumeImportSource := getVolumeImportSource(true)
volumeImportSource := getVolumeImportSource(true, metav1.NamespaceDefault)
pvcPrime := getPVCPrime(targetPvc, nil)
pvcPrime.Annotations = map[string]string{AnnPodPhase: string(corev1.PodRunning)}

Expand All @@ -190,7 +222,7 @@ var _ = Describe("Import populator tests", func() {
It("Should create PVC Prime with proper import annotations", func() {
targetPvc := CreatePvcInStorageClass(targetPvcName, metav1.NamespaceDefault, &scName, nil, nil, corev1.ClaimBound)
targetPvc.Spec.DataSourceRef = dataSourceRef
volumeImportSource := getVolumeImportSource(true)
volumeImportSource := getVolumeImportSource(true, metav1.NamespaceDefault)

By("Reconcile")
reconciler = createImportPopulatorReconciler(targetPvc, volumeImportSource, sc)
Expand Down
21 changes: 13 additions & 8 deletions pkg/controller/populators/populator-base.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ func CreateCommonPopulatorIndexes(mgr manager.Manager) error {
if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &corev1.PersistentVolumeClaim{}, dataSourceRefField, func(obj client.Object) []string {
pvc := obj.(*corev1.PersistentVolumeClaim)
dataSourceRef := pvc.Spec.DataSourceRef
if dataSourceRef != nil && dataSourceRef.APIGroup != nil &&
*dataSourceRef.APIGroup == cc.AnnAPIGroup && dataSourceRef.Name != "" {
return []string{getPopulatorIndexKey(obj.GetNamespace(), pvc.Spec.DataSourceRef.Kind, pvc.Spec.DataSourceRef.Name)}
if isDataSourceRefValid(dataSourceRef) {
namespace := getPopulationSourceNamespace(pvc)
apiGroup := *dataSourceRef.APIGroup
return []string{getPopulatorIndexKey(apiGroup, dataSourceRef.Kind, namespace, dataSourceRef.Name)}
}
return nil
}); err != nil {
Expand Down Expand Up @@ -106,7 +107,9 @@ func addCommonPopulatorsWatches(mgr manager.Manager, c controller.Controller, lo

mapDataSourceRefToPVC := func(obj client.Object) (reqs []reconcile.Request) {
var pvcs corev1.PersistentVolumeClaimList
matchingFields := client.MatchingFields{dataSourceRefField: getPopulatorIndexKey(obj.GetNamespace(), sourceKind, obj.GetName())}
matchingFields := client.MatchingFields{
dataSourceRefField: getPopulatorIndexKey(cc.AnnAPIGroup, sourceKind, obj.GetNamespace(), obj.GetName()),
}
if err := mgr.GetClient().List(context.TODO(), &pvcs, matchingFields); err != nil {
log.Error(err, "Unable to list PVCs", "matchingFields", matchingFields)
return reqs
Expand Down Expand Up @@ -241,8 +244,8 @@ func (r *ReconcilerBase) createPVCPrime(pvc *corev1.PersistentVolumeClaim, sourc
}
util.SetRecommendedLabels(pvcPrime, r.installerLabels, "cdi-controller")

requestedSize, _ := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
// disk or image size, inflate it with overhead
// disk or image size, inflate it with overhead if necessary
requestedSize := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
requestedSize, err := cc.InflateSizeWithOverhead(r.client, requestedSize.Value(), &pvc.Spec)
if err != nil {
return nil, err
Expand Down Expand Up @@ -297,15 +300,17 @@ func (r *ReconcilerBase) reconcileCommon(pvc *corev1.PersistentVolumeClaim, popu

// We should ignore PVCs that aren't for this populator to handle
dataSourceRef := pvc.Spec.DataSourceRef
if dataSourceRef == nil || !IsPVCDataSourceRefKind(pvc, r.sourceKind) || dataSourceRef.Name == "" {
if !IsPVCDataSourceRefKind(pvc, r.sourceKind) {
log.V(1).Info("reconciled unexpected PVC, ignoring")
return nil, nil
}
// Wait until dataSourceRef exists
populationSource, err := populator.getPopulationSource(pvc.Namespace, dataSourceRef.Name)
namespace := getPopulationSourceNamespace(pvc)
populationSource, err := populator.getPopulationSource(namespace, dataSourceRef.Name)
if populationSource == nil {
return nil, err
}
// Check storage class
ready, waitForFirstConsumer, err := r.handleStorageClass(pvc)
if !ready || err != nil {
return nil, err
Expand Down
30 changes: 21 additions & 9 deletions pkg/controller/populators/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,28 @@ const (
annMigratedTo = "pv.kubernetes.io/migrated-to"
)

// IsPVCDataSourceRefKind returns if the PVC has DataSourceRef that
// IsPVCDataSourceRefKind returns if the PVC has a valid DataSourceRef that
// is equal to the given kind
func IsPVCDataSourceRefKind(pvc *corev1.PersistentVolumeClaim, kind string) bool {
dataSourceRef := pvc.Spec.DataSourceRef
return dataSourceRef != nil && dataSourceRef.APIGroup != nil && *dataSourceRef.APIGroup == cc.AnnAPIGroup && dataSourceRef.Kind == kind
return isDataSourceRefValid(dataSourceRef) && dataSourceRef.Kind == kind
}

func isDataSourceRefValid(dataSourceRef *corev1.TypedObjectReference) bool {
return dataSourceRef != nil && dataSourceRef.APIGroup != nil &&
*dataSourceRef.APIGroup == cc.AnnAPIGroup && dataSourceRef.Name != ""
}

func getPopulationSourceNamespace(pvc *corev1.PersistentVolumeClaim) string {
namespace := pvc.GetNamespace()
// The populator CR can be in a different namespace from the target PVC
// if the CrossNamespaceVolumeDataSource feature gate is enabled in the
// kube-apiserver and the kube-controller-manager.
dataSourceRef := pvc.Spec.DataSourceRef
if dataSourceRef != nil && dataSourceRef.Namespace != nil && *dataSourceRef.Namespace != "" {
namespace = *pvc.Spec.DataSourceRef.Namespace
}
return namespace
}

func isPVCPrimeDataSourceRefKind(pvc *corev1.PersistentVolumeClaim, kind string) bool {
Expand All @@ -67,13 +84,8 @@ func PVCPrimeName(targetPVC *corev1.PersistentVolumeClaim) string {
return fmt.Sprintf("%s-%s", primePvcPrefix, targetPVC.UID)
}

func getPopulatorIndexKey(namespace, kind, name string) string {
return namespace + "/" + kind + "/" + name
}

func isPVCOwnedByDataVolume(pvc *corev1.PersistentVolumeClaim) bool {
owner := metav1.GetControllerOf(pvc)
return (owner != nil && owner.Kind == "DataVolume") || cc.HasAnnOwnedByDataVolume(pvc)
func getPopulatorIndexKey(apiGroup, kind, namespace, name string) string {
return fmt.Sprintf("%s/%s/%s/%s", apiGroup, kind, namespace, name)
}

func checkIntreeStorageClass(pvc *corev1.PersistentVolumeClaim, sc *storagev1.StorageClass) bool {
Expand Down
73 changes: 67 additions & 6 deletions tests/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1425,6 +1425,19 @@ var _ = Describe("Import populator", func() {
return pvcDef
}

// namespacedImportPopulationPVCDefinition creates a PVC with namespaced import datasourceref
namespacedImportPopulationPVCDefinition := func(namespace string) *v1.PersistentVolumeClaim {
pvcDef := utils.NewPVCDefinition("import-populator-pvc-test", "1Gi", nil, nil)
apiGroup := controller.AnnAPIGroup
pvcDef.Spec.DataSourceRef = &v1.TypedObjectReference{
APIGroup: &apiGroup,
Kind: cdiv1.VolumeImportSourceRef,
Name: "import-populator-test",
Namespace: &namespace,
}
return pvcDef
}

// importPopulatorCR creates an import source CR
importPopulatorCR := func(namespace string, contentType cdiv1.DataVolumeContentType, preallocation bool) *cdiv1.VolumeImportSource {
return &cdiv1.VolumeImportSource{
Expand Down Expand Up @@ -1543,6 +1556,17 @@ var _ = Describe("Import populator", func() {
return err
}

createNamespacedImportPopulator := func(namespace string) error {
By("Creating namespaced Import Populator CR with HTTP source")
importPopulatorCR := importPopulatorCR(namespace, cdiv1.DataVolumeKubeVirt, true)
importPopulatorCR.Spec.HTTP = &cdiv1.DataVolumeSourceHTTP{
URL: tinyCoreIsoURL(),
}
_, err := f.CdiClient.CdiV1beta1().VolumeImportSources(namespace).Create(
context.TODO(), importPopulatorCR, metav1.CreateOptions{})
return err
}

verifyCleanup := func(pvc *v1.PersistentVolumeClaim) {
if pvc != nil {
Eventually(func() bool {
Expand All @@ -1555,9 +1579,6 @@ var _ = Describe("Import populator", func() {

BeforeEach(func() {
verifyCleanup(pvc)
if !f.IsCSIVolumeCloneStorageClassAvailable() {
Skip("No CSI drivers available - Population not supported")
}
})

AfterEach(func() {
Expand All @@ -1577,7 +1598,6 @@ var _ = Describe("Import populator", func() {

DescribeTable("should import fileSystem PVC", func(expectedMD5 string, volumeImportSourceFunc func(cdiv1.DataVolumeContentType, bool) error, preallocation bool) {
pvc = importPopulationPVCDefinition()
pvc.Spec.StorageClassName = &f.CsiCloneSCName
pvc = f.CreateScheduledPVCFromDefinition(pvc)
err = volumeImportSourceFunc(cdiv1.DataVolumeKubeVirt, preallocation)
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -1647,7 +1667,6 @@ var _ = Describe("Import populator", func() {
pvc = importPopulationPVCDefinition()
volumeMode := v1.PersistentVolumeBlock
pvc.Spec.VolumeMode = &volumeMode
pvc.Spec.StorageClassName = &f.CsiCloneSCName
pvc = f.CreateScheduledPVCFromDefinition(pvc)
err = volumeImportSourceFunc(cdiv1.DataVolumeKubeVirt, true)
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -1689,7 +1708,6 @@ var _ = Describe("Import populator", func() {

It("should import archive", func() {
pvc = importPopulationPVCDefinition()
pvc.Spec.StorageClassName = &f.CsiCloneSCName
pvc = f.CreateScheduledPVCFromDefinition(pvc)
err = createHTTPImportPopulatorCR(cdiv1.DataVolumeArchive, true)
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -1720,6 +1738,49 @@ var _ = Describe("Import populator", func() {
return err != nil && k8serrors.IsNotFound(err)
}, timeout, pollingInterval).Should(BeTrue())
})

// TODO: Only k8s from v1.26 will allow PVCs with namespaced DataSourceRef.
// We need to find a way to check if CrossNamespaceVolumeDataSource feature gate is enabled
// More information in: https://kubernetes.io/blog/2023/01/02/cross-namespace-data-sources-alpha/#trying-it-out
XIt("should import with VolumeImportSource from different namespace", func() {
// We create the CR namespace
crNs, err := f.CreateNamespace(f.NsPrefix, map[string]string{
framework.NsPrefixLabel: f.NsPrefix,
})
Expect(err).NotTo(HaveOccurred())
f.AddNamespaceToDelete(crNs)

pvc = namespacedImportPopulationPVCDefinition(crNs.Name)
pvc = f.CreateScheduledPVCFromDefinition(pvc)
err = createNamespacedImportPopulator(crNs.Name)
Expect(err).ToNot(HaveOccurred())

By("Verify PVC prime was created")
pvcPrime, err = utils.WaitForPVC(f.K8sClient, pvc.Namespace, populators.PVCPrimeName(pvc))
Expect(err).ToNot(HaveOccurred())

By("Verify target PVC is bound")
err = utils.WaitForPersistentVolumeClaimPhase(f.K8sClient, pvc.Namespace, v1.ClaimBound, pvc.Name)
Expect(err).ToNot(HaveOccurred())

By("Verify content")
same, err := f.VerifyTargetPVCArchiveContent(f.Namespace, pvc, "3")
Expect(err).ToNot(HaveOccurred())
Expect(same).To(BeTrue())

By("Verify 100.0% annotation")
progress, ok, err := utils.WaitForPVCAnnotation(f.K8sClient, f.Namespace.Name, pvc, controller.AnnImportProgressReporting)
Expect(err).ToNot(HaveOccurred())
Expect(ok).To(BeTrue())
Expect(progress).Should(BeEquivalentTo("100.0%"))

By("Wait for PVC prime to be deleted")
Eventually(func() bool {
// Make sure pvcPrime was deleted after upload population
_, err := f.FindPVC(pvcPrime.Name)
return err != nil && k8serrors.IsNotFound(err)
}, timeout, pollingInterval).Should(BeTrue())
})
})

func generateRegistryOnlySidecar() *unstructured.Unstructured {
Expand Down

0 comments on commit b17f57a

Please sign in to comment.