Set pod affinity for host assisted clone (#2647)
* Set pod affinity for host assisted clone source pods

Signed-off-by: Ido Aharon <iaharon@redhat.com>

* fixed expect condition

Signed-off-by: Ido Aharon <iaharon@redhat.com>

---------

Signed-off-by: Ido Aharon <iaharon@redhat.com>
ido106 committed May 1, 2023
1 parent b980014 commit 163f77a
Showing 2 changed files with 90 additions and 52 deletions.
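In short, the controller change works like this: every host-assisted clone source pod is now labeled with the UID of its source PVC, and when that PVC is ReadWriteOnce the pod also gets a required pod-affinity term on the kubernetes.io/hostname topology key. Because an RWO volume can only be mounted on one node at a time, this co-locates all clone source pods that read the same source, so several clones of the same RWO PVC can run in parallel. The sketch below condenses that pattern; it is illustrative only (the UID and namespace values are placeholders), and the authoritative code is the diff that follows.

// Condensed, illustrative sketch of the affinity term the controller now adds
// (not the controller's exact code; see the diff below for that).
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const hostAssistedCloneSource = "cdi.kubevirt.io/hostAssistedSourcePodCloneSource"

// requiredCloneSourceAffinity co-locates a clone source pod, on the same node,
// with any other pod that carries the same source-PVC-UID label.
func requiredCloneSourceAffinity(sourcePvcUID, sourcePvcNamespace string) corev1.PodAffinityTerm {
	return corev1.PodAffinityTerm{
		LabelSelector: &metav1.LabelSelector{
			MatchExpressions: []metav1.LabelSelectorRequirement{{
				Key:      hostAssistedCloneSource,
				Operator: metav1.LabelSelectorOpIn,
				Values:   []string{sourcePvcUID},
			}},
		},
		Namespaces:  []string{sourcePvcNamespace},
		TopologyKey: corev1.LabelHostname,
	}
}

func main() {
	// Placeholder UID and namespace for illustration.
	term := requiredCloneSourceAffinity("11111111-2222-3333-4444-555555555555", "default")
	fmt.Printf("co-locate on %s with pods matching %v\n", term.TopologyKey, term.LabelSelector.MatchExpressions)
}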
34 changes: 30 additions & 4 deletions pkg/controller/clone-controller.go
@@ -57,6 +57,8 @@ const (

cloneSourcePodFinalizer = "cdi.kubevirt.io/cloneSource"

hostAssistedCloneSource = "cdi.kubevirt.io/hostAssistedSourcePodCloneSource"

uploadClientCertDuration = 365 * 24 * time.Hour
)

@@ -477,7 +479,7 @@ func (r *CloneReconciler) cleanup(pvc *corev1.PersistentVolumeClaim, log logr.Lo

// CreateCloneSourcePod creates our cloning src pod which will be used for out of band cloning to read the contents of the src PVC
func (r *CloneReconciler) CreateCloneSourcePod(image, pullPolicy string, pvc *corev1.PersistentVolumeClaim, log logr.Logger) (*corev1.Pod, error) {
exists, sourcePvcNamespace, sourcePvcName := ParseCloneRequestAnnotation(pvc)
exists, _, _ := ParseCloneRequestAnnotation(pvc)
if !exists {
return nil, errors.Errorf("bad CloneRequest Annotation")
}
@@ -519,7 +521,7 @@ func (r *CloneReconciler) CreateCloneSourcePod(image, pullPolicy string, pvc *co
sourceVolumeMode = corev1.PersistentVolumeFilesystem
}

pod := MakeCloneSourcePodSpec(sourceVolumeMode, image, pullPolicy, imagePullSecrets, sourcePvcName, sourcePvcNamespace, ownerKey, serverCABundle, pvc, podResourceRequirements, workloadNodePlacement)
pod := MakeCloneSourcePodSpec(sourceVolumeMode, image, pullPolicy, ownerKey, imagePullSecrets, serverCABundle, pvc, sourcePvc, podResourceRequirements, workloadNodePlacement)
util.SetRecommendedLabels(pod, r.installerLabels, "cdi-controller")

if err := r.client.Create(context.TODO(), pod); err != nil {
@@ -532,10 +534,14 @@ func (r *CloneReconciler) CreateCloneSourcePod(image, pullPolicy string, pvc *co
}

// MakeCloneSourcePodSpec creates and returns the clone source pod spec based on the target pvc.
func MakeCloneSourcePodSpec(sourceVolumeMode corev1.PersistentVolumeMode, image, pullPolicy string, imagePullSecrets []corev1.LocalObjectReference, sourcePvcName, sourcePvcNamespace, ownerRefAnno string,
serverCACert []byte, targetPvc *corev1.PersistentVolumeClaim, resourceRequirements *corev1.ResourceRequirements,
func MakeCloneSourcePodSpec(sourceVolumeMode corev1.PersistentVolumeMode, image, pullPolicy, ownerRefAnno string, imagePullSecrets []corev1.LocalObjectReference,
serverCACert []byte, targetPvc, sourcePvc *corev1.PersistentVolumeClaim, resourceRequirements *corev1.ResourceRequirements,
workloadNodePlacement *sdkapi.NodePlacement) *corev1.Pod {

sourcePvcName := sourcePvc.GetName()
sourcePvcNamespace := sourcePvc.GetNamespace()
sourcePvcUID := string(sourcePvc.GetUID())

var ownerID string
cloneSourcePodName := targetPvc.Annotations[AnnCloneSourcePod]
url := GetUploadServerURL(targetPvc.Namespace, targetPvc.Name, common.UploadPathSync)
@@ -565,6 +571,7 @@ func MakeCloneSourcePodSpec(sourceVolumeMode corev1.PersistentVolumeMode, image,
// this label is used when searching for a pvc's cloner source pod.
cc.CloneUniqueID: cloneSourcePodName,
common.PrometheusLabelKey: common.PrometheusLabelValue,
hostAssistedCloneSource: sourcePvcUID,
},
},
Spec: corev1.PodSpec{
@@ -650,6 +657,25 @@ func MakeCloneSourcePodSpec(sourceVolumeMode corev1.PersistentVolumeMode, image,
pod.Spec.Affinity.PodAffinity = &corev1.PodAffinity{}
}

if len(sourcePvc.Spec.AccessModes) == 1 && sourcePvc.Spec.AccessModes[0] == corev1.ReadWriteOnce {
pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(
pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: hostAssistedCloneSource,
Operator: metav1.LabelSelectorOpIn,
Values: []string{sourcePvcUID},
},
},
},
Namespaces: []string{sourcePvcNamespace},
TopologyKey: corev1.LabelHostname,
},
)
}

pod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution = append(
pod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution,
corev1.WeightedPodAffinityTerm{
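The practical effect can be verified by listing the pods that carry the new label for a given source PVC UID and confirming they all report the same node, which is what the updated e2e test below does by collecting pod.Spec.NodeName values. Here is a minimal client-go sketch of that check, assuming an already-configured clientset; the helper name and package are hypothetical and not part of this commit.

// Hypothetical verification helper (not part of the commit): confirm that all
// clone source pods for one source PVC were scheduled onto the same node.
package clonecheck

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

const hostAssistedCloneSource = "cdi.kubevirt.io/hostAssistedSourcePodCloneSource"

// cloneSourcePodsShareNode returns an error unless every pod labeled with the
// given source PVC UID reports the same Spec.NodeName.
func cloneSourcePodsShareNode(ctx context.Context, client kubernetes.Interface, namespace, sourcePvcUID string) error {
	pods, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
		LabelSelector: fmt.Sprintf("%s=%s", hostAssistedCloneSource, sourcePvcUID),
	})
	if err != nil {
		return err
	}
	nodes := map[string]struct{}{}
	for _, pod := range pods.Items {
		if pod.Spec.NodeName != "" {
			nodes[pod.Spec.NodeName] = struct{}{}
		}
	}
	if len(nodes) > 1 {
		return fmt.Errorf("clone source pods for PVC UID %s spread across %d nodes", sourcePvcUID, len(nodes))
	}
	return nil
}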
108 changes: 60 additions & 48 deletions tests/cloner_test.go
@@ -26,6 +26,7 @@ import (

cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
"kubevirt.io/containerized-data-importer/pkg/common"
ctrl "kubevirt.io/containerized-data-importer/pkg/controller"
controller "kubevirt.io/containerized-data-importer/pkg/controller/common"
dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
"kubevirt.io/containerized-data-importer/pkg/token"
@@ -50,6 +51,7 @@ const (
verifyPodDeletedTimeout = 270 * time.Second
controllerSkipPVCCompleteTimeout = 270 * time.Second
crossVolumeModeCloneMD5NumBytes = 100000
hostAssistedCloneSource = "cdi.kubevirt.io/hostAssistedSourcePodCloneSource"
)

var _ = Describe("all clone tests", func() {
@@ -95,6 +97,7 @@ var _ = Describe("all clone tests", func() {
}

})

It("[test_id:6693]Should clone imported data and retain transfer pods after completion", func() {
smartApplicable := f.IsSnapshotStorageClassAvailable()
sc, err := f.K8sClient.StorageV1().StorageClasses().Get(context.TODO(), f.SnapshotSCName, metav1.GetOptions{})
@@ -127,6 +130,8 @@ var _ = Describe("all clone tests", func() {
cloner, err := utils.FindPodBySuffixOnce(f.K8sClient, targetDataVolume.Namespace, common.ClonerSourcePodNameSuffix, common.CDILabelSelector)
Expect(err).ToNot(HaveOccurred())
Expect(cloner.DeletionTimestamp).To(BeNil())
// The Pod should be associated with the host-assisted clone source
Expect(cloner.GetLabels()[hostAssistedCloneSource]).To(Equal(string(pvc.GetUID())))

By("Find upload pod after completion")
uploader, err := utils.FindPodByPrefixOnce(f.K8sClient, targetDataVolume.Namespace, "cdi-upload-", common.CDILabelSelector)
@@ -948,21 +953,25 @@ var _ = Describe("all clone tests", func() {
tinyCoreIsoURL := func() string { return fmt.Sprintf(utils.TinyCoreIsoURL, f.CdiInstallNs) }

var (
sourceDv, targetDv1, targetDv2, targetDv3 *cdiv1.DataVolume
err error
sourceDv *cdiv1.DataVolume
targetDvs []*cdiv1.DataVolume
err error
)

AfterEach(func() {
dvs := []*cdiv1.DataVolume{sourceDv, targetDv1, targetDv2, targetDv3}
for _, dv := range dvs {
targetDvs = append(targetDvs, sourceDv)
for _, dv := range targetDvs {
cleanDv(f, dv)
if dv != nil && dv.Status.Phase == cdiv1.Succeeded {
validateCloneType(f, dv)
}
}
targetDvs = nil
})

It("[rfe_id:1277][test_id:1899][crit:High][vendor:cnv-qe@redhat.com][level:component] Should allow multiple cloning operations in parallel", func() {
const NumOfClones int = 3

By("Creating a source from a real image")
sourceDv = utils.NewDataVolumeWithHTTPImport("source-dv", "200Mi", tinyCoreIsoURL())
sourceDv, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, sourceDv)
@@ -984,60 +993,71 @@ var _ = Describe("all clone tests", func() {
_, err = utils.WaitPodDeleted(f.K8sClient, "execute-command", f.Namespace.Name, verifyPodDeletedTimeout)
Expect(err).ToNot(HaveOccurred())

// By not waiting for completion, we will start 3 transfers in parallell
By("Cloning from the source DataVolume to target1")
targetDv1 = utils.NewDataVolumeForImageCloning("target-dv1", "200Mi", f.Namespace.Name, sourceDv.Name, sourceDv.Spec.PVC.StorageClassName, sourceDv.Spec.PVC.VolumeMode)
targetDv1.Annotations[controller.AnnPodRetainAfterCompletion] = "true"
targetDv1, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, targetDv1)
Expect(err).ToNot(HaveOccurred())

By("Cloning from the source DataVolume to target2 in parallel")
targetDv2 = utils.NewDataVolumeForImageCloning("target-dv2", "200Mi", f.Namespace.Name, sourceDv.Name, sourceDv.Spec.PVC.StorageClassName, sourceDv.Spec.PVC.VolumeMode)
targetDv2.Annotations[controller.AnnPodRetainAfterCompletion] = "true"
targetDv2, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, targetDv2)
Expect(err).ToNot(HaveOccurred())

By("Cloning from the source DataVolume to target3 in parallel")
targetDv3 = utils.NewDataVolumeForImageCloning("target-dv3", "200Mi", f.Namespace.Name, sourceDv.Name, sourceDv.Spec.PVC.StorageClassName, sourceDv.Spec.PVC.VolumeMode)
targetDv3.Annotations[controller.AnnPodRetainAfterCompletion] = "true"
targetDv3, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, targetDv3)
Expect(err).ToNot(HaveOccurred())
// By not waiting for completion, we will start 3 transfers in parallel
By("Cloning #NumOfClones times in parallel")
for i := 1; i <= NumOfClones; i++ {
By("Cloning from the source DataVolume to target" + strconv.Itoa(i))
targetDv := utils.NewDataVolumeForImageCloning("target-dv"+strconv.Itoa(i), "200Mi", f.Namespace.Name, sourceDv.Name, sourceDv.Spec.PVC.StorageClassName, sourceDv.Spec.PVC.VolumeMode)
targetDv.Annotations[controller.AnnPodRetainAfterCompletion] = "true"
targetDv, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, targetDv)
Expect(err).ToNot(HaveOccurred())
f.ForceBindPvcIfDvIsWaitForFirstConsumer(targetDv)
targetDvs = append(targetDvs, targetDv)
}

dvs := []*cdiv1.DataVolume{targetDv1, targetDv2, targetDv3}
for _, dv := range dvs {
f.ForceBindPvcIfDvIsWaitForFirstConsumer(dv)
podsNodeName := make(map[string]bool)
for _, dv := range targetDvs {
By("Waiting for clone to be completed")
err = utils.WaitForDataVolumePhaseWithTimeout(f, f.Namespace.Name, cdiv1.Succeeded, dv.Name, 3*90*time.Second)
Expect(err).ToNot(HaveOccurred())
}

if cloneType == "network" {
// Make sure we don't have high number of restart counts on source pods
for _, dv := range dvs {
for _, dv := range targetDvs {
pvc, err := f.K8sClient.CoreV1().PersistentVolumeClaims(dv.Namespace).Get(context.TODO(), dv.Name, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
clonerPodName := controller.CreateCloneSourcePodName(pvc)
cloner, err := f.K8sClient.CoreV1().Pods(dv.Namespace).Get(context.TODO(), clonerPodName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
restartCount := cloner.Status.ContainerStatuses[0].RestartCount
fmt.Fprintf(GinkgoWriter, "INFO: restart count on clone source pod %s: %d\n", clonerPodName, restartCount)
Expect(restartCount).To(BeNumerically("<", 2))
// TODO remove the comment when the issue in #2550 is fixed
// Expect(restartCount).To(BeNumerically("<", 2))
}
}

for _, dv := range dvs {
for _, dv := range targetDvs {
By("Verifying MD5 sum matches")
matchFile := filepath.Join(testBaseDir, "disk.img")
Expect(f.VerifyTargetPVCContentMD5(f.Namespace, utils.PersistentVolumeClaimFromDataVolume(dv), matchFile, md5sum[:32])).To(BeTrue())

pvc, err := f.K8sClient.CoreV1().PersistentVolumeClaims(dv.Namespace).Get(context.TODO(), dv.Name, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())

if cloneSourcePod := pvc.Annotations[ctrl.AnnCloneSourcePod]; cloneSourcePod != "" {
By(fmt.Sprintf("Getting pod %s/%s", dv.Namespace, cloneSourcePod))
pod, err := f.K8sClient.CoreV1().Pods(dv.Namespace).Get(context.TODO(), cloneSourcePod, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
podsNodeName[pod.Spec.NodeName] = true
}

By("Deleting verifier pod")
err = f.K8sClient.CoreV1().Pods(f.Namespace.Name).Delete(context.TODO(), utils.VerifierPodName, metav1.DeleteOptions{})
Expect(err).ToNot(HaveOccurred())
_, err = utils.WaitPodDeleted(f.K8sClient, utils.VerifierPodName, f.Namespace.Name, verifyPodDeletedTimeout)
Expect(err).ToNot(HaveOccurred())
}

// All pods should be in the same node except when the map is empty in smart clone
if cloneType == "network" {
Expect(podsNodeName).To(HaveLen(1))
}
})

It("[rfe_id:1277][test_id:1899][crit:High][vendor:cnv-qe@redhat.com][level:component] Should allow multiple cloning operations in parallel for block devices", func() {
const NumOfClones int = 3

if !f.IsBlockVolumeStorageClassAvailable() {
Skip("Storage Class for block volume is not available")
}
@@ -1056,32 +1076,23 @@ var _ = Describe("all clone tests", func() {
_, _ = fmt.Fprintf(GinkgoWriter, "INFO: MD5SUM for source is: %s\n", md5sum[:32])

// By not waiting for completion, we will start 3 transfers in parallell
By("Cloning from the source DataVolume to target1")
targetDv1 = utils.NewDataVolumeForImageCloning("target-dv1", "200Mi", f.Namespace.Name, sourceDv.Name, sourceDv.Spec.PVC.StorageClassName, sourceDv.Spec.PVC.VolumeMode)
targetDv1, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, targetDv1)
Expect(err).ToNot(HaveOccurred())
f.ForceBindPvcIfDvIsWaitForFirstConsumer(targetDv1)

By("Cloning from the source DataVolume to target2 in parallel")
targetDv2 = utils.NewDataVolumeForImageCloning("target-dv2", "200Mi", f.Namespace.Name, sourceDv.Name, sourceDv.Spec.PVC.StorageClassName, sourceDv.Spec.PVC.VolumeMode)
targetDv2, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, targetDv2)
Expect(err).ToNot(HaveOccurred())
f.ForceBindPvcIfDvIsWaitForFirstConsumer(targetDv2)

By("Cloning from the source DataVolume to target3 in parallel")
targetDv3 = utils.NewDataVolumeForImageCloning("target-dv3", "200Mi", f.Namespace.Name, sourceDv.Name, sourceDv.Spec.PVC.StorageClassName, sourceDv.Spec.PVC.VolumeMode)
targetDv3, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, targetDv3)
Expect(err).ToNot(HaveOccurred())
f.ForceBindPvcIfDvIsWaitForFirstConsumer(targetDv3)
By("Cloning #NumOfClones times in parallel")
for i := 1; i <= NumOfClones; i++ {
By("Cloning from the source DataVolume to target" + strconv.Itoa(i))
targetDv := utils.NewDataVolumeForImageCloning("target-dv"+strconv.Itoa(i), "200Mi", f.Namespace.Name, sourceDv.Name, sourceDv.Spec.PVC.StorageClassName, sourceDv.Spec.PVC.VolumeMode)
targetDv, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, targetDv)
Expect(err).ToNot(HaveOccurred())
f.ForceBindPvcIfDvIsWaitForFirstConsumer(targetDv)
targetDvs = append(targetDvs, targetDv)
}

dvs := []*cdiv1.DataVolume{targetDv1, targetDv2, targetDv3}
for _, dv := range dvs {
for _, dv := range targetDvs {
By("Waiting for clone to be completed")
err = utils.WaitForDataVolumePhaseWithTimeout(f, f.Namespace.Name, cdiv1.Succeeded, dv.Name, 3*90*time.Second)
Expect(err).ToNot(HaveOccurred())
}

for _, dv := range dvs {
for _, dv := range targetDvs {
By("Verifying MD5 sum matches")
Expect(f.VerifyTargetPVCContentMD5(f.Namespace, utils.PersistentVolumeClaimFromDataVolume(dv), testBaseDir, md5sum[:32])).To(BeTrue())
By("Deleting verifier pod")
Expand Down Expand Up @@ -1687,6 +1698,7 @@ var _ = Describe("all clone tests", func() {
By("Waiting for clone to be completed")
err = utils.WaitForDataVolumePhaseWithTimeout(f, f.Namespace.Name, cdiv1.Succeeded, dv.Name, 3*90*time.Second)
Expect(err).ToNot(HaveOccurred())

matchFile := filepath.Join(testBaseDir, "disk.img")
Expect(f.VerifyTargetPVCContentMD5(f.Namespace, utils.PersistentVolumeClaimFromDataVolume(dv), matchFile, md5sum[:32])).To(BeTrue())
err = f.K8sClient.CoreV1().Pods(f.Namespace.Name).Delete(context.TODO(), utils.VerifierPodName, metav1.DeleteOptions{})
