Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve DataVolume status reporting with populators #2928

Merged
merged 1 commit into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions pkg/controller/datavolume/import-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,30 @@ func isPVCImportPopulation(pvc *corev1.PersistentVolumeClaim) bool {
return populators.IsPVCDataSourceRefKind(pvc, cdiv1.VolumeImportSourceRef)
}

func (r *ImportReconciler) shouldUpdateStatusPhase(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) (bool, error) {
pvcCopy := pvc.DeepCopy()
if isPVCImportPopulation(pvcCopy) {
// Better to play it safe and check the PVC Prime too
// before updating DV phase.
nn := types.NamespacedName{Namespace: pvcCopy.Namespace, Name: populators.PVCPrimeName(pvcCopy)}
err := r.client.Get(context.TODO(), nn, pvcCopy)
if err != nil {
if k8serrors.IsNotFound(err) {
return false, nil
}
return false, err
}
}
_, ok := pvcCopy.Annotations[cc.AnnImportPod]
return ok && pvcCopy.Status.Phase == corev1.ClaimBound && !pvcIsPopulated(pvcCopy, dv), nil
}

func (r *ImportReconciler) updateStatusPhase(pvc *corev1.PersistentVolumeClaim, dataVolumeCopy *cdiv1.DataVolume, event *Event) error {
phase, ok := pvc.Annotations[cc.AnnPodPhase]
importPopulation := isPVCImportPopulation(pvc)
if phase != string(corev1.PodSucceeded) && !importPopulation {
_, ok := pvc.Annotations[cc.AnnImportPod]
if !ok || pvc.Status.Phase != corev1.ClaimBound || pvcIsPopulated(pvc, dataVolumeCopy) {
return nil
if phase != string(corev1.PodSucceeded) {
update, err := r.shouldUpdateStatusPhase(pvc, dataVolumeCopy)
if !update || err != nil {
return err
}
}
dataVolumeCopy.Status.Phase = cdiv1.ImportScheduled
Expand Down Expand Up @@ -274,7 +291,7 @@ func (r *ImportReconciler) updateStatusPhase(pvc *corev1.PersistentVolumeClaim,
case string(corev1.PodSucceeded):
if cc.IsMultiStageImportInProgress(pvc) {
// Multi-stage annotations will be updated by import-populator if populators are in use
if !importPopulation {
if !isPVCImportPopulation(pvc) {
if err := cc.UpdatesMultistageImportSucceeded(pvc, r.getCheckpointArgs(dataVolumeCopy)); err != nil {
return err
}
Expand Down
72 changes: 71 additions & 1 deletion pkg/controller/datavolume/import-controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
"kubevirt.io/containerized-data-importer/pkg/common"
. "kubevirt.io/containerized-data-importer/pkg/controller/common"
"kubevirt.io/containerized-data-importer/pkg/controller/populators"
featuregates "kubevirt.io/containerized-data-importer/pkg/feature-gates"
)

Expand Down Expand Up @@ -1209,6 +1210,17 @@ var _ = Describe("All DataVolume Tests", func() {
AddAnnotation(pvc, AnnSelectedNode, "node01")
err = reconciler.client.Update(context.TODO(), pvc)
Expect(err).ToNot(HaveOccurred())

// Creating a valid PVC Prime
pvcPrime := &corev1.PersistentVolumeClaim{}
pvcPrime.Name = populators.PVCPrimeName(pvc)
pvcPrime.Namespace = metav1.NamespaceDefault
pvcPrime.Status.Phase = corev1.ClaimBound
pvcPrime.SetAnnotations(make(map[string]string))
pvcPrime.GetAnnotations()[AnnImportPod] = "something"
err = reconciler.client.Create(context.TODO(), pvcPrime)
Expect(err).ToNot(HaveOccurred())

_, err = reconciler.updateStatus(getReconcileRequest(importDataVolume), nil, reconciler)
Expect(err).ToNot(HaveOccurred())
dv := &cdiv1.DataVolume{}
Expand All @@ -1231,6 +1243,44 @@ var _ = Describe("All DataVolume Tests", func() {
Expect(found).To(BeTrue())
})

It("Should not update DV phase when PVC Prime is unbound", func() {
scName := "testSC"
sc := CreateStorageClassWithProvisioner(scName, map[string]string{AnnDefaultStorageClass: "true"}, map[string]string{}, "csi-plugin")
csiDriver := &storagev1.CSIDriver{
ObjectMeta: metav1.ObjectMeta{
Name: "csi-plugin",
},
}
importDataVolume := NewImportDataVolume("test-dv")
importDataVolume.Spec.PVC.StorageClassName = &scName

reconciler = createImportReconciler(sc, csiDriver, importDataVolume)
_, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "test-dv", Namespace: metav1.NamespaceDefault}})
Expect(err).ToNot(HaveOccurred())

dv := &cdiv1.DataVolume{}
err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "test-dv", Namespace: metav1.NamespaceDefault}, dv)
Expect(err).ToNot(HaveOccurred())
// Get original DV phase
dvPhase := dv.Status.Phase

// Create PVC Prime
pvc := &corev1.PersistentVolumeClaim{}
err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "test-dv", Namespace: metav1.NamespaceDefault}, pvc)
Expect(err).ToNot(HaveOccurred())
pvcPrime := &corev1.PersistentVolumeClaim{}
pvcPrime.Name = populators.PVCPrimeName(pvc)
pvcPrime.Status.Phase = corev1.ClaimPending
err = reconciler.client.Create(context.TODO(), pvcPrime)
Expect(err).ToNot(HaveOccurred())

_, err = reconciler.updateStatus(getReconcileRequest(importDataVolume), nil, reconciler)
Expect(err).ToNot(HaveOccurred())
err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "test-dv", Namespace: metav1.NamespaceDefault}, dv)
Expect(err).ToNot(HaveOccurred())
Expect(dv.Status.Phase).To(Equal(dvPhase))
})

It("Should switch to succeeded if PVC phase is pending, but pod phase is succeeded", func() {
reconciler = createImportReconciler(NewImportDataVolume("test-dv"))
_, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "test-dv", Namespace: metav1.NamespaceDefault}})
Expand Down Expand Up @@ -1325,12 +1375,32 @@ var _ = Describe("All DataVolume Tests", func() {
})

DescribeTable("DV phase", func(testDv runtime.Object, current, expected cdiv1.DataVolumePhase, pvcPhase corev1.PersistentVolumeClaimPhase, podPhase corev1.PodPhase, ann, expectedEvent string, extraAnnotations ...string) {
// First we test the non-populator flow
scName := "testpvc"
sc := CreateStorageClassWithProvisioner(scName, map[string]string{AnnDefaultStorageClass: "true"}, map[string]string{}, "csi-plugin")
storageProfile := createStorageProfile(scName, nil, BlockMode)

r := createImportReconciler(testDv, sc, storageProfile)
dvPhaseTest(r.ReconcilerBase, r, testDv, current, expected, pvcPhase, podPhase, ann, expectedEvent, extraAnnotations...)

// Test the populator flow, it should match
csiDriver := &storagev1.CSIDriver{
ObjectMeta: metav1.ObjectMeta{
Name: "csi-plugin",
},
}
// Creating a valid PVC Prime
pvcPrime := &corev1.PersistentVolumeClaim{}
pvcPrime.Name = "prime-"
pvcPrime.Namespace = metav1.NamespaceDefault
pvcPrime.Status.Phase = corev1.ClaimBound
pvcPrime.SetAnnotations(make(map[string]string))
pvcPrime.GetAnnotations()[ann] = "something"
pvcPrime.GetAnnotations()[AnnPodPhase] = string(podPhase)
for i := 0; i < len(extraAnnotations); i += 2 {
pvcPrime.GetAnnotations()[extraAnnotations[i]] = extraAnnotations[i+1]
}
r = createImportReconciler(testDv, sc, storageProfile, pvcPrime, csiDriver)
dvPhaseTest(r.ReconcilerBase, r, testDv, current, expected, pvcPhase, podPhase, ann, expectedEvent, extraAnnotations...)
},
Entry("should switch to bound for import", NewImportDataVolume("test-dv"), cdiv1.Pending, cdiv1.PVCBound, corev1.ClaimBound, corev1.PodPending, "invalid", "PVC test-dv Bound", AnnPriorityClassName, "p0"),
Entry("should switch to bound for import", NewImportDataVolume("test-dv"), cdiv1.Unknown, cdiv1.PVCBound, corev1.ClaimBound, corev1.PodPending, "invalid", "PVC test-dv Bound", AnnPriorityClassName, "p0"),
Expand Down
28 changes: 23 additions & 5 deletions pkg/controller/datavolume/upload-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,19 +196,37 @@ func isPVCUploadPopulation(pvc *corev1.PersistentVolumeClaim) bool {
return populators.IsPVCDataSourceRefKind(pvc, cdiv1.VolumeUploadSourceRef)
}

func (r *UploadReconciler) shouldUpdateStatusPhase(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) (bool, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this function duplicated in import controller?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's common code but one function uses isPVCImportPopulation and AnnImportPod and the other the upload equivalent. I can try to have one shared function but I think this practice is somewhat common in our controllers.

pvcCopy := pvc.DeepCopy()
if isPVCUploadPopulation(pvcCopy) {
// Better to play it safe and check the PVC Prime too
// before updating DV phase.
nn := types.NamespacedName{Namespace: pvcCopy.Namespace, Name: populators.PVCPrimeName(pvcCopy)}
err := r.client.Get(context.TODO(), nn, pvcCopy)
if err != nil {
if k8serrors.IsNotFound(err) {
return false, nil
}
return false, err
}
}
_, ok := pvcCopy.Annotations[cc.AnnUploadRequest]
return ok && pvcCopy.Status.Phase == corev1.ClaimBound && !pvcIsPopulated(pvcCopy, dv), nil
}

func (r *UploadReconciler) updateStatusPhase(pvc *corev1.PersistentVolumeClaim, dataVolumeCopy *cdiv1.DataVolume, event *Event) error {
phase, ok := pvc.Annotations[cc.AnnPodPhase]
uploadPopulation := isPVCUploadPopulation(pvc)
if phase != string(corev1.PodSucceeded) && !uploadPopulation {
_, ok = pvc.Annotations[cc.AnnUploadRequest]
if !ok || pvc.Status.Phase != corev1.ClaimBound || pvcIsPopulated(pvc, dataVolumeCopy) {
return nil
if phase != string(corev1.PodSucceeded) {
update, err := r.shouldUpdateStatusPhase(pvc, dataVolumeCopy)
if !update || err != nil {
return err
}
}
dataVolumeCopy.Status.Phase = cdiv1.UploadScheduled
if !ok {
return nil
}

switch phase {
case string(corev1.PodPending):
// TODO: Use a more generic Scheduled, like maybe TransferScheduled.
Expand Down
72 changes: 71 additions & 1 deletion pkg/controller/datavolume/upload-controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
"kubevirt.io/containerized-data-importer/pkg/common"
. "kubevirt.io/containerized-data-importer/pkg/controller/common"
"kubevirt.io/containerized-data-importer/pkg/controller/populators"
featuregates "kubevirt.io/containerized-data-importer/pkg/feature-gates"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -205,12 +206,32 @@ var _ = Describe("All DataVolume Tests", func() {

var _ = Describe("Reconcile Datavolume status", func() {
DescribeTable("DV phase", func(testDv runtime.Object, current, expected cdiv1.DataVolumePhase, pvcPhase corev1.PersistentVolumeClaimPhase, podPhase corev1.PodPhase, ann, expectedEvent string, extraAnnotations ...string) {
// We first test the non-populator flow
scName := "testpvc"
sc := CreateStorageClassWithProvisioner(scName, map[string]string{AnnDefaultStorageClass: "true"}, map[string]string{}, "csi-plugin")
storageProfile := createStorageProfile(scName, nil, BlockMode)

r := createUploadReconciler(testDv, sc, storageProfile)
dvPhaseTest(r.ReconcilerBase, r, testDv, current, expected, pvcPhase, podPhase, ann, expectedEvent, extraAnnotations...)

// Test the populator flow, it should match
csiDriver := &storagev1.CSIDriver{
ObjectMeta: metav1.ObjectMeta{
Name: "csi-plugin",
},
}
// Creating a valid PVC Prime
pvcPrime := &corev1.PersistentVolumeClaim{}
pvcPrime.Name = "prime-"
pvcPrime.Namespace = metav1.NamespaceDefault
pvcPrime.Status.Phase = corev1.ClaimBound
pvcPrime.SetAnnotations(make(map[string]string))
pvcPrime.GetAnnotations()[ann] = "something"
pvcPrime.GetAnnotations()[AnnPodPhase] = string(podPhase)
for i := 0; i < len(extraAnnotations); i += 2 {
pvcPrime.GetAnnotations()[extraAnnotations[i]] = extraAnnotations[i+1]
}
r = createUploadReconciler(testDv, sc, storageProfile, pvcPrime, csiDriver)
dvPhaseTest(r.ReconcilerBase, r, testDv, current, expected, pvcPhase, podPhase, ann, expectedEvent, extraAnnotations...)
},
Entry("should switch to scheduled for upload", newUploadDataVolume("test-dv"), cdiv1.Pending, cdiv1.UploadScheduled, corev1.ClaimBound, corev1.PodPending, AnnUploadRequest, "Upload into test-dv scheduled", AnnPriorityClassName, "p0-upload"),
Entry("should switch to uploadready for upload", newUploadDataVolume("test-dv"), cdiv1.Pending, cdiv1.UploadReady, corev1.ClaimBound, corev1.PodRunning, AnnUploadRequest, "Upload into test-dv ready", AnnPodReady, "true", AnnPriorityClassName, "p0-upload"),
Expand Down Expand Up @@ -244,6 +265,17 @@ var _ = Describe("All DataVolume Tests", func() {
AddAnnotation(pvc, AnnSelectedNode, "node01")
err = reconciler.client.Update(context.TODO(), pvc)
Expect(err).ToNot(HaveOccurred())

// Create PVC Prime
pvcPrime := &corev1.PersistentVolumeClaim{}
pvcPrime.Name = populators.PVCPrimeName(pvc)
pvcPrime.Namespace = metav1.NamespaceDefault
pvcPrime.Status.Phase = corev1.ClaimBound
pvcPrime.SetAnnotations(make(map[string]string))
pvcPrime.GetAnnotations()[AnnUploadRequest] = "something"
err = reconciler.client.Create(context.TODO(), pvcPrime)
Expect(err).ToNot(HaveOccurred())

_, err = reconciler.updateStatus(getReconcileRequest(uploadDataVolume), nil, reconciler)
Expect(err).ToNot(HaveOccurred())
dv := &cdiv1.DataVolume{}
Expand All @@ -265,6 +297,44 @@ var _ = Describe("All DataVolume Tests", func() {
}
Expect(found).To(BeTrue())
})

It("Should not update DV phase when PVC Prime is unbound", func() {
scName := "testSC"
sc := CreateStorageClassWithProvisioner(scName, map[string]string{AnnDefaultStorageClass: "true"}, map[string]string{}, "csi-plugin")
csiDriver := &storagev1.CSIDriver{
ObjectMeta: metav1.ObjectMeta{
Name: "csi-plugin",
},
}
uploadDataVolume := newUploadDataVolume("test-dv")
uploadDataVolume.Spec.PVC.StorageClassName = &scName

reconciler = createUploadReconciler(sc, csiDriver, uploadDataVolume)
_, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "test-dv", Namespace: metav1.NamespaceDefault}})
Expect(err).ToNot(HaveOccurred())

dv := &cdiv1.DataVolume{}
err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "test-dv", Namespace: metav1.NamespaceDefault}, dv)
Expect(err).ToNot(HaveOccurred())
// Get original DV phase
dvPhase := dv.Status.Phase

// Create PVC Prime
pvc := &corev1.PersistentVolumeClaim{}
err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "test-dv", Namespace: metav1.NamespaceDefault}, pvc)
Expect(err).ToNot(HaveOccurred())
pvcPrime := &corev1.PersistentVolumeClaim{}
pvcPrime.Name = populators.PVCPrimeName(pvc)
pvcPrime.Status.Phase = corev1.ClaimPending
err = reconciler.client.Create(context.TODO(), pvcPrime)
Expect(err).ToNot(HaveOccurred())

_, err = reconciler.updateStatus(getReconcileRequest(uploadDataVolume), nil, reconciler)
Expect(err).ToNot(HaveOccurred())
err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "test-dv", Namespace: metav1.NamespaceDefault}, dv)
Expect(err).ToNot(HaveOccurred())
Expect(dv.Status.Phase).To(Equal(dvPhase))
})
})
})

Expand Down
34 changes: 22 additions & 12 deletions pkg/controller/populators/import-populator.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,21 +244,36 @@ func (r *ImportPopulatorReconciler) updateImportProgress(podPhase string, pvc, p
cc.AddAnnotation(pvc, cc.AnnPopulatorProgress, "100.0%")
return nil
}
importPod, err := r.getImportPod(pvcPrime)

importPodName, ok := pvcPrime.Annotations[cc.AnnImportPod]
if !ok {
return nil
}

importPod, err := r.getImportPod(pvcPrime, importPodName)
if err != nil {
return err
}

if importPod == nil {
_, ok := pvc.Annotations[cc.AnnPopulatorProgress]
// Initialize the progress once PVC Prime is bound
if !ok && pvcPrime.Status.Phase == corev1.ClaimBound {
cc.AddAnnotation(pvc, cc.AnnPopulatorProgress, "N/A")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nasty patch to force a reconcile in the DV controller when the PVC' gets bound disguised as an annotation to initialize progress. It should fix the error in the tests, and it matches the behavior in the DV controller. After spending hours on this I think this is the best-looking solution.

}
return nil
}

// This will only work when the import pod is running
if importPod != nil && importPod.Status.Phase != corev1.PodRunning {
if importPod.Status.Phase != corev1.PodRunning {
return nil
}

url, err := cc.GetMetricsURL(importPod)
if err != nil {
if url == "" || err != nil {
return err
}
if url == "" {
return nil
}

// We fetch the import progress from the import pod metrics
importRegExp := regexp.MustCompile("progress\\{ownerUID\\=\"" + string(pvc.UID) + "\"\\} (\\d{1,3}\\.?\\d*)")
httpClient = cc.BuildHTTPClient(httpClient)
Expand All @@ -275,12 +290,7 @@ func (r *ImportPopulatorReconciler) updateImportProgress(podPhase string, pvc, p
return nil
}

func (r *ImportPopulatorReconciler) getImportPod(pvc *corev1.PersistentVolumeClaim) (*corev1.Pod, error) {
importPodName, ok := pvc.Annotations[cc.AnnImportPod]
if !ok {
return nil, nil
}

func (r *ImportPopulatorReconciler) getImportPod(pvc *corev1.PersistentVolumeClaim, importPodName string) (*corev1.Pod, error) {
pod := &corev1.Pod{}
if err := r.client.Get(context.TODO(), types.NamespacedName{Name: importPodName, Namespace: pvc.GetNamespace()}, pod); err != nil {
if !k8serrors.IsNotFound(err) {
Expand Down
13 changes: 13 additions & 0 deletions pkg/controller/populators/import-populator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,19 @@ var _ = Describe("Import populator tests", func() {
Expect(targetPvc.Annotations[AnnPopulatorProgress]).To(Equal("100.0%"))
})

It("should set N/A once PVC Prime is bound", func() {
targetPvc := CreatePvcInStorageClass(targetPvcName, metav1.NamespaceDefault, &sc.Name, nil, nil, corev1.ClaimBound)
pvcPrime := getPVCPrime(targetPvc, nil)
importPodName := fmt.Sprintf("%s-%s", common.ImporterPodName, pvcPrime.Name)
pvcPrime.Annotations = map[string]string{AnnImportPod: importPodName}
pvcPrime.Status.Phase = corev1.ClaimBound

reconciler = createImportPopulatorReconciler(targetPvc, pvcPrime, sc)
err := reconciler.updateImportProgress("", targetPvc, pvcPrime)
Expect(err).To(Not(HaveOccurred()))
Expect(targetPvc.Annotations[AnnPopulatorProgress]).To(Equal("N/A"))
})

It("should return error if no metrics in pod", func() {
targetPvc := CreatePvcInStorageClass(targetPvcName, metav1.NamespaceDefault, &sc.Name, nil, nil, corev1.ClaimBound)
pvcPrime := getPVCPrime(targetPvc, nil)
Expand Down