Skip to content

Commit

Permalink
Minor fixes and enhancements in import/common populator code
Browse files Browse the repository at this point in the history
* Modify indexes and other related code to support namespaced dataSourceRefs. Cross-namespace population is still not supported as it depends on alpha feature gates.
* Add functional test to cover static binding.
* Fix selected node annotation bug in scratch space PVCs
* Fix linter alerts

Signed-off-by: Alvaro Romero <alromero@redhat.com>
  • Loading branch information
alromeros committed Apr 26, 2023
1 parent 51b3379 commit 6cce0ac
Show file tree
Hide file tree
Showing 8 changed files with 212 additions and 49 deletions.
6 changes: 3 additions & 3 deletions pkg/controller/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"crypto/rsa"
"crypto/tls"
"fmt"
"io/ioutil"
"io"
"math"
"net/http"
"regexp"
Expand Down Expand Up @@ -939,7 +939,7 @@ func SetRestrictedSecurityContext(podSpec *v1.PodSpec) {
// SetNodeNameIfPopulator sets NodeName in a pod spec when the PVC is being handled by a CDI volume populator
func SetNodeNameIfPopulator(pvc *corev1.PersistentVolumeClaim, podSpec *v1.PodSpec) {
_, isPopulator := pvc.Annotations[AnnPopulatorKind]
nodeName, _ := pvc.Annotations[AnnSelectedNode]
nodeName := pvc.Annotations[AnnSelectedNode]
if isPopulator && nodeName != "" {
podSpec.NodeName = nodeName
}
Expand Down Expand Up @@ -1378,7 +1378,7 @@ func GetProgressReportFromURL(url string, regExp *regexp.Regexp, httpClient *htt
return "", err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/populators/import-populator.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (r *ImportPopulatorReconciler) getPopulationSource(namespace, name string)

// Import-specific implementation of reconcileTargetPVC
func (r *ImportPopulatorReconciler) reconcileTargetPVC(pvc, pvcPrime *corev1.PersistentVolumeClaim) (reconcile.Result, error) {
phase, _ := pvcPrime.Annotations[cc.AnnPodPhase]
phase := pvcPrime.Annotations[cc.AnnPodPhase]
if err := r.updateImportProgress(phase, pvc, pvcPrime); err != nil {
return reconcile.Result{}, err
}
Expand Down
62 changes: 51 additions & 11 deletions pkg/controller/populators/import-populator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ var _ = Describe("Import populator tests", func() {
AnnDefaultStorageClass: "true",
}, map[string]string{}, "csi-plugin")

getVolumeImportSource := func(preallocation bool) *cdiv1.VolumeImportSource {
getVolumeImportSource := func(preallocation bool, namespace string) *cdiv1.VolumeImportSource {
return &cdiv1.VolumeImportSource{
ObjectMeta: metav1.ObjectMeta{
Name: samplePopulatorName,
Namespace: metav1.NamespaceDefault,
Namespace: namespace,
},
Spec: cdiv1.VolumeImportSourceSpec{
ContentType: cdiv1.DataVolumeKubeVirt,
Expand Down Expand Up @@ -122,12 +122,19 @@ var _ = Describe("Import populator tests", func() {
Kind: cdiv1.VolumeImportSourceRef,
Name: samplePopulatorName,
}
nsName := "test-import"
namespacedDataSourceRef := &corev1.TypedObjectReference{
APIGroup: &apiGroup,
Kind: cdiv1.VolumeImportSourceRef,
Name: samplePopulatorName,
Namespace: &nsName,
}

var _ = Describe("Import populator reconcile", func() {
It("should trigger succeeded event when podPhase is succeeded during population", func() {
targetPvc := CreatePvcInStorageClass(targetPvcName, metav1.NamespaceDefault, &scName, nil, nil, corev1.ClaimPending)
targetPvc.Spec.DataSourceRef = dataSourceRef
volumeImportSource := getVolumeImportSource(true)
volumeImportSource := getVolumeImportSource(true, metav1.NamespaceDefault)
pvcPrime := getPVCPrime(targetPvc, nil)
pvcPrime.Annotations = map[string]string{AnnPodPhase: string(corev1.PodSucceeded)}

Expand All @@ -149,10 +156,35 @@ var _ = Describe("Import populator tests", func() {
Expect(found).To(BeTrue())
})

It("should ignore namespaced dataSourceRefs", func() {
targetPvc := CreatePvcInStorageClass(targetPvcName, metav1.NamespaceDefault, &scName, nil, nil, corev1.ClaimPending)
targetPvc.Spec.DataSourceRef = namespacedDataSourceRef
volumeImportSource := getVolumeImportSource(true, nsName)
pvcPrime := getPVCPrime(targetPvc, nil)
pvcPrime.Annotations = map[string]string{AnnPodPhase: string(corev1.PodSucceeded)}

By("Reconcile")
reconciler = createImportPopulatorReconciler(targetPvc, pvcPrime, volumeImportSource, sc)
result, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: targetPvcName, Namespace: metav1.NamespaceDefault}})
Expect(err).To(Not(HaveOccurred()))
Expect(result).To(Not(BeNil()))

By("Checking PVC was ignored")
close(reconciler.recorder.(*record.FakeRecorder).Events)
found := false
for event := range reconciler.recorder.(*record.FakeRecorder).Events {
if strings.Contains(event, importSucceeded) {
found = true
}
}
reconciler.recorder = nil
Expect(found).To(BeFalse())
})

It("Should trigger failed import event when pod phase is podfailed", func() {
targetPvc := CreatePvcInStorageClass(targetPvcName, metav1.NamespaceDefault, &scName, nil, nil, corev1.ClaimPending)
targetPvc.Spec.DataSourceRef = dataSourceRef
volumeImportSource := getVolumeImportSource(true)
volumeImportSource := getVolumeImportSource(true, metav1.NamespaceDefault)
pvcPrime := getPVCPrime(targetPvc, nil)
pvcPrime.Annotations = map[string]string{AnnPodPhase: string(corev1.PodFailed)}

Expand All @@ -177,7 +209,7 @@ var _ = Describe("Import populator tests", func() {
It("Should retrigger reconcile while import pod is running", func() {
targetPvc := CreatePvcInStorageClass(targetPvcName, metav1.NamespaceDefault, &scName, nil, nil, corev1.ClaimPending)
targetPvc.Spec.DataSourceRef = dataSourceRef
volumeImportSource := getVolumeImportSource(true)
volumeImportSource := getVolumeImportSource(true, metav1.NamespaceDefault)
pvcPrime := getPVCPrime(targetPvc, nil)
pvcPrime.Annotations = map[string]string{AnnPodPhase: string(corev1.PodRunning)}

Expand All @@ -192,7 +224,7 @@ var _ = Describe("Import populator tests", func() {
It("Should create PVC Prime with proper import annotations", func() {
targetPvc := CreatePvcInStorageClass(targetPvcName, metav1.NamespaceDefault, &scName, nil, nil, corev1.ClaimBound)
targetPvc.Spec.DataSourceRef = dataSourceRef
volumeImportSource := getVolumeImportSource(true)
volumeImportSource := getVolumeImportSource(true, metav1.NamespaceDefault)

By("Reconcile")
reconciler = createImportPopulatorReconciler(targetPvc, volumeImportSource, sc)
Expand Down Expand Up @@ -322,7 +354,7 @@ var _ = Describe("Import populator tests", func() {
pvcPrime := getPVCPrime(targetPvc, nil)

ts := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(fmt.Sprintf("import_progress{ownerUID=\"%v\"} 13.45", targetPvc.GetUID())))
_, _ = w.Write([]byte(fmt.Sprintf("import_progress{ownerUID=\"%v\"} 13.45", targetPvc.GetUID())))
w.WriteHeader(200)
}))
defer ts.Close()
Expand Down Expand Up @@ -362,14 +394,22 @@ func createImportPopulatorReconcilerWithoutConfig(objects ...runtime.Object) *Im

// Register operator types with the runtime scheme.
s := scheme.Scheme
cdiv1.AddToScheme(s)
snapshotv1.AddToScheme(s)
extv1.AddToScheme(s)
_ = cdiv1.AddToScheme(s)
_ = snapshotv1.AddToScheme(s)
_ = extv1.AddToScheme(s)

objs = append(objs, MakeEmptyCDICR())

// Create a fake client to mock API calls.
cl := fake.NewFakeClientWithScheme(s, objs...)
builder := fake.NewClientBuilder().
WithScheme(s).
WithRuntimeObjects(objs...)

for _, ia := range getIndexArgs() {
builder = builder.WithIndex(ia.obj, ia.field, ia.extractValue)
}

cl := builder.Build()

rec := record.NewFakeRecorder(10)

Expand Down
57 changes: 43 additions & 14 deletions pkg/controller/populators/populator-base.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,37 @@ type populatorController interface {
reconcileTargetPVC(pvc, pvcPrime *corev1.PersistentVolumeClaim) (reconcile.Result, error)
}

type indexArgs struct {
obj client.Object
field string
extractValue client.IndexerFunc
}

func getIndexArgs() []indexArgs {
return []indexArgs{
{
obj: &corev1.PersistentVolumeClaim{},
field: dataSourceRefField,
extractValue: func(obj client.Object) []string {
pvc := obj.(*corev1.PersistentVolumeClaim)
dataSourceRef := pvc.Spec.DataSourceRef
if isDataSourceRefValid(dataSourceRef) {
namespace := getPopulationSourceNamespace(pvc)
apiGroup := *dataSourceRef.APIGroup
return []string{getPopulatorIndexKey(apiGroup, dataSourceRef.Kind, namespace, dataSourceRef.Name)}
}
return nil
},
},
}
}

// CreateCommonPopulatorIndexes creates indexes used by all populators
func CreateCommonPopulatorIndexes(mgr manager.Manager) error {
if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &corev1.PersistentVolumeClaim{}, dataSourceRefField, func(obj client.Object) []string {
pvc := obj.(*corev1.PersistentVolumeClaim)
dataSourceRef := pvc.Spec.DataSourceRef
if dataSourceRef != nil && dataSourceRef.APIGroup != nil &&
*dataSourceRef.APIGroup == cc.AnnAPIGroup && dataSourceRef.Name != "" {
return []string{getPopulatorIndexKey(obj.GetNamespace(), pvc.Spec.DataSourceRef.Kind, pvc.Spec.DataSourceRef.Name)}
for _, ia := range getIndexArgs() {
if err := mgr.GetFieldIndexer().IndexField(context.TODO(), ia.obj, ia.field, ia.extractValue); err != nil {
return err
}
return nil
}); err != nil {
return err
}
return nil
}
Expand All @@ -106,7 +125,9 @@ func addCommonPopulatorsWatches(mgr manager.Manager, c controller.Controller, lo

mapDataSourceRefToPVC := func(obj client.Object) (reqs []reconcile.Request) {
var pvcs corev1.PersistentVolumeClaimList
matchingFields := client.MatchingFields{dataSourceRefField: getPopulatorIndexKey(obj.GetNamespace(), sourceKind, obj.GetName())}
matchingFields := client.MatchingFields{
dataSourceRefField: getPopulatorIndexKey(cc.AnnAPIGroup, sourceKind, obj.GetNamespace(), obj.GetName()),
}
if err := mgr.GetClient().List(context.TODO(), &pvcs, matchingFields); err != nil {
log.Error(err, "Unable to list PVCs", "matchingFields", matchingFields)
return reqs
Expand Down Expand Up @@ -241,8 +262,8 @@ func (r *ReconcilerBase) createPVCPrime(pvc *corev1.PersistentVolumeClaim, sourc
}
util.SetRecommendedLabels(pvcPrime, r.installerLabels, "cdi-controller")

requestedSize, _ := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
// disk or image size, inflate it with overhead
// disk or image size, inflate it with overhead if necessary
requestedSize := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
requestedSize, err := cc.InflateSizeWithOverhead(r.client, requestedSize.Value(), &pvc.Spec)
if err != nil {
return nil, err
Expand Down Expand Up @@ -297,15 +318,23 @@ func (r *ReconcilerBase) reconcileCommon(pvc *corev1.PersistentVolumeClaim, popu

// We should ignore PVCs that aren't for this populator to handle
dataSourceRef := pvc.Spec.DataSourceRef
if dataSourceRef == nil || !IsPVCDataSourceRefKind(pvc, r.sourceKind) || dataSourceRef.Name == "" {
if !IsPVCDataSourceRefKind(pvc, r.sourceKind) {
log.V(1).Info("reconciled unexpected PVC, ignoring")
return nil, nil
}
// TODO: Remove this check once we support cross-namespace dataSourceRef
if dataSourceRef.Namespace != nil {
log.V(1).Info("cross-namespace dataSourceRef not supported yet, ignoring")
return nil, nil
}

// Wait until dataSourceRef exists
populationSource, err := populator.getPopulationSource(pvc.Namespace, dataSourceRef.Name)
namespace := getPopulationSourceNamespace(pvc)
populationSource, err := populator.getPopulationSource(namespace, dataSourceRef.Name)
if populationSource == nil {
return nil, err
}
// Check storage class
ready, waitForFirstConsumer, err := r.handleStorageClass(pvc)
if !ready || err != nil {
return nil, err
Expand Down
32 changes: 22 additions & 10 deletions pkg/controller/populators/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,36 @@ const (
annMigratedTo = "pv.kubernetes.io/migrated-to"
)

// IsPVCDataSourceRefKind returns if the PVC has DataSourceRef that
// IsPVCDataSourceRefKind returns if the PVC has a valid DataSourceRef that
// is equal to the given kind
func IsPVCDataSourceRefKind(pvc *corev1.PersistentVolumeClaim, kind string) bool {
dataSourceRef := pvc.Spec.DataSourceRef
return dataSourceRef != nil && dataSourceRef.APIGroup != nil && *dataSourceRef.APIGroup == cc.AnnAPIGroup && dataSourceRef.Kind == kind
return isDataSourceRefValid(dataSourceRef) && dataSourceRef.Kind == kind
}

func isDataSourceRefValid(dataSourceRef *corev1.TypedObjectReference) bool {
return dataSourceRef != nil && dataSourceRef.APIGroup != nil &&
*dataSourceRef.APIGroup == cc.AnnAPIGroup && dataSourceRef.Name != ""
}

func getPopulationSourceNamespace(pvc *corev1.PersistentVolumeClaim) string {
namespace := pvc.GetNamespace()
// The populator CR can be in a different namespace from the target PVC
// if the CrossNamespaceVolumeDataSource feature gate is enabled in the
// kube-apiserver and the kube-controller-manager.
dataSourceRef := pvc.Spec.DataSourceRef
if dataSourceRef != nil && dataSourceRef.Namespace != nil && *dataSourceRef.Namespace != "" {
namespace = *pvc.Spec.DataSourceRef.Namespace
}
return namespace
}

func isPVCPrimeDataSourceRefKind(pvc *corev1.PersistentVolumeClaim, kind string) bool {
owner := metav1.GetControllerOf(pvc)
if owner == nil || owner.Kind != "PersistentVolumeClaim" {
return false
}
populatorKind, _ := pvc.Annotations[cc.AnnPopulatorKind]
populatorKind := pvc.Annotations[cc.AnnPopulatorKind]
return populatorKind == kind
}

Expand All @@ -67,13 +84,8 @@ func PVCPrimeName(targetPVC *corev1.PersistentVolumeClaim) string {
return fmt.Sprintf("%s-%s", primePvcPrefix, targetPVC.UID)
}

func getPopulatorIndexKey(namespace, kind, name string) string {
return namespace + "/" + kind + "/" + name
}

func isPVCOwnedByDataVolume(pvc *corev1.PersistentVolumeClaim) bool {
owner := metav1.GetControllerOf(pvc)
return (owner != nil && owner.Kind == "DataVolume") || cc.HasAnnOwnedByDataVolume(pvc)
func getPopulatorIndexKey(apiGroup, kind, namespace, name string) string {
return fmt.Sprintf("%s/%s/%s/%s", apiGroup, kind, namespace, name)
}

func checkIntreeStorageClass(pvc *corev1.PersistentVolumeClaim, sc *storagev1.StorageClass) bool {
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,9 @@ func newScratchPersistentVolumeClaimSpec(pvc *v1.PersistentVolumeClaim, pod *v1.
}
// When the original PVC is being handled by a populator, copy AnnSelectedNode to avoid issues with k8s scheduler
_, isPopulator := pvc.Annotations[cc.AnnPopulatorKind]
if isPopulator {
annotations[cc.AnnSelectedNode] = pvc.Annotations[cc.AnnSelectedNode]
selectedNode := pvc.Annotations[cc.AnnSelectedNode]
if isPopulator && selectedNode != "" {
annotations[cc.AnnSelectedNode] = selectedNode
}

pvcDef := &v1.PersistentVolumeClaim{
Expand Down
3 changes: 2 additions & 1 deletion tests/framework/pvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ func createConsumerPodForPopulationPVC(targetPvc *k8sv1.PersistentVolumeClaim, f
gomega.Expect(status).To(gomega.BeTrue())
gomega.Expect(selectedNode).ToNot(gomega.BeEmpty())

utils.DeletePodNoGrace(f.K8sClient, executorPod, namespace)
err = utils.DeletePodNoGrace(f.K8sClient, executorPod, namespace)
gomega.Expect(err).ToNot(gomega.HaveOccurred())
}

// VerifyPVCIsEmpty verifies a passed in PVC is empty, returns true if the PVC is empty, false if it is not. Optionaly, specify node for the pod.
Expand Down
Loading

0 comments on commit 6cce0ac

Please sign in to comment.