Skip to content

Commit

Permalink
Propagate CDI workload NodePlacement to DIC CronJob and initial Job
Browse files Browse the repository at this point in the history
Signed-off-by: Arnon Gilboa <agilboa@redhat.com>
  • Loading branch information
arnongilboa committed May 17, 2023
1 parent 738036b commit cfae6bc
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 106 deletions.
1 change: 0 additions & 1 deletion pkg/controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ go_library(
"//vendor/k8s.io/klog/v2:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
"//vendor/kubevirt.io/controller-lifecycle-operator-sdk/api:go_default_library",
"//vendor/kubevirt.io/controller-lifecycle-operator-sdk/pkg/sdk:go_default_library",
"//vendor/sigs.k8s.io/controller-runtime/pkg/client:go_default_library",
"//vendor/sigs.k8s.io/controller-runtime/pkg/controller:go_default_library",
"//vendor/sigs.k8s.io/controller-runtime/pkg/event:go_default_library",
Expand Down
123 changes: 60 additions & 63 deletions pkg/controller/dataimportcron-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,12 @@ import (

cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
"kubevirt.io/containerized-data-importer/pkg/common"

cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
cdv "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
"kubevirt.io/containerized-data-importer/pkg/monitoring"
"kubevirt.io/containerized-data-importer/pkg/operator"
"kubevirt.io/containerized-data-importer/pkg/util"
"kubevirt.io/containerized-data-importer/pkg/util/naming"

"kubevirt.io/controller-lifecycle-operator-sdk/pkg/sdk"
)

const (
Expand Down Expand Up @@ -109,8 +106,6 @@ const (
AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
// AnnLastUseTime is the PVC last use time stamp
AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
// AnnLastAppliedConfig is the cron last applied configuration
AnnLastAppliedConfig = cc.AnnAPIGroup + "/lastAppliedConfiguration"

dataImportControllerName = "dataimportcron-controller"
digestPrefix = "sha256:"
Expand Down Expand Up @@ -734,49 +729,54 @@ func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Contro
}

func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
var cronJob batchv1.CronJob
cronJobNamespacedName := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
if err := r.client.Get(ctx, cronJobNamespacedName, &cronJob); err != nil {
cronJob := &batchv1.CronJob{}
cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
return false, cc.IgnoreNotFound(err)
}
desired, err := r.newCronJob(cron)
if err != nil {
return false, err
}
current, err := sdk.StripStatusFromObject(&cronJob)
if err != nil {
return false, err
}
sdk.MergeLabelsAndAnnotations(desired, current)
merged, err := sdk.MergeObject(desired, current, AnnLastAppliedConfig)
if err != nil {

cronJobCopy := cronJob.DeepCopy()
if err := r.initCronJob(cron, cronJobCopy); err != nil {
return false, err
}
if reflect.DeepEqual(current, merged) {
return true, nil
}
r.log.Info("Updating CronJob", "name", merged.GetName())
if err := r.client.Update(ctx, merged); err != nil {
return false, cc.IgnoreNotFound(err)

if !reflect.DeepEqual(cronJob, cronJobCopy) {
r.log.Info("Updating CronJob", "name", cronJob.GetName())
if err := r.client.Update(ctx, cronJobCopy); err != nil {
return false, cc.IgnoreNotFound(err)
}
}
return true, nil
}

func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
cronJob := &batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: GetCronJobName(cron),
Namespace: r.cdiNamespace,
},
}
if err := r.initCronJob(cron, cronJob); err != nil {
return nil, err
}
return cronJob, nil
}

func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
regSource, err := getCronRegistrySource(cron)
if err != nil {
return nil, err
return err
}
if regSource.URL == nil {
return nil, errors.Errorf("No URL source in cron %s", cron.Name)
return errors.Errorf("No URL source in cron %s", cron.Name)
}
cdiConfig := &cdiv1.CDIConfig{}
if err = r.client.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
return nil, err
return err
}
insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, r.uncachedClient, r.log)
if err != nil {
return nil, err
return err
}
container := corev1.Container{
Name: "cdi-source-update-poller",
Expand Down Expand Up @@ -813,6 +813,10 @@ func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batc
volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
}

if len(volumes) == 0 {
volumes = nil
}

if regSource.SecretRef != nil && *regSource.SecretRef != "" {
container.Env = append(container.Env,
corev1.EnvVar{
Expand Down Expand Up @@ -862,46 +866,39 @@ func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batc

imagePullSecrets, err := cc.GetImagePullSecrets(r.client)
if err != nil {
return nil, err
return err
}
cronJobName := GetCronJobName(cron)
cronJob := &batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: cronJobName,
Namespace: r.cdiNamespace,
},
Spec: batchv1.CronJobSpec{
Schedule: cron.Spec.Schedule,
ConcurrencyPolicy: batchv1.ForbidConcurrent,
SuccessfulJobsHistoryLimit: pointer.Int32(1),
FailedJobsHistoryLimit: pointer.Int32(1),
JobTemplate: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
TerminationGracePeriodSeconds: pointer.Int64(0),
Containers: []corev1.Container{container},
ServiceAccountName: common.CronJobServiceAccountName,
Volumes: volumes,
ImagePullSecrets: imagePullSecrets,
},
},
BackoffLimit: pointer.Int32(2),
TTLSecondsAfterFinished: pointer.Int32(10),
},
},
},
workloadNodePlacement, err := cc.GetWorkloadNodePlacement(r.client)
if err != nil {
return err
}

cronJobSpec := &cronJob.Spec
cronJobSpec.Schedule = cron.Spec.Schedule
cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
cronJobSpec.SuccessfulJobsHistoryLimit = pointer.Int32(1)
cronJobSpec.FailedJobsHistoryLimit = pointer.Int32(1)

jobSpec := &cronJobSpec.JobTemplate.Spec
jobSpec.BackoffLimit = pointer.Int32(2)
jobSpec.TTLSecondsAfterFinished = pointer.Int32(10)

podSpec := &jobSpec.Template.Spec
podSpec.RestartPolicy = corev1.RestartPolicyNever
podSpec.TerminationGracePeriodSeconds = pointer.Int64(0)
podSpec.Containers = []corev1.Container{container}
podSpec.ServiceAccountName = common.CronJobServiceAccountName
podSpec.Volumes = volumes
podSpec.ImagePullSecrets = imagePullSecrets
podSpec.NodeSelector = workloadNodePlacement.NodeSelector
podSpec.Tolerations = workloadNodePlacement.Tolerations
podSpec.Affinity = workloadNodePlacement.Affinity

if err := r.setJobCommon(cron, cronJob); err != nil {
return nil, err
}
if err := sdk.SetLastAppliedConfiguration(cronJob, AnnLastAppliedConfig); err != nil {
return nil, err
return err
}
cc.SetRestrictedSecurityContext(&cronJob.Spec.JobTemplate.Spec.Template.Spec)
return cronJob, nil
return nil
}

func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
Expand Down
50 changes: 49 additions & 1 deletion pkg/controller/dataimportcron-controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,54 @@ var _ = Describe("All DataImportCron Tests", func() {
verifyCronJobContainerImage("new-image")
})

It("Should update CronJob Pod workload NodePlacement on reconcile", func() {
cron = newDataImportCron(cronName)
reconciler = createDataImportCronReconciler(cron)

_, err := reconciler.Reconcile(context.TODO(), cronReq)
Expect(err).ToNot(HaveOccurred())

cronjob := &batchv1.CronJob{}
err = reconciler.client.Get(context.TODO(), cronJobKey(cron), cronjob)
Expect(err).ToNot(HaveOccurred())
spec := &cronjob.Spec.JobTemplate.Spec.Template.Spec
spec.NodeSelector = map[string]string{"some": "thing"}
spec.Tolerations = []corev1.Toleration{{Key: "another", Value: "value"}}
err = reconciler.client.Update(context.TODO(), cronjob)
Expect(err).ToNot(HaveOccurred())

workloads := updateCdiWithTestNodePlacement(reconciler.client)

_, err = reconciler.Reconcile(context.TODO(), cronReq)
Expect(err).ToNot(HaveOccurred())

err = reconciler.client.Get(context.TODO(), cronJobKey(cron), cronjob)
Expect(err).ToNot(HaveOccurred())
podSpec := cronjob.Spec.JobTemplate.Spec.Template.Spec
Expect(podSpec.Affinity).To(Equal(workloads.Affinity))
Expect(podSpec.NodeSelector).To(Equal(workloads.NodeSelector))
Expect(podSpec.Tolerations).To(Equal(workloads.Tolerations))
})

It("Should set initial Job Pod workload NodePlacement on reconcile", func() {
cron = newDataImportCron(cronName)
reconciler = createDataImportCronReconciler(cron)

workloads := updateCdiWithTestNodePlacement(reconciler.client)

_, err := reconciler.Reconcile(context.TODO(), cronReq)
Expect(err).ToNot(HaveOccurred())

job := &batchv1.Job{}
jobKey := types.NamespacedName{Name: GetInitialJobName(cron), Namespace: reconciler.cdiNamespace}
err = reconciler.client.Get(context.TODO(), jobKey, job)
Expect(err).ToNot(HaveOccurred())
podSpec := job.Spec.Template.Spec
Expect(podSpec.Affinity).To(Equal(workloads.Affinity))
Expect(podSpec.NodeSelector).To(Equal(workloads.NodeSelector))
Expect(podSpec.Tolerations).To(Equal(workloads.Tolerations))
})

It("Should create DataVolume on AnnSourceDesiredDigest annotation update, and update DataImportCron and DataSource on DataVolume Succeeded", func() {
cron = newDataImportCron(cronName)
dataSource = nil
Expand Down Expand Up @@ -690,7 +738,7 @@ func createDataImportCronReconciler(objects ...runtime.Object) *DataImportCronRe

func createDataImportCronReconcilerWithoutConfig(objects ...runtime.Object) *DataImportCronReconciler {
crd := &extv1.CustomResourceDefinition{ObjectMeta: metav1.ObjectMeta{Name: "dataimportcrons.cdi.kubevirt.io"}}
objs := []runtime.Object{crd}
objs := []runtime.Object{crd, cc.MakeEmptyCDICR()}
objs = append(objs, objects...)

s := scheme.Scheme
Expand Down
82 changes: 41 additions & 41 deletions pkg/controller/import-controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,25 @@ import (
"strconv"
"strings"

"sigs.k8s.io/controller-runtime/pkg/client"

cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
featuregates "kubevirt.io/containerized-data-importer/pkg/feature-gates"
"kubevirt.io/containerized-data-importer/pkg/util/naming"
sdkapi "kubevirt.io/controller-lifecycle-operator-sdk/api"

"k8s.io/apimachinery/pkg/runtime"
bootstrapapi "k8s.io/cluster-bootstrap/token/api"

. "github.com/onsi/ginkgo"
"github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
"kubevirt.io/containerized-data-importer/pkg/common"

//cc "kubevirt.io/containerized-data-importer/pkg/controller/common"

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -271,49 +269,17 @@ var _ = Describe("ImportConfig Controller reconcile loop", func() {
pvc.Status.Phase = v1.ClaimBound

reconciler = createImportReconciler(pvc)
workloads := updateCdiWithTestNodePlacement(reconciler.client)

cr := &cdiv1.CDI{}
err := reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "cdi"}, cr)
Expect(err).ToNot(HaveOccurred())

dummyNodeSelector := map[string]string{"kubernetes.io/arch": "amd64"}
dummyTolerations := []v1.Toleration{{Key: "test", Value: "123"}}
dummyAffinity := &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{Key: "kubernetes.io/hostname", Operator: v1.NodeSelectorOpIn, Values: []string{"node01"}},
},
},
},
},
},
}
cr.Spec.Workloads.NodeSelector = dummyNodeSelector
cr.Spec.Workloads.Affinity = dummyAffinity
cr.Spec.Workloads.Tolerations = dummyTolerations

err = reconciler.client.Update(context.TODO(), cr)
Expect(err).ToNot(HaveOccurred())

placement, err := cc.GetWorkloadNodePlacement(reconciler.client)
Expect(err).ToNot(HaveOccurred())

Expect(placement.Affinity).To(Equal(dummyAffinity))
Expect(placement.NodeSelector).To(Equal(dummyNodeSelector))
Expect(placement.Tolerations).To(Equal(dummyTolerations))

_, err = reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "testPvc1", Namespace: "default"}})
_, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "testPvc1", Namespace: "default"}})
Expect(err).ToNot(HaveOccurred())
pod := &corev1.Pod{}
err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "importer-testPvc1", Namespace: "default"}, pod)
Expect(err).ToNot(HaveOccurred())

Expect(pod.Spec.Affinity).To(Equal(dummyAffinity))
Expect(pod.Spec.NodeSelector).To(Equal(dummyNodeSelector))
Expect(pod.Spec.Tolerations).To(Equal(dummyTolerations))
Expect(pod.Spec.Affinity).To(Equal(workloads.Affinity))
Expect(pod.Spec.NodeSelector).To(Equal(workloads.NodeSelector))
Expect(pod.Spec.Tolerations).To(Equal(workloads.Tolerations))
})

It("Should create a POD if a PVC with all needed annotations is passed", func() {
Expand Down Expand Up @@ -1181,3 +1147,37 @@ func createSecret(name, ns, accessKey, secretKey string, labels map[string]strin
},
}
}

func updateCdiWithTestNodePlacement(c client.Client) sdkapi.NodePlacement {
cr := &cdiv1.CDI{}
err := c.Get(context.TODO(), types.NamespacedName{Name: "cdi"}, cr)
Expect(err).ToNot(HaveOccurred())

workloads := sdkapi.NodePlacement{
NodeSelector: map[string]string{"kubernetes.io/arch": "amd64"},
Tolerations: []v1.Toleration{{Key: "test", Value: "123"}},
Affinity: &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{Key: "kubernetes.io/hostname", Operator: v1.NodeSelectorOpIn, Values: []string{"node01"}},
},
},
},
},
},
},
}

cr.Spec.Workloads = workloads
err = c.Update(context.TODO(), cr)
Expect(err).ToNot(HaveOccurred())

placement, err := cc.GetWorkloadNodePlacement(c)
Expect(err).ToNot(HaveOccurred())
Expect(*placement).To(Equal(workloads))

return workloads
}

0 comments on commit cfae6bc

Please sign in to comment.