Skip to content

Commit

Permalink
Fix multi-stage import logic in import-populator and add remaining tests
Browse files Browse the repository at this point in the history
This commit fixes several bugs in the import-populator logic for multi-stage imports.

Signed-off-by: Alvaro Romero <alromero@redhat.com>
  • Loading branch information
alromeros committed Jul 11, 2023
1 parent 9a76e17 commit 814b25d
Show file tree
Hide file tree
Showing 14 changed files with 287 additions and 32 deletions.
1 change: 1 addition & 0 deletions pkg/apiserver/webhooks/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
"//vendor/kubevirt.io/controller-lifecycle-operator-sdk/api:go_default_library",
],
)
Expand Down
1 change: 0 additions & 1 deletion pkg/apiserver/webhooks/populators-validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ func (wh *populatorValidatingWebhook) validateVolumeImportSource(ar admissionv1.
return nil, err
}

// Reject spec updates
if ar.Request.Operation == admissionv1.Update {
cause, err := wh.validateVolumeImportSourceUpdate(ar, &volumeImportSource)
if err != nil {
Expand Down
37 changes: 37 additions & 0 deletions pkg/apiserver/webhooks/populators-validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,43 @@ var _ = Describe("Validating Webhook", func() {
Expect(resp.Allowed).To(BeFalse())
})

It("should accept VolumeImportSource spec checkpoints update", func() {
source := &cdiv1.ImportSourceType{
HTTP: &cdiv1.DataVolumeSourceHTTP{
URL: "http://www.example.com",
},
}
importCR := newVolumeImportSource(cdiv1.DataVolumeKubeVirt, source)
importCR.Spec.Checkpoints = []cdiv1.DataVolumeCheckpoint{
{Current: "test", Previous: ""},
}
newBytes, _ := json.Marshal(&importCR)

oldSource := importCR.DeepCopy()
oldSource.Spec.Source.HTTP.URL = "http://www.example.es"
oldSource.Spec.Checkpoints = nil
oldBytes, _ := json.Marshal(oldSource)

ar := &admissionv1.AdmissionReview{
Request: &admissionv1.AdmissionRequest{
Operation: admissionv1.Update,
Resource: metav1.GroupVersionResource{
Group: cdiv1.SchemeGroupVersion.Group,
Version: cdiv1.SchemeGroupVersion.Version,
Resource: "volumeimportsources",
},
Object: runtime.RawExtension{
Raw: newBytes,
},
OldObject: runtime.RawExtension{
Raw: oldBytes,
},
},
}
resp := validatePopulatorsAdmissionReview(ar)
Expect(resp.Allowed).To(BeFalse())
})

It("should accept VolumeImportSource with HTTP source on create", func() {
source := &cdiv1.ImportSourceType{
HTTP: &cdiv1.DataVolumeSourceHTTP{
Expand Down
10 changes: 10 additions & 0 deletions pkg/controller/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,16 @@ func IsPVCComplete(pvc *corev1.PersistentVolumeClaim) bool {
return false
}

// IsMultiStageImportInProgress returns true when a PVC is being part of an ongoing multi-stage import
func IsMultiStageImportInProgress(pvc *corev1.PersistentVolumeClaim) bool {
if pvc != nil {
multiStageImport := metav1.HasAnnotation(pvc.ObjectMeta, AnnCurrentCheckpoint)
multiStageAlreadyDone := metav1.HasAnnotation(pvc.ObjectMeta, AnnMultiStageImportDone)
return multiStageImport && !multiStageAlreadyDone
}
return false
}

// SetRestrictedSecurityContext sets the pod security params to be compatible with restricted PSA
func SetRestrictedSecurityContext(podSpec *corev1.PodSpec) {
hasVolumeMounts := false
Expand Down
3 changes: 1 addition & 2 deletions pkg/controller/datavolume/controller-base.go
Original file line number Diff line number Diff line change
Expand Up @@ -1150,8 +1150,7 @@ func (r *ReconcilerBase) handlePvcCreation(log logr.Logger, syncState *dvSyncSta
// shouldUseCDIPopulator returns if the population of the PVC should be done using
// CDI populators.
// Currently it will use populators only if:
// * no podRetainAfterCompletion or immediateBinding annotations
// * source is not PVC or Snapshot
// * no podRetainAfterCompletion annotation
// * storageClass bindingMode is not wffc while honorWaitForFirstConsumer feature gate is disabled
// * storageClass used is CSI storageClass
func (r *ReconcilerBase) shouldUseCDIPopulator(syncState *dvSyncState) (bool, error) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/controller/datavolume/import-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (r *ImportReconciler) syncImport(log logr.Logger, req reconcile.Request) (d
pvcModifier := r.updateAnnotations
if syncState.usePopulator {
if syncState.dvMutated.Status.Phase != cdiv1.Succeeded {
err := r.createVolumeImportSourceCR(&syncState)
err := r.reconcileVolumeImportSourceCR(&syncState)
if err != nil {
return syncState, err
}
Expand Down Expand Up @@ -271,7 +271,8 @@ func (r *ImportReconciler) updateStatusPhase(pvc *corev1.PersistentVolumeClaim,
event.reason = ImportFailed
event.message = fmt.Sprintf(MessageImportFailed, pvc.Name)
case string(corev1.PodSucceeded):
if _, ok := pvc.Annotations[cc.AnnCurrentCheckpoint]; ok {
if cc.IsMultiStageImportInProgress(pvc) {
// Multi-stage annotations will be updated by import-populator if populators are in use
if !importPopulation {
if err := cc.UpdatesMultistageImportSucceeded(pvc, r.getCheckpointArgs(dataVolumeCopy)); err != nil {
return err
Expand Down Expand Up @@ -318,7 +319,7 @@ func volumeImportSourceName(dv *cdiv1.DataVolume) string {
return fmt.Sprintf("%s-%s", volumeImportSourcePrefix, dv.UID)
}

func (r *ImportReconciler) createVolumeImportSourceCR(syncState *dvSyncState) error {
func (r *ImportReconciler) reconcileVolumeImportSourceCR(syncState *dvSyncState) error {
dv := syncState.dvMutated
importSource := &cdiv1.VolumeImportSource{}
importSourceName := volumeImportSourceName(dv)
Expand Down
5 changes: 1 addition & 4 deletions pkg/controller/import-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,7 @@ func (r *ImportReconciler) shouldReconcilePVC(pvc *corev1.PersistentVolumeClaim,
return false, err
}

multiStageImport := metav1.HasAnnotation(pvc.ObjectMeta, cc.AnnCurrentCheckpoint)
multiStageAlreadyDone := metav1.HasAnnotation(pvc.ObjectMeta, cc.AnnMultiStageImportDone)

return (!cc.IsPVCComplete(pvc) || (cc.IsPVCComplete(pvc) && multiStageImport && !multiStageAlreadyDone)) &&
return (!cc.IsPVCComplete(pvc) || cc.IsMultiStageImportInProgress(pvc)) &&
(checkPVC(pvc, cc.AnnEndpoint, log) || checkPVC(pvc, cc.AnnSource, log)) &&
shouldHandlePvc(pvc, waitForFirstConsumerEnabled, log),
nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/populators/import-populator.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ func (r *ImportPopulatorReconciler) getPopulationSource(pvc *corev1.PersistentVo
func (r *ImportPopulatorReconciler) reconcileTargetPVC(pvc, pvcPrime *corev1.PersistentVolumeClaim) (reconcile.Result, error) {
pvcCopy := pvc.DeepCopy()
phase := pvcPrime.Annotations[cc.AnnPodPhase]
multiStageInProgress := false
source, err := r.getPopulationSource(pvc)
if source == nil {
return reconcile.Result{}, err
Expand All @@ -148,13 +147,14 @@ func (r *ImportPopulatorReconciler) reconcileTargetPVC(pvc, pvcPrime *corev1.Per
// We'll get called later once it succeeds
r.recorder.Eventf(pvc, corev1.EventTypeWarning, importFailed, messageImportFailed, pvc.Name)
case string(corev1.PodSucceeded):
if _, multiStageInProgress = pvc.Annotations[cc.AnnCurrentCheckpoint]; multiStageInProgress {
if cc.IsMultiStageImportInProgress(pvcPrime) {
if err := cc.UpdatesMultistageImportSucceeded(pvcPrime, r.getCheckpointArgs(source)); err != nil {
return reconcile.Result{}, err
}
r.recorder.Eventf(pvc, corev1.EventTypeNormal, cc.ImportPaused, cc.MessageImportPaused, pvc.Name)
break
}

// Once the import is succeeded, we rebind the PV from PVC' to target PVC
if err := cc.Rebind(context.TODO(), r.client, pvcPrime, pvc); err != nil {
return reconcile.Result{}, err
Expand All @@ -165,7 +165,7 @@ func (r *ImportPopulatorReconciler) reconcileTargetPVC(pvc, pvcPrime *corev1.Per
if err != nil {
return reconcile.Result{}, err
}
if !multiStageInProgress && cc.IsPVCComplete(pvcPrime) {
if cc.IsPVCComplete(pvcPrime) && !cc.IsMultiStageImportInProgress(pvc) {
r.recorder.Eventf(pvc, corev1.EventTypeNormal, importSucceeded, messageImportSucceeded, pvc.Name)
}

Expand Down
49 changes: 49 additions & 0 deletions pkg/controller/populators/import-populator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -358,6 +359,54 @@ var _ = Describe("Import populator tests", func() {
table.Entry("with pod failed phase", string(corev1.PodFailed)),
table.Entry("with pod succeded phase", string(corev1.PodSucceeded)),
)

It("Should set multistage migration annotations on PVC prime", func() {
targetPvc := CreatePvcInStorageClass(targetPvcName, metav1.NamespaceDefault, &sc.Name, nil, nil, corev1.ClaimBound)
targetPvc.Spec.DataSourceRef = dataSourceRef
volumeImportSource := getVolumeImportSource(true, metav1.NamespaceDefault)
volumeImportSource.Spec.Source = &cdiv1.ImportSourceType{
VDDK: &cdiv1.DataVolumeSourceVDDK{
BackingFile: "testBackingFile",
SecretRef: "testSecret",
Thumbprint: "testThumbprint",
URL: "testUrl",
UUID: "testUUID",
},
}
volumeImportSource.Spec.Checkpoints = []cdiv1.DataVolumeCheckpoint{
{
Previous: "previous",
Current: "current",
},
}
volumeImportSource.Spec.FinalCheckpoint = pointer.Bool(true)

By("Reconcile")
reconciler = createImportPopulatorReconciler(targetPvc, volumeImportSource, sc)
result, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: targetPvcName, Namespace: metav1.NamespaceDefault}})
Expect(err).To(Not(HaveOccurred()))
Expect(result).To(Not(BeNil()))

By("Checking events recorded")
close(reconciler.recorder.(*record.FakeRecorder).Events)
found := false
for event := range reconciler.recorder.(*record.FakeRecorder).Events {
if strings.Contains(event, createdPVCPrimeSuccessfully) {
found = true
}
}
reconciler.recorder = nil
Expect(found).To(BeTrue())

By("Checking PVC' annotations")
pvcPrime, err := reconciler.getPVCPrime(targetPvc)
Expect(err).ToNot(HaveOccurred())
Expect(pvcPrime).ToNot(BeNil())
Expect(pvcPrime.GetAnnotations()).ToNot(BeNil())
Expect(pvcPrime.GetAnnotations()[AnnPreviousCheckpoint]).To(Equal("previous"))
Expect(pvcPrime.GetAnnotations()[AnnCurrentCheckpoint]).To(Equal("current"))
Expect(pvcPrime.GetAnnotations()[AnnFinalCheckpoint]).To(Equal("true"))
})
})

var _ = Describe("Import populator progress report", func() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/populators/populator-base.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (r *ReconcilerBase) createPVCPrime(pvc *corev1.PersistentVolumeClaim, sourc
type updatePVCAnnotationsFunc func(pvc, pvcPrime *corev1.PersistentVolumeClaim)

var desiredAnnotations = []string{cc.AnnPodPhase, cc.AnnPodReady, cc.AnnPodRestarts,
cc.AnnPreallocationRequested, cc.AnnPreallocationApplied,
cc.AnnPreallocationRequested, cc.AnnPreallocationApplied, cc.AnnCurrentCheckpoint, cc.AnnMultiStageImportDone,
cc.AnnRunningCondition, cc.AnnRunningConditionMessage, cc.AnnRunningConditionReason}

func (r *ReconcilerBase) updatePVCWithPVCPrimeAnnotations(pvc, pvcPrime *corev1.PersistentVolumeClaim, updateFunc updatePVCAnnotationsFunc) error {
Expand Down Expand Up @@ -265,7 +265,7 @@ func (r *ReconcilerBase) reconcile(req reconcile.Request, populator populatorCon
}

// Each populator reconciles the target PVC in a different way
if cc.IsUnbound(pvc) || !cc.IsPVCComplete(pvc) {
if cc.IsUnbound(pvc) || !cc.IsPVCComplete(pvc) || cc.IsMultiStageImportInProgress(pvc) {
return populator.reconcileTargetPVC(pvc, pvcPrime)
}

Expand Down
45 changes: 45 additions & 0 deletions tests/framework/vddk.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,48 @@ func (f *Framework) CreateVddkWarmImportDataVolume(dataVolumeName, size, url str

return utils.NewDataVolumeWithVddkWarmImport(dataVolumeName, size, backingFile, s.Name, thumbprint, url, vmid.String(), currentCheckpoint, previousCheckpoint, finalCheckpoint)
}

// CreateVddkWarmImportPopulatorSource fetches snapshot information from vcsim and returns a multi-stage VDDK volumeImportSource
func (f *Framework) CreateVddkWarmImportPopulatorSource(volumeImportName, pvcName, url string) *cdiv1.VolumeImportSource {
// Find vcenter-simulator pod
pod, err := utils.FindPodByPrefix(f.K8sClient, f.CdiInstallNs, "vcenter-deployment", "app=vcenter")
gomega.Expect(err).ToNot(gomega.HaveOccurred())
gomega.Expect(pod).ToNot(gomega.BeNil())

// Get test VM UUID
id, err := f.RunKubectlCommand("exec", "-n", pod.Namespace, pod.Name, "--", "cat", "/tmp/vmid")
gomega.Expect(err).ToNot(gomega.HaveOccurred())
vmid, err := uuid.Parse(strings.TrimSpace(id))
gomega.Expect(err).ToNot(gomega.HaveOccurred())

// Get snapshot 1 ID
previousCheckpoint, err := f.RunKubectlCommand("exec", "-n", pod.Namespace, pod.Name, "--", "cat", "/tmp/vmsnapshot1")
gomega.Expect(err).ToNot(gomega.HaveOccurred())
previousCheckpoint = strings.TrimSpace(previousCheckpoint)
gomega.Expect(err).ToNot(gomega.HaveOccurred())

// Get snapshot 2 ID
currentCheckpoint, err := f.RunKubectlCommand("exec", "-n", pod.Namespace, pod.Name, "--", "cat", "/tmp/vmsnapshot2")
gomega.Expect(err).ToNot(gomega.HaveOccurred())
currentCheckpoint = strings.TrimSpace(currentCheckpoint)
gomega.Expect(err).ToNot(gomega.HaveOccurred())

// Get disk name
disk, err := f.RunKubectlCommand("exec", "-n", pod.Namespace, pod.Name, "--", "cat", "/tmp/vmdisk")
gomega.Expect(err).ToNot(gomega.HaveOccurred())
disk = strings.TrimSpace(disk)
gomega.Expect(err).ToNot(gomega.HaveOccurred())

// Create VDDK login secret
stringData := map[string]string{
common.KeyAccess: "user",
common.KeySecret: "pass",
}
backingFile := disk
secretRef := "vddksecret"
thumbprint := "testprint"
finalCheckpoint := true
s, _ := utils.CreateSecretFromDefinition(f.K8sClient, utils.NewSecretDefinition(nil, stringData, nil, f.Namespace.Name, secretRef))

return utils.NewVolumeImportSourceWithVddkWarmImport(volumeImportName, pvcName, backingFile, s.Name, thumbprint, url, vmid.String(), currentCheckpoint, previousCheckpoint, finalCheckpoint)
}
Loading

0 comments on commit 814b25d

Please sign in to comment.