Propagate CDI workload NodePlacement to DIC CronJob and initial Job #2712

Merged (1 commit) on May 18, 2023
Changes from all commits
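
Summary: instead of building the poller CronJob from scratch on every reconcile and merging it with SDK utils, the DataImportCron controller now initializes the spec in place and also propagates the CDI CR's workload NodePlacement (node selector, tolerations, affinity) to the CronJob's pod template and to the initial Job. A minimal sketch of the propagation pattern, assuming a NodePlacement already fetched via cc.GetWorkloadNodePlacement (the helper name applyNodePlacement is ours, not from this PR; the real code inlines this in initCronJob):

package controller

import (
	batchv1 "k8s.io/api/batch/v1"
	sdkapi "kubevirt.io/controller-lifecycle-operator-sdk/api"
)

// applyNodePlacement mirrors what initCronJob does below: stamp the CDI
// workload NodePlacement onto the CronJob's pod template, so poller pods
// are scheduled like other CDI workload pods.
func applyNodePlacement(cronJob *batchv1.CronJob, placement *sdkapi.NodePlacement) {
	podSpec := &cronJob.Spec.JobTemplate.Spec.Template.Spec
	podSpec.NodeSelector = placement.NodeSelector
	podSpec.Tolerations = placement.Tolerations
	podSpec.Affinity = placement.Affinity
}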
1 change: 0 additions & 1 deletion pkg/controller/BUILD.bazel
@@ -59,7 +59,6 @@ go_library(
         "//vendor/k8s.io/klog/v2:go_default_library",
         "//vendor/k8s.io/utils/pointer:go_default_library",
         "//vendor/kubevirt.io/controller-lifecycle-operator-sdk/api:go_default_library",
-        "//vendor/kubevirt.io/controller-lifecycle-operator-sdk/pkg/sdk:go_default_library",
         "//vendor/sigs.k8s.io/controller-runtime/pkg/client:go_default_library",
         "//vendor/sigs.k8s.io/controller-runtime/pkg/controller:go_default_library",
         "//vendor/sigs.k8s.io/controller-runtime/pkg/event:go_default_library",
121 changes: 57 additions & 64 deletions pkg/controller/dataimportcron-controller.go
@@ -53,15 +53,12 @@ import (
 
 	cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
 	"kubevirt.io/containerized-data-importer/pkg/common"
-
 	cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
 	cdv "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
 	"kubevirt.io/containerized-data-importer/pkg/monitoring"
 	"kubevirt.io/containerized-data-importer/pkg/operator"
 	"kubevirt.io/containerized-data-importer/pkg/util"
 	"kubevirt.io/containerized-data-importer/pkg/util/naming"
-
-	"kubevirt.io/controller-lifecycle-operator-sdk/pkg/sdk"
 )
 
 const (

@@ -109,8 +106,6 @@ const (
 	AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
 	// AnnLastUseTime is the PVC last use time stamp
 	AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
-	// AnnLastAppliedConfig is the cron last applied configuration
-	AnnLastAppliedConfig = cc.AnnAPIGroup + "/lastAppliedConfiguration"
 
 	dataImportControllerName = "dataimportcron-controller"
 	digestPrefix = "sha256:"
@@ -734,49 +729,54 @@ func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller
 }
 
 func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
-	var cronJob batchv1.CronJob
-	cronJobNamespacedName := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
-	if err := r.client.Get(ctx, cronJobNamespacedName, &cronJob); err != nil {
+	cronJob := &batchv1.CronJob{}
+	cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
+	if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
 		return false, cc.IgnoreNotFound(err)
 	}
-	desired, err := r.newCronJob(cron)
-	if err != nil {
-		return false, err
-	}
-	current, err := sdk.StripStatusFromObject(&cronJob)
-	if err != nil {
-		return false, err
-	}
-	sdk.MergeLabelsAndAnnotations(desired, current)
Review comment (Collaborator): I see you got rid of all the SDK merging utils - are we ok with that?

Reply (Collaborator): The concern is that the DeepEqual will be false most of the time due to status changes/omitted spec fields.

Reply (Author, @arnongilboa, May 17, 2023): No need for the SDK merging utils. Regarding the DeepEqual you were right. Fixed that.

Reply (Member): Would it make sense to DeepEqual the spec instead of the entire object? Or is it the annotations/labels we are interested in?

Reply (Author): We DeepEqual the whole object, as we are interested also in annotations/labels, which may be updated in a new version. Our CronJobs are not reconciled that often, so it won't cost too much to be on the safe side.
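
To make the failure mode concrete, a toy sketch (assuming the reconciler fields used in the diff below; this is our illustration, not code from the PR):

// Naive approach: DeepEqual a freshly built object against the live one.
live := &batchv1.CronJob{}
_ = r.client.Get(ctx, cronJobKey, live) // live has Status and defaulted Spec fields set
desired, _ := r.newCronJob(cron)        // built from scratch: zero Status, no defaults
_ = reflect.DeepEqual(live, desired)    // ~always false => spurious Update every reconcile

// Fix taken in this PR: start from a DeepCopy of the live object and let
// initCronJob overwrite only the managed fields, so server-populated data
// survives and DeepEqual fires only on real drift.
liveCopy := live.DeepCopy()
_ = r.initCronJob(cron, liveCopy)
needsUpdate := !reflect.DeepEqual(live, liveCopy)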

-	merged, err := sdk.MergeObject(desired, current, AnnLastAppliedConfig)
-	if err != nil {
+
+	cronJobCopy := cronJob.DeepCopy()
+	if err := r.initCronJob(cron, cronJobCopy); err != nil {
 		return false, err
 	}
-	if reflect.DeepEqual(current, merged) {
-		return true, nil
-	}
-	r.log.Info("Updating CronJob", "name", merged.GetName())
-	if err := r.client.Update(ctx, merged); err != nil {
-		return false, cc.IgnoreNotFound(err)
+
+	if !reflect.DeepEqual(cronJob, cronJobCopy) {
+		r.log.Info("Updating CronJob", "name", cronJob.GetName())
+		if err := r.client.Update(ctx, cronJobCopy); err != nil {
+			return false, cc.IgnoreNotFound(err)
+		}
 	}
 	return true, nil
 }
 
 func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
+	cronJob := &batchv1.CronJob{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      GetCronJobName(cron),
+			Namespace: r.cdiNamespace,
+		},
+	}
+	if err := r.initCronJob(cron, cronJob); err != nil {
+		return nil, err
+	}
+	return cronJob, nil
+}
+
+func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
 	regSource, err := getCronRegistrySource(cron)
 	if err != nil {
-		return nil, err
+		return err
 	}
 	if regSource.URL == nil {
-		return nil, errors.Errorf("No URL source in cron %s", cron.Name)
+		return errors.Errorf("No URL source in cron %s", cron.Name)
 	}
 	cdiConfig := &cdiv1.CDIConfig{}
 	if err = r.client.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
-		return nil, err
+		return err
 	}
 	insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, r.uncachedClient, r.log)
 	if err != nil {
-		return nil, err
+		return err
 	}
 	container := corev1.Container{
 		Name: "cdi-source-update-poller",
@@ -792,7 +792,7 @@ func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
 		TerminationMessagePolicy: corev1.TerminationMessageReadFile,
 	}
 
-	volumes := []corev1.Volume{}
+	var volumes []corev1.Volume
 	hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
 	if hasCertConfigMap {
 		vm := corev1.VolumeMount{

@@ -862,46 +862,39 @@ func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {

 	imagePullSecrets, err := cc.GetImagePullSecrets(r.client)
 	if err != nil {
-		return nil, err
+		return err
 	}
-	cronJobName := GetCronJobName(cron)
-	cronJob := &batchv1.CronJob{
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      cronJobName,
-			Namespace: r.cdiNamespace,
-		},
-		Spec: batchv1.CronJobSpec{
-			Schedule:                   cron.Spec.Schedule,
-			ConcurrencyPolicy:          batchv1.ForbidConcurrent,
-			SuccessfulJobsHistoryLimit: pointer.Int32(1),
-			FailedJobsHistoryLimit:     pointer.Int32(1),
-			JobTemplate: batchv1.JobTemplateSpec{
-				Spec: batchv1.JobSpec{
-					Template: corev1.PodTemplateSpec{
-						Spec: corev1.PodSpec{
-							RestartPolicy:                 corev1.RestartPolicyNever,
-							TerminationGracePeriodSeconds: pointer.Int64(0),
-							Containers:                    []corev1.Container{container},
-							ServiceAccountName:            common.CronJobServiceAccountName,
-							Volumes:                       volumes,
-							ImagePullSecrets:              imagePullSecrets,
-						},
-					},
-					BackoffLimit:            pointer.Int32(2),
-					TTLSecondsAfterFinished: pointer.Int32(10),
-				},
-			},
-		},
+	workloadNodePlacement, err := cc.GetWorkloadNodePlacement(r.client)
+	if err != nil {
+		return err
 	}
+
+	cronJobSpec := &cronJob.Spec
+	cronJobSpec.Schedule = cron.Spec.Schedule
+	cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
+	cronJobSpec.SuccessfulJobsHistoryLimit = pointer.Int32(1)
+	cronJobSpec.FailedJobsHistoryLimit = pointer.Int32(1)
+
+	jobSpec := &cronJobSpec.JobTemplate.Spec
+	jobSpec.BackoffLimit = pointer.Int32(2)
+	jobSpec.TTLSecondsAfterFinished = pointer.Int32(10)
+
+	podSpec := &jobSpec.Template.Spec
+	podSpec.RestartPolicy = corev1.RestartPolicyNever
+	podSpec.TerminationGracePeriodSeconds = pointer.Int64(0)
+	podSpec.Containers = []corev1.Container{container}
+	podSpec.ServiceAccountName = common.CronJobServiceAccountName
+	podSpec.Volumes = volumes
+	podSpec.ImagePullSecrets = imagePullSecrets
+	podSpec.NodeSelector = workloadNodePlacement.NodeSelector
+	podSpec.Tolerations = workloadNodePlacement.Tolerations
+	podSpec.Affinity = workloadNodePlacement.Affinity
+
 	if err := r.setJobCommon(cron, cronJob); err != nil {
-		return nil, err
-	}
-	if err := sdk.SetLastAppliedConfiguration(cronJob, AnnLastAppliedConfig); err != nil {
-		return nil, err
+		return err
 	}
 	cc.SetRestrictedSecurityContext(&cronJob.Spec.JobTemplate.Spec.Template.Spec)
-	return cronJob, nil
+	return nil
 }
 
 func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
68 changes: 67 additions & 1 deletion pkg/controller/dataimportcron-controller_test.go
@@ -312,6 +312,72 @@ var _ = Describe("All DataImportCron Tests", func() {
 			verifyCronJobContainerImage("new-image")
 		})
 
+		It("Should update CronJob Pod workload NodePlacement on reconcile", func() {
+			cron = newDataImportCron(cronName)
+			reconciler = createDataImportCronReconciler(cron)
+
+			_, err := reconciler.Reconcile(context.TODO(), cronReq)
+			Expect(err).ToNot(HaveOccurred())
+
+			cronjob := &batchv1.CronJob{}
+			err = reconciler.client.Get(context.TODO(), cronJobKey(cron), cronjob)
+			Expect(err).ToNot(HaveOccurred())
+			spec := &cronjob.Spec.JobTemplate.Spec.Template.Spec
+			spec.NodeSelector = map[string]string{"some": "thing"}
+			spec.Tolerations = []corev1.Toleration{{Key: "another", Value: "value"}}
+			err = reconciler.client.Update(context.TODO(), cronjob)
+			Expect(err).ToNot(HaveOccurred())
+
+			workloads := updateCdiWithTestNodePlacement(reconciler.client)
+
+			_, err = reconciler.Reconcile(context.TODO(), cronReq)
+			Expect(err).ToNot(HaveOccurred())
+
+			err = reconciler.client.Get(context.TODO(), cronJobKey(cron), cronjob)
+			Expect(err).ToNot(HaveOccurred())
+			podSpec := cronjob.Spec.JobTemplate.Spec.Template.Spec
+			Expect(podSpec.Affinity).To(Equal(workloads.Affinity))
+			Expect(podSpec.NodeSelector).To(Equal(workloads.NodeSelector))
+			Expect(podSpec.Tolerations).To(Equal(workloads.Tolerations))
+		})
+
+		It("Should set initial Job Pod workload NodePlacement on reconcile", func() {
+			cron = newDataImportCron(cronName)
+			reconciler = createDataImportCronReconciler(cron)
+
+			workloads := updateCdiWithTestNodePlacement(reconciler.client)
+
+			_, err := reconciler.Reconcile(context.TODO(), cronReq)
+			Expect(err).ToNot(HaveOccurred())
+
+			job := &batchv1.Job{}
+			jobKey := types.NamespacedName{Name: GetInitialJobName(cron), Namespace: reconciler.cdiNamespace}
+			err = reconciler.client.Get(context.TODO(), jobKey, job)
+			Expect(err).ToNot(HaveOccurred())
+			podSpec := job.Spec.Template.Spec
+			Expect(podSpec.Affinity).To(Equal(workloads.Affinity))
+			Expect(podSpec.NodeSelector).To(Equal(workloads.NodeSelector))
+			Expect(podSpec.Tolerations).To(Equal(workloads.Tolerations))
+		})
+
+		It("Should not modify new CronJob on initCronJob", func() {
+			cron = newDataImportCron(cronName)
+			reconciler = createDataImportCronReconciler(cron)
+
+			_, err := reconciler.Reconcile(context.TODO(), cronReq)
+			Expect(err).ToNot(HaveOccurred())
+
+			cronJob := &batchv1.CronJob{}
+			err = reconciler.client.Get(context.TODO(), cronJobKey(cron), cronJob)
+			Expect(err).ToNot(HaveOccurred())
+
+			cronJobCopy := cronJob.DeepCopy()
+			err = reconciler.initCronJob(cron, cronJobCopy)
+			Expect(err).ToNot(HaveOccurred())
+
+			Expect(cronJob).To(Equal(cronJobCopy))
+		})
+
 		It("Should create DataVolume on AnnSourceDesiredDigest annotation update, and update DataImportCron and DataSource on DataVolume Succeeded", func() {
 			cron = newDataImportCron(cronName)
 			dataSource = nil
@@ -690,7 +756,7 @@ func createDataImportCronReconciler(objects ...runtime.Object) *DataImportCronReconciler {

 func createDataImportCronReconcilerWithoutConfig(objects ...runtime.Object) *DataImportCronReconciler {
 	crd := &extv1.CustomResourceDefinition{ObjectMeta: metav1.ObjectMeta{Name: "dataimportcrons.cdi.kubevirt.io"}}
-	objs := []runtime.Object{crd}
+	objs := []runtime.Object{crd, cc.MakeEmptyCDICR()}
Review comment (Collaborator): Maybe unit test that an existing CronJob (carried over from upgrade etc.) gets reconciled with placement values?

Reply (Author, @arnongilboa): Added.
 	objs = append(objs, objects...)
 
 	s := scheme.Scheme
82 changes: 41 additions & 41 deletions pkg/controller/import-controller_test.go
@@ -22,27 +22,25 @@ import (
"strconv"
"strings"

"sigs.k8s.io/controller-runtime/pkg/client"

cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
featuregates "kubevirt.io/containerized-data-importer/pkg/feature-gates"
"kubevirt.io/containerized-data-importer/pkg/util/naming"
sdkapi "kubevirt.io/controller-lifecycle-operator-sdk/api"

"k8s.io/apimachinery/pkg/runtime"
bootstrapapi "k8s.io/cluster-bootstrap/token/api"

. "github.com/onsi/ginkgo"
"github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
"kubevirt.io/containerized-data-importer/pkg/common"

//cc "kubevirt.io/containerized-data-importer/pkg/controller/common"

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -271,49 +269,17 @@ var _ = Describe("ImportConfig Controller reconcile loop", func() {
 		pvc.Status.Phase = v1.ClaimBound
 
 		reconciler = createImportReconciler(pvc)
+		workloads := updateCdiWithTestNodePlacement(reconciler.client)
 
-		cr := &cdiv1.CDI{}
-		err := reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "cdi"}, cr)
-		Expect(err).ToNot(HaveOccurred())
-
-		dummyNodeSelector := map[string]string{"kubernetes.io/arch": "amd64"}
-		dummyTolerations := []v1.Toleration{{Key: "test", Value: "123"}}
-		dummyAffinity := &v1.Affinity{
-			NodeAffinity: &v1.NodeAffinity{
-				RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
-					NodeSelectorTerms: []v1.NodeSelectorTerm{
-						{
-							MatchExpressions: []v1.NodeSelectorRequirement{
-								{Key: "kubernetes.io/hostname", Operator: v1.NodeSelectorOpIn, Values: []string{"node01"}},
-							},
-						},
-					},
-				},
-			},
-		}
-		cr.Spec.Workloads.NodeSelector = dummyNodeSelector
-		cr.Spec.Workloads.Affinity = dummyAffinity
-		cr.Spec.Workloads.Tolerations = dummyTolerations
-
-		err = reconciler.client.Update(context.TODO(), cr)
-		Expect(err).ToNot(HaveOccurred())
-
-		placement, err := cc.GetWorkloadNodePlacement(reconciler.client)
-		Expect(err).ToNot(HaveOccurred())
-
-		Expect(placement.Affinity).To(Equal(dummyAffinity))
-		Expect(placement.NodeSelector).To(Equal(dummyNodeSelector))
-		Expect(placement.Tolerations).To(Equal(dummyTolerations))
-
-		_, err = reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "testPvc1", Namespace: "default"}})
+		_, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "testPvc1", Namespace: "default"}})
 		Expect(err).ToNot(HaveOccurred())
 		pod := &corev1.Pod{}
 		err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "importer-testPvc1", Namespace: "default"}, pod)
 		Expect(err).ToNot(HaveOccurred())
 
-		Expect(pod.Spec.Affinity).To(Equal(dummyAffinity))
-		Expect(pod.Spec.NodeSelector).To(Equal(dummyNodeSelector))
-		Expect(pod.Spec.Tolerations).To(Equal(dummyTolerations))
+		Expect(pod.Spec.Affinity).To(Equal(workloads.Affinity))
+		Expect(pod.Spec.NodeSelector).To(Equal(workloads.NodeSelector))
+		Expect(pod.Spec.Tolerations).To(Equal(workloads.Tolerations))
 	})

It("Should create a POD if a PVC with all needed annotations is passed", func() {
Expand Down Expand Up @@ -1181,3 +1147,37 @@ func createSecret(name, ns, accessKey, secretKey string, labels map[string]strin
 		},
 	}
 }
+
+func updateCdiWithTestNodePlacement(c client.Client) sdkapi.NodePlacement {
+	cr := &cdiv1.CDI{}
+	err := c.Get(context.TODO(), types.NamespacedName{Name: "cdi"}, cr)
+	Expect(err).ToNot(HaveOccurred())
+
+	workloads := sdkapi.NodePlacement{
+		NodeSelector: map[string]string{"kubernetes.io/arch": "amd64"},
+		Tolerations:  []v1.Toleration{{Key: "test", Value: "123"}},
+		Affinity: &v1.Affinity{
+			NodeAffinity: &v1.NodeAffinity{
+				RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
+					NodeSelectorTerms: []v1.NodeSelectorTerm{
+						{
+							MatchExpressions: []v1.NodeSelectorRequirement{
+								{Key: "kubernetes.io/hostname", Operator: v1.NodeSelectorOpIn, Values: []string{"node01"}},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	cr.Spec.Workloads = workloads
+	err = c.Update(context.TODO(), cr)
+	Expect(err).ToNot(HaveOccurred())
+
+	placement, err := cc.GetWorkloadNodePlacement(c)
+	Expect(err).ToNot(HaveOccurred())
+	Expect(*placement).To(Equal(workloads))
+
+	return workloads
+}
}