Skip to content

Commit

Permalink
Fix multi-stage import logic in import-populator and add remaining tests
Browse files Browse the repository at this point in the history
Signed-off-by: Alvaro Romero <alromero@redhat.com>
  • Loading branch information
alromeros committed Jul 10, 2023
1 parent c04ba83 commit 83942fe
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 22 deletions.
1 change: 1 addition & 0 deletions pkg/apiserver/webhooks/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
"//vendor/kubevirt.io/controller-lifecycle-operator-sdk/api:go_default_library",
],
)
Expand Down
1 change: 0 additions & 1 deletion pkg/apiserver/webhooks/populators-validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ func (wh *populatorValidatingWebhook) validateVolumeImportSource(ar admissionv1.
return nil, err
}

// Reject spec updates
if ar.Request.Operation == admissionv1.Update {
cause, err := wh.validateVolumeImportSourceUpdate(ar, &volumeImportSource)
if err != nil {
Expand Down
51 changes: 37 additions & 14 deletions pkg/apiserver/webhooks/populators-validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,43 @@ var _ = Describe("Validating Webhook", func() {
Expect(resp.Allowed).To(BeFalse())
})

It("should accept VolumeImportSource spec checkpoints update", func() {
source := &cdiv1.ImportSourceType{
HTTP: &cdiv1.DataVolumeSourceHTTP{
URL: "http://www.example.com",
},
}
importCR := newVolumeImportSource(cdiv1.DataVolumeKubeVirt, source)
importCR.Spec.Checkpoints = []cdiv1.DataVolumeCheckpoint{
{Current: "test", Previous: ""},
}
newBytes, _ := json.Marshal(&importCR)

oldSource := importCR.DeepCopy()
oldSource.Spec.Source.HTTP.URL = "http://www.example.es"
oldSource.Spec.Checkpoints = nil
oldBytes, _ := json.Marshal(oldSource)

ar := &admissionv1.AdmissionReview{
Request: &admissionv1.AdmissionRequest{
Operation: admissionv1.Update,
Resource: metav1.GroupVersionResource{
Group: cdiv1.SchemeGroupVersion.Group,
Version: cdiv1.SchemeGroupVersion.Version,
Resource: "volumeimportsources",
},
Object: runtime.RawExtension{
Raw: newBytes,
},
OldObject: runtime.RawExtension{
Raw: oldBytes,
},
},
}
resp := validatePopulatorsAdmissionReview(ar)
Expect(resp.Allowed).To(BeFalse())
})

It("should accept VolumeImportSource with HTTP source on create", func() {
source := &cdiv1.ImportSourceType{
HTTP: &cdiv1.DataVolumeSourceHTTP{
Expand Down Expand Up @@ -145,13 +182,7 @@ var _ = Describe("Validating Webhook", func() {
Expect(resp.Allowed).To(BeFalse())
})

<<<<<<< Updated upstream
It("should VolumeImportSource with incomplete VDDK source", func() {
=======
<<<<<<< Updated upstream
=======
It("should reject VolumeImportSource with incomplete VDDK source", func() {
>>>>>>> Stashed changes
source := &cdiv1.ImportSourceType{
VDDK: &cdiv1.DataVolumeSourceVDDK{
BackingFile: "",
Expand Down Expand Up @@ -198,20 +229,12 @@ var _ = Describe("Validating Webhook", func() {
importCR.Spec.Checkpoints = []cdiv1.DataVolumeCheckpoint{
{Current: "test", Previous: ""},
}
<<<<<<< Updated upstream
importCR.Spec.TargetClaim = "test-pvc"
=======
targetClaim := "test-pvc"
importCR.Spec.TargetClaim = &targetClaim
>>>>>>> Stashed changes
resp := validateVolumeImportSourceCreate(importCR)
Expect(resp.Allowed).To(BeTrue())
})

<<<<<<< Updated upstream
=======
>>>>>>> Stashed changes
>>>>>>> Stashed changes
It("should accept VolumeImportSource with Registry source URL on create", func() {
url := "docker://registry:5000/test"
source := &cdiv1.ImportSourceType{
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/datavolume/import-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (r *ImportReconciler) syncImport(log logr.Logger, req reconcile.Request) (d
pvcModifier := r.updateAnnotations
if syncState.usePopulator {
if syncState.dvMutated.Status.Phase != cdiv1.Succeeded {
err := r.createVolumeImportSourceCR(&syncState)
err := r.reconcileVolumeImportSourceCR(&syncState)
if err != nil {
return syncState, err
}
Expand Down Expand Up @@ -315,7 +315,7 @@ func volumeImportSourceName(dv *cdiv1.DataVolume) string {
return fmt.Sprintf("%s-%s", volumeImportSourcePrefix, dv.UID)
}

func (r *ImportReconciler) createVolumeImportSourceCR(syncState *dvSyncState) error {
func (r *ImportReconciler) reconcileVolumeImportSourceCR(syncState *dvSyncState) error {
dv := syncState.dvMutated
importSource := &cdiv1.VolumeImportSource{}
importSourceName := volumeImportSourceName(dv)
Expand Down
7 changes: 6 additions & 1 deletion pkg/controller/populators/import-populator.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (r *ImportPopulatorReconciler) reconcileTargetPVC(pvc, pvcPrime *corev1.Per
// We'll get called later once it succeeds
r.recorder.Eventf(pvc, corev1.EventTypeWarning, importFailed, messageImportFailed, pvc.Name)
case string(corev1.PodSucceeded):
if _, multiStageInProgress = pvc.Annotations[cc.AnnCurrentCheckpoint]; multiStageInProgress {
if _, multiStageInProgress = pvcPrime.Annotations[cc.AnnCurrentCheckpoint]; multiStageInProgress {
if err := cc.UpdatesMultistageImportSucceeded(pvcPrime, r.getCheckpointArgs(source)); err != nil {
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -232,6 +232,11 @@ func (r *ImportPopulatorReconciler) updateImportAnnotations(pvc, pvcPrime *corev
if err := r.updateImportProgress(phase, pvc, pvcPrime); err != nil {
r.log.Error(err, "Failed to update import progress for pvc %s/%s", pvc.Namespace, pvc.Name)
}
if value, multiStageImport := pvcPrime.GetAnnotations()[cc.AnnCurrentCheckpoint]; multiStageImport {
cc.AddAnnotation(pvc, cc.AnnCurrentCheckpoint, value)
} else if _, multiStageImport := pvc.GetAnnotations()[cc.AnnCurrentCheckpoint]; multiStageImport {
delete(pvc.Annotations, cc.AnnCurrentCheckpoint)
}
updateVddkAnnotations(pvc, pvcPrime)
}

Expand Down
45 changes: 45 additions & 0 deletions tests/framework/vddk.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,48 @@ func (f *Framework) CreateVddkWarmImportDataVolume(dataVolumeName, size, url str

return utils.NewDataVolumeWithVddkWarmImport(dataVolumeName, size, backingFile, s.Name, thumbprint, url, vmid.String(), currentCheckpoint, previousCheckpoint, finalCheckpoint)
}

// CreateVddkWarmImportPopulatorSource fetches snapshot information from vcsim and returns a multi-stage VDDK volumeImportSource
func (f *Framework) CreateVddkWarmImportPopulatorSource(volumeImportName, pvcName, url string) *cdiv1.VolumeImportSource {
// Find vcenter-simulator pod
pod, err := utils.FindPodByPrefix(f.K8sClient, f.CdiInstallNs, "vcenter-deployment", "app=vcenter")
gomega.Expect(err).ToNot(gomega.HaveOccurred())
gomega.Expect(pod).ToNot(gomega.BeNil())

// Get test VM UUID
id, err := f.RunKubectlCommand("exec", "-n", pod.Namespace, pod.Name, "--", "cat", "/tmp/vmid")
gomega.Expect(err).ToNot(gomega.HaveOccurred())
vmid, err := uuid.Parse(strings.TrimSpace(id))
gomega.Expect(err).ToNot(gomega.HaveOccurred())

// Get snapshot 1 ID
previousCheckpoint, err := f.RunKubectlCommand("exec", "-n", pod.Namespace, pod.Name, "--", "cat", "/tmp/vmsnapshot1")
gomega.Expect(err).ToNot(gomega.HaveOccurred())
previousCheckpoint = strings.TrimSpace(previousCheckpoint)
gomega.Expect(err).ToNot(gomega.HaveOccurred())

// Get snapshot 2 ID
currentCheckpoint, err := f.RunKubectlCommand("exec", "-n", pod.Namespace, pod.Name, "--", "cat", "/tmp/vmsnapshot2")
gomega.Expect(err).ToNot(gomega.HaveOccurred())
currentCheckpoint = strings.TrimSpace(currentCheckpoint)
gomega.Expect(err).ToNot(gomega.HaveOccurred())

// Get disk name
disk, err := f.RunKubectlCommand("exec", "-n", pod.Namespace, pod.Name, "--", "cat", "/tmp/vmdisk")
gomega.Expect(err).ToNot(gomega.HaveOccurred())
disk = strings.TrimSpace(disk)
gomega.Expect(err).ToNot(gomega.HaveOccurred())

// Create VDDK login secret
stringData := map[string]string{
common.KeyAccess: "user",
common.KeySecret: "pass",
}
backingFile := disk
secretRef := "vddksecret"
thumbprint := "testprint"
finalCheckpoint := true
s, _ := utils.CreateSecretFromDefinition(f.K8sClient, utils.NewSecretDefinition(nil, stringData, nil, f.Namespace.Name, secretRef))

return utils.NewVolumeImportSourceWithVddkWarmImport(volumeImportName, pvcName, backingFile, s.Name, thumbprint, url, vmid.String(), currentCheckpoint, previousCheckpoint, finalCheckpoint)
}
88 changes: 84 additions & 4 deletions tests/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ var _ = Describe("[rfe_id:1115][crit:high][vendor:cnv-qe@redhat.com][level:compo
It("[test_id:6688] Should retain all multi-stage importer pods after completion with dv annotation cdi.kubevirt.io/storage.pod.retainAfterCompletion=true", func() {
vcenterURL := fmt.Sprintf(utils.VcenterURL, f.CdiInstallNs)
dataVolume := f.CreateVddkWarmImportDataVolume("import-pod-retain-test", "100Mi", vcenterURL)

By(fmt.Sprintf("Create new datavolume %s", dataVolume.Name))
dataVolume.Annotations[controller.AnnPodRetainAfterCompletion] = "true"
dataVolume, err := utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, dataVolume)
Expand All @@ -182,22 +183,101 @@ var _ = Describe("[rfe_id:1115][crit:high][vendor:cnv-qe@redhat.com][level:compo
}
})

It("Should do multi-stage importer pods with populator flow", func() {
It("Should do multi-stage import with dataVolume populator flow", func() {
vcenterURL := fmt.Sprintf(utils.VcenterURL, f.CdiInstallNs)
dataVolume := f.CreateVddkWarmImportDataVolume("import-pod-retain-test", "100Mi", vcenterURL)
dataVolume := f.CreateVddkWarmImportDataVolume("multi-stage-import-test", "100Mi", vcenterURL)
By(fmt.Sprintf("Create new datavolume %s", dataVolume.Name))
dataVolume, err := utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, dataVolume)
Expect(err).ToNot(HaveOccurred())

By("Verify pvc was created")
pvc, err := utils.WaitForPVC(f.K8sClient, dataVolume.Namespace, dataVolume.Name)
_, err = utils.WaitForPVC(f.K8sClient, dataVolume.Namespace, dataVolume.Name)
Expect(err).ToNot(HaveOccurred())
f.ForceBindIfWaitForFirstConsumer(pvc)

By("Wait for import to be completed")
err = utils.WaitForDataVolumePhase(f, dataVolume.Namespace, cdiv1.Succeeded, dataVolume.Name)
Expect(err).ToNot(HaveOccurred(), "Datavolume not in phase succeeded in time")
})

It("Should update volumeImportSource accordingly when doind a multi-stage import", func() {
vcenterURL := fmt.Sprintf(utils.VcenterURL, f.CdiInstallNs)
dataVolume := f.CreateVddkWarmImportDataVolume("multi-stage-import-test", "100Mi", vcenterURL)

// Set FinalCheckpoint to false to pause the DataVolume
dataVolume.Spec.FinalCheckpoint = false
By(fmt.Sprintf("Create new datavolume %s", dataVolume.Name))
dataVolume, err := utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, dataVolume)
Expect(err).ToNot(HaveOccurred())
volumeImportSourceName := fmt.Sprintf("%s-%s", "volume-import-source", dataVolume.UID)

By("Verify pvc was created")
_, err = utils.WaitForPVC(f.K8sClient, dataVolume.Namespace, dataVolume.Name)
Expect(err).ToNot(HaveOccurred())
By("Verify volumeimportSource")
volumeImportSource, err := f.CdiClient.CdiV1beta1().VolumeImportSources(f.Namespace.Name).Get(context.TODO(), volumeImportSourceName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
Expect(reflect.DeepEqual(dataVolume.Spec.Checkpoints, volumeImportSource.Spec.Checkpoints)).To(BeTrue())

By("Update DataVolume checkpoints")
Eventually(func() bool {
dataVolume, err := f.CdiClient.CdiV1beta1().DataVolumes(f.Namespace.Name).Get(context.TODO(), dataVolume.Name, metav1.GetOptions{})
dataVolume.Spec.Checkpoints = []cdiv1.DataVolumeCheckpoint{
{Current: "test", Previous: "foo"},
{Current: "foo", Previous: "test"},
}
_, err = f.CdiClient.CdiV1beta1().DataVolumes(f.Namespace.Name).Update(context.TODO(), dataVolume, metav1.UpdateOptions{})
return err == nil
}, timeout, pollingInterval).Should(BeTrue())

By("Check volumeImportSource is also updated")
Eventually(func() bool {
volumeImportSource, err := f.CdiClient.CdiV1beta1().VolumeImportSources(f.Namespace.Name).Get(context.TODO(), volumeImportSourceName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
return reflect.DeepEqual(dataVolume.Spec.Checkpoints, volumeImportSource.Spec.Checkpoints)
}, timeout, pollingInterval).Should(BeTrue())
})

It("Should do multi-stage import with manually created volumeImportSource and PVC", func() {
pvcName := "multi-stage-import-pvc-test"
importSourceName := "multi-stage-import-test"
vcenterURL := fmt.Sprintf(utils.VcenterURL, f.CdiInstallNs)

By(fmt.Sprintf("Create volumeImportSource %s", importSourceName))
volumeImportSource := f.CreateVddkWarmImportPopulatorSource(importSourceName, pvcName, vcenterURL)
volumeImportSource, err := f.CdiClient.CdiV1beta1().VolumeImportSources(f.Namespace.Name).Create(
context.TODO(), volumeImportSource, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())

By(fmt.Sprintf("Create PVC to be populated %s", pvcName))
pvcDef := utils.NewPVCDefinition(pvcName, "1Gi", nil, nil)
apiGroup := controller.AnnAPIGroup
pvcDef.Spec.DataSourceRef = &v1.TypedObjectReference{
APIGroup: &apiGroup,
Kind: cdiv1.VolumeImportSourceRef,
Name: importSourceName,
}
pvc := f.CreateScheduledPVCFromDefinition(pvcDef)

By("Verify PVC prime was created")
pvcPrime, err := utils.WaitForPVC(f.K8sClient, pvc.Namespace, populators.PVCPrimeName(pvc))
Expect(err).ToNot(HaveOccurred())

By("Verify target PVC is bound")
err = utils.WaitForPersistentVolumeClaimPhase(f.K8sClient, pvc.Namespace, v1.ClaimBound, pvc.Name)

By("Verify 100.0% annotation")
progress, ok, err := utils.WaitForPVCAnnotation(f.K8sClient, f.Namespace.Name, pvc, controller.AnnPopulatorProgress)
Expect(err).ToNot(HaveOccurred())
Expect(ok).To(BeTrue())
Expect(progress).Should(BeEquivalentTo("100.0%"))

By("Wait for PVC prime to be deleted")
Eventually(func() bool {
// Make sure pvcPrime was deleted after import population
_, err := f.FindPVC(pvcPrime.Name)
return err != nil && k8serrors.IsNotFound(err)
}, timeout, pollingInterval).Should(BeTrue())
})
})

var _ = Describe("DataVolume Garbage Collection", func() {
Expand Down
1 change: 1 addition & 0 deletions tests/utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"deployments.go",
"fileConversion.go",
"pod.go",
"populators.go",
"pv.go",
"pvc.go",
"secrets.go",
Expand Down
33 changes: 33 additions & 0 deletions tests/utils/populators.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package utils

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
)

// NewVolumeImportSourceWithVddkWarmImport initializes a VolumeImportSource for a multi-stage import from vCenter/ESX snapshots
func NewVolumeImportSourceWithVddkWarmImport(name, pvcName, backingFile, secretRef, thumbprint, httpURL, uuid, currentCheckpoint, previousCheckpoint string, finalCheckpoint bool) *cdiv1.VolumeImportSource {
return &cdiv1.VolumeImportSource{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: cdiv1.VolumeImportSourceSpec{
Source: &cdiv1.ImportSourceType{
VDDK: &cdiv1.DataVolumeSourceVDDK{
BackingFile: backingFile,
SecretRef: secretRef,
Thumbprint: thumbprint,
URL: httpURL,
UUID: uuid,
},
},
TargetClaim: &pvcName,
FinalCheckpoint: &finalCheckpoint,
Checkpoints: []cdiv1.DataVolumeCheckpoint{
{Current: previousCheckpoint, Previous: ""},
{Current: currentCheckpoint, Previous: previousCheckpoint},
},
},
}
}

0 comments on commit 83942fe

Please sign in to comment.