Skip to content

Commit

Permalink
[release-v1.55] Propagate CDI workload NodePlacement to DIC CronJob a…
Browse files Browse the repository at this point in the history
…nd initial Job (#2734)

Manual backport of #2712

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>
  • Loading branch information
arnongilboa committed May 30, 2023
1 parent a47ae61 commit 5455229
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 111 deletions.
1 change: 0 additions & 1 deletion pkg/controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ go_library(
"//vendor/k8s.io/klog/v2:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
"//vendor/kubevirt.io/controller-lifecycle-operator-sdk/api:go_default_library",
"//vendor/kubevirt.io/controller-lifecycle-operator-sdk/pkg/sdk:go_default_library",
"//vendor/sigs.k8s.io/controller-runtime/pkg/cache:go_default_library",
"//vendor/sigs.k8s.io/controller-runtime/pkg/client:go_default_library",
"//vendor/sigs.k8s.io/controller-runtime/pkg/controller:go_default_library",
Expand Down
116 changes: 55 additions & 61 deletions pkg/controller/dataimportcron-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ import (
"kubevirt.io/containerized-data-importer/pkg/operator"
"kubevirt.io/containerized-data-importer/pkg/util"
"kubevirt.io/containerized-data-importer/pkg/util/naming"

"kubevirt.io/controller-lifecycle-operator-sdk/pkg/sdk"
)

const (
Expand Down Expand Up @@ -106,8 +104,6 @@ const (
AnnLastCronTime = AnnAPIGroup + "/storage.import.lastCronTime"
// AnnLastUseTime is the PVC last use time stamp
AnnLastUseTime = AnnAPIGroup + "/storage.import.lastUseTime"
// AnnLastAppliedConfig is the cron last applied configuration
AnnLastAppliedConfig = AnnAPIGroup + "/lastAppliedConfiguration"

dataImportControllerName = "dataimportcron-controller"
digestPrefix = "sha256:"
Expand Down Expand Up @@ -726,49 +722,54 @@ func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Contro
}

func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
var cronJob batchv1.CronJob
cronJobNamespacedName := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
if err := r.client.Get(ctx, cronJobNamespacedName, &cronJob); err != nil {
cronJob := &batchv1.CronJob{}
cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
return false, IgnoreNotFound(err)
}
desired, err := r.newCronJob(cron)
if err != nil {
return false, err
}
current, err := sdk.StripStatusFromObject(&cronJob)
if err != nil {
return false, err
}
sdk.MergeLabelsAndAnnotations(desired, current)
merged, err := sdk.MergeObject(desired, current, AnnLastAppliedConfig)
if err != nil {

cronJobCopy := cronJob.DeepCopy()
if err := r.initCronJob(cron, cronJobCopy); err != nil {
return false, err
}
if reflect.DeepEqual(current, merged) {
return true, nil
}
r.log.Info("Updating CronJob", "name", merged.GetName())
if err := r.client.Update(ctx, merged); err != nil {
return false, IgnoreNotFound(err)

if !reflect.DeepEqual(cronJob, cronJobCopy) {
r.log.Info("Updating CronJob", "name", cronJob.GetName())
if err := r.client.Update(ctx, cronJobCopy); err != nil {
return false, IgnoreNotFound(err)
}
}
return true, nil
}

func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
cronJob := &batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: GetCronJobName(cron),
Namespace: r.cdiNamespace,
},
}
if err := r.initCronJob(cron, cronJob); err != nil {
return nil, err
}
return cronJob, nil
}

func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
regSource, err := getCronRegistrySource(cron)
if err != nil {
return nil, err
return err
}
if regSource.URL == nil {
return nil, errors.Errorf("No URL source in cron %s", cron.Name)
return errors.Errorf("No URL source in cron %s", cron.Name)
}
cdiConfig := &cdiv1.CDIConfig{}
if err = r.client.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
return nil, err
return err
}
insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, r.uncachedClient, r.log)
if err != nil {
return nil, err
return err
}
container := corev1.Container{
Name: "cdi-source-update-poller",
Expand All @@ -784,7 +785,7 @@ func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batc
TerminationMessagePolicy: corev1.TerminationMessageReadFile,
}

volumes := []corev1.Volume{}
var volumes []corev1.Volume
hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
if hasCertConfigMap {
vm := corev1.VolumeMount{
Expand Down Expand Up @@ -852,43 +853,36 @@ func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batc
addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)

cronJobName := GetCronJobName(cron)
cronJob := &batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: cronJobName,
Namespace: r.cdiNamespace,
},
Spec: batchv1.CronJobSpec{
Schedule: cron.Spec.Schedule,
ConcurrencyPolicy: batchv1.ForbidConcurrent,
SuccessfulJobsHistoryLimit: pointer.Int32(1),
FailedJobsHistoryLimit: pointer.Int32(1),
JobTemplate: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
TerminationGracePeriodSeconds: pointer.Int64(0),
Containers: []corev1.Container{container},
ServiceAccountName: common.CronJobServiceAccountName,
Volumes: volumes,
},
},
BackoffLimit: pointer.Int32(2),
TTLSecondsAfterFinished: pointer.Int32(10),
},
},
},
workloadNodePlacement, err := GetWorkloadNodePlacement(r.client)
if err != nil {
return err
}

cronJobSpec := &cronJob.Spec
cronJobSpec.Schedule = cron.Spec.Schedule
cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
cronJobSpec.SuccessfulJobsHistoryLimit = pointer.Int32(1)
cronJobSpec.FailedJobsHistoryLimit = pointer.Int32(1)

jobSpec := &cronJobSpec.JobTemplate.Spec
jobSpec.BackoffLimit = pointer.Int32(2)
jobSpec.TTLSecondsAfterFinished = pointer.Int32(10)

podSpec := &jobSpec.Template.Spec
podSpec.RestartPolicy = corev1.RestartPolicyNever
podSpec.TerminationGracePeriodSeconds = pointer.Int64(0)
podSpec.Containers = []corev1.Container{container}
podSpec.ServiceAccountName = common.CronJobServiceAccountName
podSpec.Volumes = volumes
podSpec.NodeSelector = workloadNodePlacement.NodeSelector
podSpec.Tolerations = workloadNodePlacement.Tolerations
podSpec.Affinity = workloadNodePlacement.Affinity

if err := r.setJobCommon(cron, cronJob); err != nil {
return nil, err
}
if err := sdk.SetLastAppliedConfiguration(cronJob, AnnLastAppliedConfig); err != nil {
return nil, err
return err
}
SetRestrictedSecurityContext(&cronJob.Spec.JobTemplate.Spec.Template.Spec)
return cronJob, nil
return nil
}

func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
Expand Down
68 changes: 67 additions & 1 deletion pkg/controller/dataimportcron-controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,72 @@ var _ = Describe("All DataImportCron Tests", func() {
verifyCronJobContainerImage("new-image")
})

It("Should update CronJob Pod workload NodePlacement on reconcile", func() {
cron = newDataImportCron(cronName)
reconciler = createDataImportCronReconciler(cron)

_, err := reconciler.Reconcile(context.TODO(), cronReq)
Expect(err).ToNot(HaveOccurred())

cronjob := &batchv1.CronJob{}
err = reconciler.client.Get(context.TODO(), cronJobKey(cron), cronjob)
Expect(err).ToNot(HaveOccurred())
spec := &cronjob.Spec.JobTemplate.Spec.Template.Spec
spec.NodeSelector = map[string]string{"some": "thing"}
spec.Tolerations = []corev1.Toleration{{Key: "another", Value: "value"}}
err = reconciler.client.Update(context.TODO(), cronjob)
Expect(err).ToNot(HaveOccurred())

workloads := updateCdiWithTestNodePlacement(reconciler.client)

_, err = reconciler.Reconcile(context.TODO(), cronReq)
Expect(err).ToNot(HaveOccurred())

err = reconciler.client.Get(context.TODO(), cronJobKey(cron), cronjob)
Expect(err).ToNot(HaveOccurred())
podSpec := cronjob.Spec.JobTemplate.Spec.Template.Spec
Expect(podSpec.Affinity).To(Equal(workloads.Affinity))
Expect(podSpec.NodeSelector).To(Equal(workloads.NodeSelector))
Expect(podSpec.Tolerations).To(Equal(workloads.Tolerations))
})

It("Should set initial Job Pod workload NodePlacement on reconcile", func() {
cron = newDataImportCron(cronName)
reconciler = createDataImportCronReconciler(cron)

workloads := updateCdiWithTestNodePlacement(reconciler.client)

_, err := reconciler.Reconcile(context.TODO(), cronReq)
Expect(err).ToNot(HaveOccurred())

job := &batchv1.Job{}
jobKey := types.NamespacedName{Name: GetInitialJobName(cron), Namespace: reconciler.cdiNamespace}
err = reconciler.client.Get(context.TODO(), jobKey, job)
Expect(err).ToNot(HaveOccurred())
podSpec := job.Spec.Template.Spec
Expect(podSpec.Affinity).To(Equal(workloads.Affinity))
Expect(podSpec.NodeSelector).To(Equal(workloads.NodeSelector))
Expect(podSpec.Tolerations).To(Equal(workloads.Tolerations))
})

It("Should not modify new CronJob on initCronJob", func() {
cron = newDataImportCron(cronName)
reconciler = createDataImportCronReconciler(cron)

_, err := reconciler.Reconcile(context.TODO(), cronReq)
Expect(err).ToNot(HaveOccurred())

cronJob := &batchv1.CronJob{}
err = reconciler.client.Get(context.TODO(), cronJobKey(cron), cronJob)
Expect(err).ToNot(HaveOccurred())

cronJobCopy := cronJob.DeepCopy()
err = reconciler.initCronJob(cron, cronJobCopy)
Expect(err).ToNot(HaveOccurred())

Expect(cronJob).To(Equal(cronJobCopy))
})

It("Should create DataVolume on AnnSourceDesiredDigest annotation update, and update DataImportCron and DataSource on DataVolume Succeeded", func() {
cron = newDataImportCron(cronName)
dataSource = nil
Expand Down Expand Up @@ -642,7 +708,7 @@ func createDataImportCronReconciler(objects ...runtime.Object) *DataImportCronRe

func createDataImportCronReconcilerWithoutConfig(objects ...runtime.Object) *DataImportCronReconciler {
crd := &extv1.CustomResourceDefinition{ObjectMeta: metav1.ObjectMeta{Name: "dataimportcrons.cdi.kubevirt.io"}}
objs := []runtime.Object{crd}
objs := []runtime.Object{crd, MakeEmptyCDICR()}
objs = append(objs, objects...)

s := scheme.Scheme
Expand Down
Loading

0 comments on commit 5455229

Please sign in to comment.