Skip to content

Commit

Permalink
Enable empty schedule in DataImportCron
Browse files Browse the repository at this point in the history
Allow disabling DataImportCron schedule and support external trigger

Signed-off-by: Ido Aharon <iaharon@redhat.com>
  • Loading branch information
ido106 committed May 29, 2023
1 parent fae6535 commit e51c0ca
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 13 deletions.
16 changes: 9 additions & 7 deletions pkg/apiserver/webhooks/dataimportcron-validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,15 @@ func (wh *dataImportCronValidatingWebhook) validateDataImportCronSpec(request *a
return causes
}

if _, err := cronexpr.Parse(spec.Schedule); err != nil {
causes = append(causes, metav1.StatusCause{
Type: metav1.CauseTypeFieldValueInvalid,
Message: "Illegal cron schedule",
Field: field.Child("Schedule").String(),
})
return causes
if spec.Schedule != "" {
if _, err := cronexpr.Parse(spec.Schedule); err != nil {
causes = append(causes, metav1.StatusCause{
Type: metav1.CauseTypeFieldValueInvalid,
Message: "Illegal cron schedule",
Field: field.Child("Schedule").String(),
})
return causes
}
}

if spec.ImportsToKeep != nil && *spec.ImportsToKeep < 0 {
Expand Down
6 changes: 6 additions & 0 deletions pkg/apiserver/webhooks/dataimportcron-validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ var _ = Describe("Validating Webhook", func() {
resp := validateDataImportCronCreate(cron)
Expect(resp.Allowed).To(BeFalse())
})
It("should allow DataImportCron with empty cron schedule", func() {
cron := newDataImportCron(cdiv1.DataVolumeSourceRegistry{URL: &testRegistryURL})
cron.Spec.Schedule = ""
resp := validateDataImportCronCreate(cron)
Expect(resp.Allowed).To(BeTrue())
})
It("should reject DataImportCron with illegal ManagedDataSource on create", func() {
cron := newDataImportCron(cdiv1.DataVolumeSourceRegistry{URL: &testRegistryURL})
cron.Spec.ManagedDataSource = ""
Expand Down
8 changes: 7 additions & 1 deletion pkg/controller/dataimportcron-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,11 @@ func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.
if !shouldReconcile || err != nil {
return reconcile.Result{}, err
}

if err := r.initCron(ctx, dataImportCron); err != nil {
return reconcile.Result{}, err
}

return r.update(ctx, dataImportCron)
}

Expand Down Expand Up @@ -162,6 +164,9 @@ func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron
}

func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
if dataImportCron.Spec.Schedule == "" {
return nil
}
if isImageStreamSource(dataImportCron) {
if dataImportCron.Annotations[AnnNextCronTime] == "" {
cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
Expand Down Expand Up @@ -348,7 +353,8 @@ func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *c
}

// We use the poller returned reconcile.Result for RequeueAfter if needed
if isImageStreamSource(dataImportCron) {
// skip if we disabled schedule
if isImageStreamSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
res, err = r.pollImageStreamDigest(ctx, dataImportCron)
if err != nil {
return res, err
Expand Down
18 changes: 15 additions & 3 deletions pkg/controller/dataimportcron-controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ const (
imageStreamName = "test-imagestream"
imageStreamTag = "test-imagestream-tag"
tagWithNoItems = "tag-with-no-items"
defaultSchedule = "* * * * *"
emptySchedule = ""
)

type possiblyErroringFakeCtrlRuntimeClient struct {
Expand Down Expand Up @@ -378,8 +380,9 @@ var _ = Describe("All DataImportCron Tests", func() {
Expect(cronJob).To(Equal(cronJobCopy))
})

It("Should create DataVolume on AnnSourceDesiredDigest annotation update, and update DataImportCron and DataSource on DataVolume Succeeded", func() {
DescribeTable("Should create DataVolume on AnnSourceDesiredDigest annotation update, and update DataImportCron and DataSource on DataVolume Succeeded", func(schedule, errorString string) {
cron = newDataImportCron(cronName)
cron.Spec.Schedule = schedule
dataSource = nil
retentionPolicy := cdiv1.DataImportCronRetainNone
cron.Spec.RetentionPolicy = &retentionPolicy
Expand All @@ -406,6 +409,12 @@ var _ = Describe("All DataImportCron Tests", func() {
Expect(*dv.Spec.Source.Registry.URL).To(Equal(testRegistryURL + "@" + testDigest))
Expect(dv.Annotations[cc.AnnImmediateBinding]).To(Equal("true"))

if cron.Spec.Schedule == emptySchedule {
cronjob := &batchv1.CronJob{}
err = reconciler.client.Get(context.TODO(), cronJobKey(cron), cronjob)
Expect(k8serrors.IsNotFound(err)).To(BeTrue())
}

dv.Status.Phase = cdiv1.ImportScheduled
err = reconciler.client.Update(context.TODO(), dv)
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -450,7 +459,10 @@ var _ = Describe("All DataImportCron Tests", func() {
err = reconciler.client.List(context.TODO(), dvList, &client.ListOptions{})
Expect(err).ToNot(HaveOccurred())
Expect(dvList.Items).To(BeEmpty())
})
},
Entry("default schedule", defaultSchedule, "should succeed with a default schedule"),
Entry("empty schedule", emptySchedule, "should succeed with an empty schedule"),
)

It("Should not create DV if PVC exists on DesiredDigest update; Should update DIC and DAS, and GC LRU PVCs", func() {
const nPVCs = 3
Expand Down Expand Up @@ -838,7 +850,7 @@ func newDataImportCron(name string) *cdiv1.DataImportCron {
},
},
},
Schedule: "* * * * *",
Schedule: defaultSchedule,
ManagedDataSource: dataSourceName,
GarbageCollect: &garbageCollect,
ImportsToKeep: &importsToKeep,
Expand Down
48 changes: 46 additions & 2 deletions tests/dataimportcron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
"kubevirt.io/containerized-data-importer/pkg/common"
"kubevirt.io/containerized-data-importer/pkg/controller"
cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
"kubevirt.io/containerized-data-importer/tests/framework"
"kubevirt.io/containerized-data-importer/tests/utils"
)
Expand All @@ -29,6 +30,8 @@ const (
scheduleEveryMinute = "* * * * *"
scheduleOnceAYear = "0 0 1 1 *"
importsToKeep = 1
emptySchedule = ""
errorDigest = "sha256:12345678900987654321"
)

var _ = Describe("DataImportCron", func() {
Expand Down Expand Up @@ -60,7 +63,7 @@ var _ = Describe("DataImportCron", func() {

updateDigest := func(digest string) func(cron *cdiv1.DataImportCron) *cdiv1.DataImportCron {
return func(cron *cdiv1.DataImportCron) *cdiv1.DataImportCron {
cron.Annotations[controller.AnnSourceDesiredDigest] = digest
cc.AddAnnotation(cron, controller.AnnSourceDesiredDigest, digest)
return cron
}
}
Expand Down Expand Up @@ -118,7 +121,7 @@ var _ = Describe("DataImportCron", func() {
By("Set desired digest to nonexisting one")

//get and update!!!
retryOnceOnErr(updateDataImportCron(f.CdiClient, ns, cronName, updateDigest("sha256:12345678900987654321"))).Should(BeNil())
retryOnceOnErr(updateDataImportCron(f.CdiClient, ns, cronName, updateDigest(errorDigest))).Should(BeNil())

By("Wait for CurrentImports update")
Eventually(func() string {
Expand Down Expand Up @@ -275,6 +278,47 @@ var _ = Describe("DataImportCron", func() {
waitForDigest()
})

It("[test_id:XXXX] Should allow an empty schedule to trigger an external update to the source", func() {
By("Create DataImportCron with empty schedule")
cron = utils.NewDataImportCron(cronName, "5Gi", emptySchedule, dataSourceName, importsToKeep, *reg)
retentionPolicy := cdiv1.DataImportCronRetainNone
cron.Spec.RetentionPolicy = &retentionPolicy

cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Create(context.TODO(), cron, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())

By("Create poller pod to update the DataImportCron digest")
importerImage := f.GetEnvVarValue("IMPORTER_IMAGE")
Expect(importerImage).ToNot(BeEmpty())

pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pollerPodName}}
err = controller.InitPollerPodSpec(f.CrClient, cron, &pod.Spec, importerImage, corev1.PullIfNotPresent, log)
Expect(err).ToNot(HaveOccurred())

_, err = utils.CreatePod(f.K8sClient, f.CdiInstallNs, pod)
Expect(err).ToNot(HaveOccurred())

By("Wait for digest set by external poller")
waitForDigest()

waitForConditions(corev1.ConditionFalse, corev1.ConditionTrue)
By("Verify CurrentImports update")
currentImportDv := cron.Status.CurrentImports[0].DataVolumeName
Expect(currentImportDv).ToNot(BeEmpty())

By(fmt.Sprintf("Verify pvc was created %s", currentImportDv))
_, err = utils.WaitForPVC(f.K8sClient, ns, currentImportDv)
Expect(err).ToNot(HaveOccurred())

By("Wait for import completion")
err = utils.WaitForDataVolumePhase(f, ns, cdiv1.Succeeded, currentImportDv)
Expect(err).ToNot(HaveOccurred(), "Datavolume not in phase succeeded in time")

By("Verify cronjob was not created")
_, err = f.K8sClient.BatchV1().CronJobs(f.CdiInstallNs).Get(context.TODO(), controller.GetCronJobName(cron), metav1.GetOptions{})
Expect(errors.IsNotFound(err)).To(BeTrue())
})

It("[test_id:7406] succeed garbage collecting old PVCs when importing new ones", func() {
garbagePVCs := 3
for i := 0; i < garbagePVCs; i++ {
Expand Down

0 comments on commit e51c0ca

Please sign in to comment.