
Commit

Add utest
Signed-off-by: Arnon Gilboa <agilboa@redhat.com>
arnongilboa committed May 12, 2023
1 parent 3ad4eeb commit ad1ee19
Showing 3 changed files with 142 additions and 103 deletions.
115 changes: 53 additions & 62 deletions pkg/controller/dataimportcron-controller.go
@@ -60,8 +60,6 @@ import (
 	"kubevirt.io/containerized-data-importer/pkg/operator"
 	"kubevirt.io/containerized-data-importer/pkg/util"
 	"kubevirt.io/containerized-data-importer/pkg/util/naming"
-
-	"kubevirt.io/controller-lifecycle-operator-sdk/pkg/sdk"
 )
 
 const (
@@ -109,8 +107,6 @@ const (
 	AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
 	// AnnLastUseTime is the PVC last use time stamp
 	AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
-	// AnnLastAppliedConfig is the cron last applied configuration
-	AnnLastAppliedConfig = cc.AnnAPIGroup + "/lastAppliedConfiguration"
 
 	dataImportControllerName = "dataimportcron-controller"
 	digestPrefix              = "sha256:"
@@ -734,49 +730,54 @@ func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Contro
 }
 
 func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
-	var cronJob batchv1.CronJob
-	cronJobNamespacedName := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
-	if err := r.client.Get(ctx, cronJobNamespacedName, &cronJob); err != nil {
+	cronJob := batchv1.CronJob{}
+	cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
+	if err := r.client.Get(ctx, cronJobKey, &cronJob); err != nil {
 		return false, cc.IgnoreNotFound(err)
 	}
-	desired, err := r.newCronJob(cron)
-	if err != nil {
-		return false, err
-	}
-	current, err := sdk.StripStatusFromObject(&cronJob)
-	if err != nil {
-		return false, err
-	}
-	sdk.MergeLabelsAndAnnotations(desired, current)
-	merged, err := sdk.MergeObject(desired, current, AnnLastAppliedConfig)
-	if err != nil {
+
+	cronJobCopy := cronJob.DeepCopy()
+	if err := r.initCronJob(cron, cronJobCopy); err != nil {
 		return false, err
 	}
-	if reflect.DeepEqual(current, merged) {
-		return true, nil
-	}
-	r.log.Info("Updating CronJob", "name", merged.GetName())
-	if err := r.client.Update(ctx, merged); err != nil {
-		return false, cc.IgnoreNotFound(err)
+
+	if !reflect.DeepEqual(&cronJob, cronJobCopy) {
+		r.log.Info("Updating CronJob", "name", cronJob.GetName())
+		if err := r.client.Update(ctx, cronJobCopy); err != nil {
+			return false, cc.IgnoreNotFound(err)
+		}
 	}
 	return true, nil
 }
 
 func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
+	cronJob := &batchv1.CronJob{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      GetCronJobName(cron),
+			Namespace: r.cdiNamespace,
+		},
+	}
+	if err := r.initCronJob(cron, cronJob); err != nil {
+		return nil, err
+	}
+	return cronJob, nil
+}
+
+func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
 	regSource, err := getCronRegistrySource(cron)
 	if err != nil {
-		return nil, err
+		return err
 	}
 	if regSource.URL == nil {
-		return nil, errors.Errorf("No URL source in cron %s", cron.Name)
+		return errors.Errorf("No URL source in cron %s", cron.Name)
 	}
 	cdiConfig := &cdiv1.CDIConfig{}
 	if err = r.client.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
-		return nil, err
+		return err
 	}
 	insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, r.uncachedClient, r.log)
 	if err != nil {
-		return nil, err
+		return err
 	}
 	container := corev1.Container{
 		Name: "cdi-source-update-poller",
@@ -862,53 +863,43 @@ func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batc
 
 	imagePullSecrets, err := cc.GetImagePullSecrets(r.client)
 	if err != nil {
-		return nil, err
+		return err
 	}
 	workloadNodePlacement, err := cc.GetWorkloadNodePlacement(r.client)
 	if err != nil {
-		return nil, err
+		return err
 	}
-	cronJobName := GetCronJobName(cron)
-	cronJob := &batchv1.CronJob{
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      cronJobName,
-			Namespace: r.cdiNamespace,
-		},
-		Spec: batchv1.CronJobSpec{
-			Schedule:                   cron.Spec.Schedule,
-			ConcurrencyPolicy:          batchv1.ForbidConcurrent,
-			SuccessfulJobsHistoryLimit: pointer.Int32(1),
-			FailedJobsHistoryLimit:     pointer.Int32(1),
-			JobTemplate: batchv1.JobTemplateSpec{
-				Spec: batchv1.JobSpec{
-					Template: corev1.PodTemplateSpec{
-						Spec: corev1.PodSpec{
-							RestartPolicy:                 corev1.RestartPolicyNever,
-							TerminationGracePeriodSeconds: pointer.Int64(0),
-							Containers:                    []corev1.Container{container},
-							ServiceAccountName:            common.CronJobServiceAccountName,
-							Volumes:                       volumes,
-							ImagePullSecrets:              imagePullSecrets,
-							NodeSelector:                  workloadNodePlacement.NodeSelector,
-							Tolerations:                   workloadNodePlacement.Tolerations,
-							Affinity:                      workloadNodePlacement.Affinity,
-						},
-					},
-					BackoffLimit:            pointer.Int32(2),
-					TTLSecondsAfterFinished: pointer.Int32(10),
-				},
-			},
-		},
-	}
+	cronJob.Spec = batchv1.CronJobSpec{
+		Schedule:                   cron.Spec.Schedule,
+		ConcurrencyPolicy:          batchv1.ForbidConcurrent,
+		SuccessfulJobsHistoryLimit: pointer.Int32(1),
+		FailedJobsHistoryLimit:     pointer.Int32(1),
+		JobTemplate: batchv1.JobTemplateSpec{
+			Spec: batchv1.JobSpec{
+				Template: corev1.PodTemplateSpec{
+					Spec: corev1.PodSpec{
+						RestartPolicy:                 corev1.RestartPolicyNever,
+						TerminationGracePeriodSeconds: pointer.Int64(0),
+						Containers:                    []corev1.Container{container},
+						ServiceAccountName:            common.CronJobServiceAccountName,
+						Volumes:                       volumes,
+						ImagePullSecrets:              imagePullSecrets,
+						NodeSelector:                  workloadNodePlacement.NodeSelector,
+						Tolerations:                   workloadNodePlacement.Tolerations,
+						Affinity:                      workloadNodePlacement.Affinity,
+					},
+				},
+				BackoffLimit:            pointer.Int32(2),
+				TTLSecondsAfterFinished: pointer.Int32(10),
+			},
+		},
+	}
 	if err := r.setJobCommon(cron, cronJob); err != nil {
-		return nil, err
-	}
-	if err := sdk.SetLastAppliedConfiguration(cronJob, AnnLastAppliedConfig); err != nil {
-		return nil, err
+		return err
 	}
 	cc.SetRestrictedSecurityContext(&cronJob.Spec.JobTemplate.Spec.Template.Spec)
-	return cronJob, nil
+	return nil
 }
 
 func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
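The controller change above drops the operator SDK's last-applied-configuration merge (`sdk.MergeObject` plus the `AnnLastAppliedConfig` annotation) in favor of a plain read-modify-compare flow: fetch the live CronJob, re-run `initCronJob` on a deep copy, and issue an `Update` only if the copy now differs. A minimal, self-contained sketch of that pattern follows; `ensureCronJob` and its `init` callback are hypothetical names for illustration, not code from this commit.

import (
	"context"
	"reflect"

	batchv1 "k8s.io/api/batch/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// ensureCronJob sketches the fetch/copy/compare/update flow, simplified from
// cronJobExistsAndUpdated above (hypothetical helper, controller-runtime client).
func ensureCronJob(ctx context.Context, c client.Client, key types.NamespacedName,
	init func(*batchv1.CronJob) error) (bool, error) {
	cronJob := &batchv1.CronJob{}
	if err := c.Get(ctx, key, cronJob); err != nil {
		// Not-found is swallowed: (false, nil) tells the caller to create it.
		return false, client.IgnoreNotFound(err)
	}
	cronJobCopy := cronJob.DeepCopy()
	if err := init(cronJobCopy); err != nil { // re-render the desired spec onto the copy
		return false, err
	}
	// Comparing the live object against its re-initialized copy only flags
	// fields the init step owns; server-populated fields are identical on
	// both sides, so no spurious updates are triggered.
	if !reflect.DeepEqual(cronJob, cronJobCopy) {
		if err := c.Update(ctx, cronJobCopy); err != nil {
			return false, client.IgnoreNotFound(err)
		}
	}
	return true, nil
}

Compared with the annotation-based merge, this trades generality for transparency: the controller writes the whole spec it owns, so a three-way merge against a stored last-applied configuration is unnecessary.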
48 changes: 48 additions & 0 deletions pkg/controller/dataimportcron-controller_test.go
@@ -312,6 +312,54 @@ var _ = Describe("All DataImportCron Tests", func() {
 		verifyCronJobContainerImage("new-image")
 	})
 
+	It("Should update CronJob Pod workload NodePlacement on reconcile", func() {
+		cron = newDataImportCron(cronName)
+		reconciler = createDataImportCronReconciler(cron)
+
+		_, err := reconciler.Reconcile(context.TODO(), cronReq)
+		Expect(err).ToNot(HaveOccurred())
+
+		cronjob := &batchv1.CronJob{}
+		err = reconciler.client.Get(context.TODO(), cronJobKey(cron), cronjob)
+		Expect(err).ToNot(HaveOccurred())
+		spec := &cronjob.Spec.JobTemplate.Spec.Template.Spec
+		spec.NodeSelector = map[string]string{"some": "thing"}
+		spec.Tolerations = []corev1.Toleration{{Key: "another", Value: "value"}}
+		err = reconciler.client.Update(context.TODO(), cronjob)
+		Expect(err).ToNot(HaveOccurred())
+
+		workloads := updateCdiWithTestNodePlacement(reconciler.client)
+
+		_, err = reconciler.Reconcile(context.TODO(), cronReq)
+		Expect(err).ToNot(HaveOccurred())
+
+		err = reconciler.client.Get(context.TODO(), cronJobKey(cron), cronjob)
+		Expect(err).ToNot(HaveOccurred())
+		podSpec := cronjob.Spec.JobTemplate.Spec.Template.Spec
+		Expect(podSpec.Affinity).To(Equal(workloads.Affinity))
+		Expect(podSpec.NodeSelector).To(Equal(workloads.NodeSelector))
+		Expect(podSpec.Tolerations).To(Equal(workloads.Tolerations))
+	})
+
+	It("Should set initial Job Pod workload NodePlacement on reconcile", func() {
+		cron = newDataImportCron(cronName)
+		reconciler = createDataImportCronReconciler(cron)
+
+		workloads := updateCdiWithTestNodePlacement(reconciler.client)
+
+		_, err := reconciler.Reconcile(context.TODO(), cronReq)
+		Expect(err).ToNot(HaveOccurred())
+
+		job := &batchv1.Job{}
+		jobKey := types.NamespacedName{Name: GetInitialJobName(cron), Namespace: reconciler.cdiNamespace}
+		err = reconciler.client.Get(context.TODO(), jobKey, job)
+		Expect(err).ToNot(HaveOccurred())
+		podSpec := job.Spec.Template.Spec
+		Expect(podSpec.Affinity).To(Equal(workloads.Affinity))
+		Expect(podSpec.NodeSelector).To(Equal(workloads.NodeSelector))
+		Expect(podSpec.Tolerations).To(Equal(workloads.Tolerations))
+	})
+
 	It("Should create DataVolume on AnnSourceDesiredDigest annotation update, and update DataImportCron and DataSource on DataVolume Succeeded", func() {
 		cron = newDataImportCron(cronName)
 		dataSource = nil
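Both new tests resolve the CronJob through a `cronJobKey` helper that predates this diff and is not shown here. Its likely shape, inferred from how `cronJobExistsAndUpdated` names and namespaces the CronJob in the controller above, would be something like the following (an assumption, not code from the repository; it relies on the suite's shared `reconciler` variable):

// Assumed shape of the cronJobKey test helper: the CronJob for a
// DataImportCron lives in the CDI namespace, named via GetCronJobName.
func cronJobKey(cron *cdiv1.DataImportCron) types.NamespacedName {
	return types.NamespacedName{
		Namespace: reconciler.cdiNamespace, // the suite's shared reconciler
		Name:      GetCronJobName(cron),
	}
}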
82 changes: 41 additions & 41 deletions pkg/controller/import-controller_test.go
@@ -22,27 +22,25 @@ import (
 	"strconv"
 	"strings"
 
-	"sigs.k8s.io/controller-runtime/pkg/client"
-
 	cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
 	featuregates "kubevirt.io/containerized-data-importer/pkg/feature-gates"
 	"kubevirt.io/containerized-data-importer/pkg/util/naming"
+	sdkapi "kubevirt.io/controller-lifecycle-operator-sdk/api"
 
 	"k8s.io/apimachinery/pkg/runtime"
 	bootstrapapi "k8s.io/cluster-bootstrap/token/api"
 
 	. "github.com/onsi/ginkgo"
 	"github.com/onsi/ginkgo/extensions/table"
 	. "github.com/onsi/gomega"
+	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/client/fake"
 	logf "sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 
 	cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
 	"kubevirt.io/containerized-data-importer/pkg/common"
 
-	//cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
-
 	corev1 "k8s.io/api/core/v1"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
@@ -271,49 +269,17 @@ var _ = Describe("ImportConfig Controller reconcile loop", func() {
 		pvc.Status.Phase = v1.ClaimBound
 
 		reconciler = createImportReconciler(pvc)
+		workloads := updateCdiWithTestNodePlacement(reconciler.client)
 
-		cr := &cdiv1.CDI{}
-		err := reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "cdi"}, cr)
-		Expect(err).ToNot(HaveOccurred())
-
-		dummyNodeSelector := map[string]string{"kubernetes.io/arch": "amd64"}
-		dummyTolerations := []v1.Toleration{{Key: "test", Value: "123"}}
-		dummyAffinity := &v1.Affinity{
-			NodeAffinity: &v1.NodeAffinity{
-				RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
-					NodeSelectorTerms: []v1.NodeSelectorTerm{
-						{
-							MatchExpressions: []v1.NodeSelectorRequirement{
-								{Key: "kubernetes.io/hostname", Operator: v1.NodeSelectorOpIn, Values: []string{"node01"}},
-							},
-						},
-					},
-				},
-			},
-		}
-		cr.Spec.Workloads.NodeSelector = dummyNodeSelector
-		cr.Spec.Workloads.Affinity = dummyAffinity
-		cr.Spec.Workloads.Tolerations = dummyTolerations
-
-		err = reconciler.client.Update(context.TODO(), cr)
-		Expect(err).ToNot(HaveOccurred())
-
-		placement, err := cc.GetWorkloadNodePlacement(reconciler.client)
-		Expect(err).ToNot(HaveOccurred())
-
-		Expect(placement.Affinity).To(Equal(dummyAffinity))
-		Expect(placement.NodeSelector).To(Equal(dummyNodeSelector))
-		Expect(placement.Tolerations).To(Equal(dummyTolerations))
-
-		_, err = reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "testPvc1", Namespace: "default"}})
+		_, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "testPvc1", Namespace: "default"}})
 		Expect(err).ToNot(HaveOccurred())
 		pod := &corev1.Pod{}
 		err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "importer-testPvc1", Namespace: "default"}, pod)
 		Expect(err).ToNot(HaveOccurred())
 
-		Expect(pod.Spec.Affinity).To(Equal(dummyAffinity))
-		Expect(pod.Spec.NodeSelector).To(Equal(dummyNodeSelector))
-		Expect(pod.Spec.Tolerations).To(Equal(dummyTolerations))
+		Expect(pod.Spec.Affinity).To(Equal(workloads.Affinity))
+		Expect(pod.Spec.NodeSelector).To(Equal(workloads.NodeSelector))
+		Expect(pod.Spec.Tolerations).To(Equal(workloads.Tolerations))
 	})
 
 	It("Should create a POD if a PVC with all needed annotations is passed", func() {
@@ -1181,3 +1147,37 @@ func createSecret(name, ns, accessKey, secretKey string, labels map[string]strin
 		},
 	}
 }
+
+func updateCdiWithTestNodePlacement(c client.Client) sdkapi.NodePlacement {
+	cr := &cdiv1.CDI{}
+	err := c.Get(context.TODO(), types.NamespacedName{Name: "cdi"}, cr)
+	Expect(err).ToNot(HaveOccurred())
+
+	workloads := sdkapi.NodePlacement{
+		NodeSelector: map[string]string{"kubernetes.io/arch": "amd64"},
+		Tolerations:  []v1.Toleration{{Key: "test", Value: "123"}},
+		Affinity: &v1.Affinity{
+			NodeAffinity: &v1.NodeAffinity{
+				RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
+					NodeSelectorTerms: []v1.NodeSelectorTerm{
+						{
+							MatchExpressions: []v1.NodeSelectorRequirement{
+								{Key: "kubernetes.io/hostname", Operator: v1.NodeSelectorOpIn, Values: []string{"node01"}},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	cr.Spec.Workloads = workloads
+	err = c.Update(context.TODO(), cr)
+	Expect(err).ToNot(HaveOccurred())
+
+	placement, err := cc.GetWorkloadNodePlacement(c)
+	Expect(err).ToNot(HaveOccurred())
+	Expect(*placement).To(Equal(workloads))
+
+	return workloads
+}
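With the shared helper in place, any controller test in the package can exercise node placement in three steps: seed the CDI CR, reconcile, and assert on the resulting pod spec. A hypothetical further usage, with placeholder names (`myReconciler`, `myRequest`, `myPodKey`) standing in for a suite's own fixtures:

It("Should apply CDI workload NodePlacement to the created pod", func() {
	// Seed the CDI CR with test placement and capture the expected values.
	workloads := updateCdiWithTestNodePlacement(myReconciler.client)

	_, err := myReconciler.Reconcile(context.TODO(), myRequest)
	Expect(err).ToNot(HaveOccurred())

	pod := &corev1.Pod{}
	err = myReconciler.client.Get(context.TODO(), myPodKey, pod)
	Expect(err).ToNot(HaveOccurred())

	Expect(pod.Spec.NodeSelector).To(Equal(workloads.NodeSelector))
	Expect(pod.Spec.Tolerations).To(Equal(workloads.Tolerations))
	Expect(pod.Spec.Affinity).To(Equal(workloads.Affinity))
})

Returning the NodePlacement from the helper, rather than mutating package-level variables, keeps each test's assertions local and lets the helper double as a sanity check on cc.GetWorkloadNodePlacement.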
