Set pod affinity for host assisted clone #2647

Merged
34 changes: 30 additions & 4 deletions pkg/controller/clone-controller.go
@@ -57,6 +57,8 @@ const (

cloneSourcePodFinalizer = "cdi.kubevirt.io/cloneSource"

hostAssistedCloneSource = "cdi.kubevirt.io/hostAssistedSourcePodCloneSource"

uploadClientCertDuration = 365 * 24 * time.Hour
)

@@ -477,7 +479,7 @@ func (r *CloneReconciler) cleanup(pvc *corev1.PersistentVolumeClaim, log logr.Lo

// CreateCloneSourcePod creates our cloning src pod which will be used for out of band cloning to read the contents of the src PVC
func (r *CloneReconciler) CreateCloneSourcePod(image, pullPolicy string, pvc *corev1.PersistentVolumeClaim, log logr.Logger) (*corev1.Pod, error) {
exists, sourcePvcNamespace, sourcePvcName := ParseCloneRequestAnnotation(pvc)
exists, _, _ := ParseCloneRequestAnnotation(pvc)
if !exists {
return nil, errors.Errorf("bad CloneRequest Annotation")
}
@@ -519,7 +521,7 @@ func (r *CloneReconciler) CreateCloneSourcePod(image, pullPolicy string, pvc *co
sourceVolumeMode = corev1.PersistentVolumeFilesystem
}

pod := MakeCloneSourcePodSpec(sourceVolumeMode, image, pullPolicy, imagePullSecrets, sourcePvcName, sourcePvcNamespace, ownerKey, serverCABundle, pvc, podResourceRequirements, workloadNodePlacement)
pod := MakeCloneSourcePodSpec(sourceVolumeMode, image, pullPolicy, ownerKey, imagePullSecrets, serverCABundle, pvc, sourcePvc, podResourceRequirements, workloadNodePlacement)
util.SetRecommendedLabels(pod, r.installerLabels, "cdi-controller")

if err := r.client.Create(context.TODO(), pod); err != nil {
@@ -532,10 +534,14 @@ func (r *CloneReconciler) CreateCloneSourcePod(image, pullPolicy string, pvc *co
}

// MakeCloneSourcePodSpec creates and returns the clone source pod spec based on the target pvc.
func MakeCloneSourcePodSpec(sourceVolumeMode corev1.PersistentVolumeMode, image, pullPolicy string, imagePullSecrets []corev1.LocalObjectReference, sourcePvcName, sourcePvcNamespace, ownerRefAnno string,
serverCACert []byte, targetPvc *corev1.PersistentVolumeClaim, resourceRequirements *corev1.ResourceRequirements,
func MakeCloneSourcePodSpec(sourceVolumeMode corev1.PersistentVolumeMode, image, pullPolicy, ownerRefAnno string, imagePullSecrets []corev1.LocalObjectReference,
serverCACert []byte, targetPvc, sourcePvc *corev1.PersistentVolumeClaim, resourceRequirements *corev1.ResourceRequirements,
workloadNodePlacement *sdkapi.NodePlacement) *corev1.Pod {

sourcePvcName := sourcePvc.GetName()
sourcePvcNamespace := sourcePvc.GetNamespace()
sourcePvcUID := string(sourcePvc.GetUID())

var ownerID string
cloneSourcePodName := targetPvc.Annotations[AnnCloneSourcePod]
url := GetUploadServerURL(targetPvc.Namespace, targetPvc.Name, common.UploadPathSync)
@@ -565,6 +571,7 @@ func MakeCloneSourcePodSpec(sourceVolumeMode corev1.PersistentVolumeMode, image,
// this label is used when searching for a pvc's cloner source pod.
cc.CloneUniqueID: cloneSourcePodName,
common.PrometheusLabelKey: common.PrometheusLabelValue,
hostAssistedCloneSource: sourcePvcUID,
},
},
Spec: corev1.PodSpec{
@@ -650,6 +657,25 @@ func MakeCloneSourcePodSpec(sourceVolumeMode corev1.PersistentVolumeMode, image,
pod.Spec.Affinity.PodAffinity = &corev1.PodAffinity{}
}

if len(sourcePvc.Spec.AccessModes) == 1 && sourcePvc.Spec.AccessModes[0] == corev1.ReadWriteOnce {
pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(
pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: hostAssistedCloneSource,
Operator: metav1.LabelSelectorOpIn,
Values: []string{sourcePvcUID},
},
},
},
Namespaces: []string{sourcePvcNamespace},
TopologyKey: corev1.LabelHostname,
},
)
}

pod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution = append(
pod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution,
corev1.WeightedPodAffinityTerm{
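For context, a minimal sketch (not part of this PR) of how the new label can be used to confirm the intended co-location: every host-assisted clone source pod reading a given ReadWriteOnce source PVC carries the hostAssistedCloneSource label keyed by the source PVC UID, and the new required pod affinity term forces any additional source pods for that PVC onto the node already hosting one. Listing pods by that label and comparing node names should therefore yield at most one node. The helper name, package scaffolding, and client wiring below are assumptions for illustration; the client-go calls are standard.

package example

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// hostAssistedCloneSource mirrors the label key added in this PR.
const hostAssistedCloneSource = "cdi.kubevirt.io/hostAssistedSourcePodCloneSource"

// sourcePodsCoLocated is a hypothetical helper: it lists all clone source pods
// labeled with the given source PVC UID and reports whether every scheduled pod
// landed on the same node, which is what the required pod affinity enforces for
// ReadWriteOnce source PVCs.
func sourcePodsCoLocated(ctx context.Context, client kubernetes.Interface, namespace, sourcePvcUID string) (bool, error) {
	pods, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
		LabelSelector: fmt.Sprintf("%s=%s", hostAssistedCloneSource, sourcePvcUID),
	})
	if err != nil {
		return false, err
	}
	nodes := map[string]struct{}{}
	for _, pod := range pods.Items {
		if pod.Spec.NodeName != "" {
			nodes[pod.Spec.NodeName] = struct{}{}
		}
	}
	// Zero or one distinct node means the affinity constraint held.
	return len(nodes) <= 1, nil
}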
108 changes: 60 additions & 48 deletions tests/cloner_test.go
@@ -26,6 +26,7 @@ import (

cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
"kubevirt.io/containerized-data-importer/pkg/common"
ctrl "kubevirt.io/containerized-data-importer/pkg/controller"
controller "kubevirt.io/containerized-data-importer/pkg/controller/common"
dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
"kubevirt.io/containerized-data-importer/pkg/token"
@@ -50,6 +51,7 @@ const (
verifyPodDeletedTimeout = 270 * time.Second
controllerSkipPVCCompleteTimeout = 270 * time.Second
crossVolumeModeCloneMD5NumBytes = 100000
hostAssistedCloneSource = "cdi.kubevirt.io/hostAssistedSourcePodCloneSource"
)

var _ = Describe("all clone tests", func() {
@@ -95,6 +97,7 @@ var _ = Describe("all clone tests", func() {
}

})

It("[test_id:6693]Should clone imported data and retain transfer pods after completion", func() {
smartApplicable := f.IsSnapshotStorageClassAvailable()
sc, err := f.K8sClient.StorageV1().StorageClasses().Get(context.TODO(), f.SnapshotSCName, metav1.GetOptions{})
@@ -127,6 +130,8 @@ var _ = Describe("all clone tests", func() {
cloner, err := utils.FindPodBySuffixOnce(f.K8sClient, targetDataVolume.Namespace, common.ClonerSourcePodNameSuffix, common.CDILabelSelector)
Expect(err).ToNot(HaveOccurred())
Expect(cloner.DeletionTimestamp).To(BeNil())
// The Pod should be associated with the host-assisted clone source
Expect(cloner.GetLabels()[hostAssistedCloneSource]).To(Equal(string(pvc.GetUID())))

By("Find upload pod after completion")
uploader, err := utils.FindPodByPrefixOnce(f.K8sClient, targetDataVolume.Namespace, "cdi-upload-", common.CDILabelSelector)
@@ -944,21 +949,25 @@ var _ = Describe("all clone tests", func() {
tinyCoreIsoURL := func() string { return fmt.Sprintf(utils.TinyCoreIsoURL, f.CdiInstallNs) }

var (
sourceDv, targetDv1, targetDv2, targetDv3 *cdiv1.DataVolume
err error
sourceDv *cdiv1.DataVolume
targetDvs []*cdiv1.DataVolume
err error
)

AfterEach(func() {
dvs := []*cdiv1.DataVolume{sourceDv, targetDv1, targetDv2, targetDv3}
for _, dv := range dvs {
targetDvs = append(targetDvs, sourceDv)
for _, dv := range targetDvs {
cleanDv(f, dv)
if dv != nil && dv.Status.Phase == cdiv1.Succeeded {
validateCloneType(f, dv)
}
}
targetDvs = nil
})

It("[rfe_id:1277][test_id:1899][crit:High][vendor:cnv-qe@redhat.com][level:component] Should allow multiple cloning operations in parallel", func() {
const NumOfClones int = 3

By("Creating a source from a real image")
sourceDv = utils.NewDataVolumeWithHTTPImport("source-dv", "200Mi", tinyCoreIsoURL())
sourceDv, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, sourceDv)
@@ -978,60 +987,71 @@ var _ = Describe("all clone tests", func() {
_, err = utils.WaitPodDeleted(f.K8sClient, "execute-command", f.Namespace.Name, verifyPodDeletedTimeout)
Expect(err).ToNot(HaveOccurred())

// By not waiting for completion, we will start 3 transfers in parallell
By("Cloning from the source DataVolume to target1")
targetDv1 = utils.NewDataVolumeForImageCloning("target-dv1", "200Mi", f.Namespace.Name, sourceDv.Name, sourceDv.Spec.PVC.StorageClassName, sourceDv.Spec.PVC.VolumeMode)
targetDv1.Annotations[controller.AnnPodRetainAfterCompletion] = "true"
targetDv1, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, targetDv1)
Expect(err).ToNot(HaveOccurred())

By("Cloning from the source DataVolume to target2 in parallel")
targetDv2 = utils.NewDataVolumeForImageCloning("target-dv2", "200Mi", f.Namespace.Name, sourceDv.Name, sourceDv.Spec.PVC.StorageClassName, sourceDv.Spec.PVC.VolumeMode)
targetDv2.Annotations[controller.AnnPodRetainAfterCompletion] = "true"
targetDv2, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, targetDv2)
Expect(err).ToNot(HaveOccurred())

By("Cloning from the source DataVolume to target3 in parallel")
targetDv3 = utils.NewDataVolumeForImageCloning("target-dv3", "200Mi", f.Namespace.Name, sourceDv.Name, sourceDv.Spec.PVC.StorageClassName, sourceDv.Spec.PVC.VolumeMode)
targetDv3.Annotations[controller.AnnPodRetainAfterCompletion] = "true"
targetDv3, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, targetDv3)
Expect(err).ToNot(HaveOccurred())
// By not waiting for completion, we will start 3 transfers in parallel
By("Cloning #NumOfClones times in parallel")
for i := 1; i <= NumOfClones; i++ {
By("Cloning from the source DataVolume to target" + strconv.Itoa(i))
targetDv := utils.NewDataVolumeForImageCloning("target-dv"+strconv.Itoa(i), "200Mi", f.Namespace.Name, sourceDv.Name, sourceDv.Spec.PVC.StorageClassName, sourceDv.Spec.PVC.VolumeMode)
targetDv.Annotations[controller.AnnPodRetainAfterCompletion] = "true"
targetDv, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, targetDv)
Expect(err).ToNot(HaveOccurred())
f.ForceBindPvcIfDvIsWaitForFirstConsumer(targetDv)
targetDvs = append(targetDvs, targetDv)
}

dvs := []*cdiv1.DataVolume{targetDv1, targetDv2, targetDv3}
for _, dv := range dvs {
f.ForceBindPvcIfDvIsWaitForFirstConsumer(dv)
podsNodeName := make(map[string]bool)
for _, dv := range targetDvs {
By("Waiting for clone to be completed")
err = utils.WaitForDataVolumePhaseWithTimeout(f, f.Namespace.Name, cdiv1.Succeeded, dv.Name, 3*90*time.Second)
Expect(err).ToNot(HaveOccurred())
}

if cloneType == "network" {
// Make sure we don't have high number of restart counts on source pods
for _, dv := range dvs {
for _, dv := range targetDvs {
pvc, err := f.K8sClient.CoreV1().PersistentVolumeClaims(dv.Namespace).Get(context.TODO(), dv.Name, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
clonerPodName := controller.CreateCloneSourcePodName(pvc)
cloner, err := f.K8sClient.CoreV1().Pods(dv.Namespace).Get(context.TODO(), clonerPodName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
restartCount := cloner.Status.ContainerStatuses[0].RestartCount
fmt.Fprintf(GinkgoWriter, "INFO: restart count on clone source pod %s: %d\n", clonerPodName, restartCount)
Expect(restartCount).To(BeNumerically("<", 2))
// TODO remove the comment when the issue in #2550 is fixed
// Expect(restartCount).To(BeNumerically("<", 2))
}
}

for _, dv := range dvs {
for _, dv := range targetDvs {
By("Verifying MD5 sum matches")
matchFile := filepath.Join(testBaseDir, "disk.img")
Expect(f.VerifyTargetPVCContentMD5(f.Namespace, utils.PersistentVolumeClaimFromDataVolume(dv), matchFile, md5sum[:32])).To(BeTrue())

pvc, err := f.K8sClient.CoreV1().PersistentVolumeClaims(dv.Namespace).Get(context.TODO(), dv.Name, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())

if cloneSourcePod := pvc.Annotations[ctrl.AnnCloneSourcePod]; cloneSourcePod != "" {
By(fmt.Sprintf("Getting pod %s/%s", dv.Namespace, cloneSourcePod))
pod, err := f.K8sClient.CoreV1().Pods(dv.Namespace).Get(context.TODO(), cloneSourcePod, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
podsNodeName[pod.Spec.NodeName] = true
}

By("Deleting verifier pod")
err = f.K8sClient.CoreV1().Pods(f.Namespace.Name).Delete(context.TODO(), utils.VerifierPodName, metav1.DeleteOptions{})
Expect(err).ToNot(HaveOccurred())
_, err = utils.WaitPodDeleted(f.K8sClient, utils.VerifierPodName, f.Namespace.Name, verifyPodDeletedTimeout)
Expect(err).ToNot(HaveOccurred())
}

// All pods should be in the same node except when the map is empty in smart clone
Review comment — @akalenyu (Collaborator), Apr 17, 2023:
You should be able to use cloneType in this context to tell you if this is going to be smart clone or not. For example, cloneType == "network" means host assisted.

Reply — Contributor Author:
Ok, I changed the condition to if cloneType == "network".

if cloneType == "network" {
Expect(podsNodeName).To(HaveLen(1))
}
})

It("[rfe_id:1277][test_id:1899][crit:High][vendor:cnv-qe@redhat.com][level:component] Should allow multiple cloning operations in parallel for block devices", func() {
const NumOfClones int = 3

if !f.IsBlockVolumeStorageClassAvailable() {
Skip("Storage Class for block volume is not available")
}
@@ -1050,32 +1070,23 @@ var _ = Describe("all clone tests", func() {
fmt.Fprintf(GinkgoWriter, "INFO: MD5SUM for source is: %s\n", md5sum[:32])

// By not waiting for completion, we will start 3 transfers in parallel
By("Cloning from the source DataVolume to target1")
targetDv1 = utils.NewDataVolumeForImageCloning("target-dv1", "200Mi", f.Namespace.Name, sourceDv.Name, sourceDv.Spec.PVC.StorageClassName, sourceDv.Spec.PVC.VolumeMode)
targetDv1, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, targetDv1)
Expect(err).ToNot(HaveOccurred())
f.ForceBindPvcIfDvIsWaitForFirstConsumer(targetDv1)

By("Cloning from the source DataVolume to target2 in parallel")
targetDv2 = utils.NewDataVolumeForImageCloning("target-dv2", "200Mi", f.Namespace.Name, sourceDv.Name, sourceDv.Spec.PVC.StorageClassName, sourceDv.Spec.PVC.VolumeMode)
targetDv2, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, targetDv2)
Expect(err).ToNot(HaveOccurred())
f.ForceBindPvcIfDvIsWaitForFirstConsumer(targetDv2)

By("Cloning from the source DataVolume to target3 in parallel")
targetDv3 = utils.NewDataVolumeForImageCloning("target-dv3", "200Mi", f.Namespace.Name, sourceDv.Name, sourceDv.Spec.PVC.StorageClassName, sourceDv.Spec.PVC.VolumeMode)
targetDv3, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, targetDv3)
Expect(err).ToNot(HaveOccurred())
f.ForceBindPvcIfDvIsWaitForFirstConsumer(targetDv3)
By("Cloning #NumOfClones times in parallel")
for i := 1; i <= NumOfClones; i++ {
By("Cloning from the source DataVolume to target" + strconv.Itoa(i))
targetDv := utils.NewDataVolumeForImageCloning("target-dv"+strconv.Itoa(i), "200Mi", f.Namespace.Name, sourceDv.Name, sourceDv.Spec.PVC.StorageClassName, sourceDv.Spec.PVC.VolumeMode)
targetDv, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, targetDv)
Expect(err).ToNot(HaveOccurred())
f.ForceBindPvcIfDvIsWaitForFirstConsumer(targetDv)
targetDvs = append(targetDvs, targetDv)
}

dvs := []*cdiv1.DataVolume{targetDv1, targetDv2, targetDv3}
for _, dv := range dvs {
for _, dv := range targetDvs {
By("Waiting for clone to be completed")
err = utils.WaitForDataVolumePhaseWithTimeout(f, f.Namespace.Name, cdiv1.Succeeded, dv.Name, 3*90*time.Second)
Expect(err).ToNot(HaveOccurred())
}

for _, dv := range dvs {
for _, dv := range targetDvs {
By("Verifying MD5 sum matches")
Expect(f.VerifyTargetPVCContentMD5(f.Namespace, utils.PersistentVolumeClaimFromDataVolume(dv), testBaseDir, md5sum[:32])).To(BeTrue())
By("Deleting verifier pod")
@@ -1673,6 +1684,7 @@ var _ = Describe("all clone tests", func() {
By("Waiting for clone to be completed")
err = utils.WaitForDataVolumePhaseWithTimeout(f, f.Namespace.Name, cdiv1.Succeeded, dv.Name, 3*90*time.Second)
Expect(err).ToNot(HaveOccurred())

matchFile := filepath.Join(testBaseDir, "disk.img")
Expect(f.VerifyTargetPVCContentMD5(f.Namespace, utils.PersistentVolumeClaimFromDataVolume(dv), matchFile, md5sum[:32])).To(BeTrue())
err = f.K8sClient.CoreV1().Pods(f.Namespace.Name).Delete(context.TODO(), utils.VerifierPodName, metav1.DeleteOptions{})