Skip to content

Commit

Permalink
enable empty schedule in dataimportcron
Browse files Browse the repository at this point in the history
Signed-off-by: Ido Aharon <iaharon@redhat.com>
  • Loading branch information
ido106 committed May 21, 2023
1 parent 210b9db commit 029bebc
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 15 deletions.
16 changes: 9 additions & 7 deletions pkg/apiserver/webhooks/dataimportcron-validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,15 @@ func (wh *dataImportCronValidatingWebhook) validateDataImportCronSpec(request *a
return causes
}

if _, err := cronexpr.Parse(spec.Schedule); err != nil {
causes = append(causes, metav1.StatusCause{
Type: metav1.CauseTypeFieldValueInvalid,
Message: "Illegal cron schedule",
Field: field.Child("Schedule").String(),
})
return causes
if spec.Schedule != "" {
if _, err := cronexpr.Parse(spec.Schedule); err != nil {
causes = append(causes, metav1.StatusCause{
Type: metav1.CauseTypeFieldValueInvalid,
Message: "Illegal cron schedule",
Field: field.Child("Schedule").String(),
})
return causes
}
}

if spec.ImportsToKeep != nil && *spec.ImportsToKeep < 0 {
Expand Down
6 changes: 6 additions & 0 deletions pkg/apiserver/webhooks/dataimportcron-validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ var _ = Describe("Validating Webhook", func() {
resp := validateDataImportCronCreate(cron)
Expect(resp.Allowed).To(BeFalse())
})
It("should allow DataImportCron with empty cron schedule", func() {
cron := newDataImportCron(cdiv1.DataVolumeSourceRegistry{URL: &testRegistryURL})
cron.Spec.Schedule = ""
resp := validateDataImportCronCreate(cron)
Expect(resp.Allowed).To(BeTrue())
})
It("should reject DataImportCron with illegal ManagedDataSource on create", func() {
cron := newDataImportCron(cdiv1.DataVolumeSourceRegistry{URL: &testRegistryURL})
cron.Spec.ManagedDataSource = ""
Expand Down
10 changes: 7 additions & 3 deletions pkg/controller/dataimportcron-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,11 @@ func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.
if !shouldReconcile || err != nil {
return reconcile.Result{}, err
}
if err := r.initCron(ctx, dataImportCron); err != nil {
return reconcile.Result{}, err
// Cron job and initial job are not initialized if no schedule, allowing external source update trigger
if dataImportCron.Spec.Schedule != "" {
if err := r.initCron(ctx, dataImportCron); err != nil {
return reconcile.Result{}, err
}
}
return r.update(ctx, dataImportCron)
}
Expand Down Expand Up @@ -348,7 +351,8 @@ func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *c
}

// We use the poller returned reconcile.Result for RequeueAfter if needed
if isImageStreamSource(dataImportCron) {
// skip if we disabled schedule
if isImageStreamSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
res, err = r.pollImageStreamDigest(ctx, dataImportCron)
if err != nil {
return res, err
Expand Down
18 changes: 15 additions & 3 deletions pkg/controller/dataimportcron-controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ const (
imageStreamName = "test-imagestream"
imageStreamTag = "test-imagestream-tag"
tagWithNoItems = "tag-with-no-items"
defaultSchedule = "* * * * *"
emptySchedule = ""
)

type possiblyErroringFakeCtrlRuntimeClient struct {
Expand Down Expand Up @@ -378,8 +380,9 @@ var _ = Describe("All DataImportCron Tests", func() {
Expect(cronJob).To(Equal(cronJobCopy))
})

It("Should create DataVolume on AnnSourceDesiredDigest annotation update, and update DataImportCron and DataSource on DataVolume Succeeded", func() {
DescribeTable("Should create DataVolume on AnnSourceDesiredDigest annotation update, and update DataImportCron and DataSource on DataVolume Succeeded", func(schedule, errorString string) {
cron = newDataImportCron(cronName)
cron.Spec.Schedule = schedule
dataSource = nil
retentionPolicy := cdiv1.DataImportCronRetainNone
cron.Spec.RetentionPolicy = &retentionPolicy
Expand All @@ -406,6 +409,12 @@ var _ = Describe("All DataImportCron Tests", func() {
Expect(*dv.Spec.Source.Registry.URL).To(Equal(testRegistryURL + "@" + testDigest))
Expect(dv.Annotations[cc.AnnImmediateBinding]).To(Equal("true"))

if cron.Spec.Schedule == emptySchedule {
cronjob := &batchv1.CronJob{}
err = reconciler.client.Get(context.TODO(), cronJobKey(cron), cronjob)
Expect(k8serrors.IsNotFound(err)).To(BeTrue())
}

dv.Status.Phase = cdiv1.ImportScheduled
err = reconciler.client.Update(context.TODO(), dv)
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -450,7 +459,10 @@ var _ = Describe("All DataImportCron Tests", func() {
err = reconciler.client.List(context.TODO(), dvList, &client.ListOptions{})
Expect(err).ToNot(HaveOccurred())
Expect(dvList.Items).To(BeEmpty())
})
},
Entry("default schedule", defaultSchedule, "should succeed with a default schedule"),
Entry("empty schedule", emptySchedule, "should succeed with an empty schedule"),
)

It("Should not create DV if PVC exists on DesiredDigest update; Should update DIC and DAS, and GC LRU PVCs", func() {
const nPVCs = 3
Expand Down Expand Up @@ -838,7 +850,7 @@ func newDataImportCron(name string) *cdiv1.DataImportCron {
},
},
},
Schedule: "* * * * *",
Schedule: defaultSchedule,
ManagedDataSource: dataSourceName,
GarbageCollect: &garbageCollect,
ImportsToKeep: &importsToKeep,
Expand Down
46 changes: 44 additions & 2 deletions tests/dataimportcron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
"kubevirt.io/containerized-data-importer/pkg/common"
"kubevirt.io/containerized-data-importer/pkg/controller"
cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
"kubevirt.io/containerized-data-importer/tests/framework"
"kubevirt.io/containerized-data-importer/tests/utils"
)
Expand All @@ -28,6 +29,8 @@ const (
scheduleEveryMinute = "* * * * *"
scheduleOnceAYear = "0 0 1 1 *"
importsToKeep = 1
emptySchedule = ""
errorDigest = "sha256:12345678900987654321"
)

var _ = Describe("DataImportCron", func() {
Expand All @@ -45,7 +48,7 @@ var _ = Describe("DataImportCron", func() {

updateDigest := func(digest string) func(cron *cdiv1.DataImportCron) *cdiv1.DataImportCron {
return func(cron *cdiv1.DataImportCron) *cdiv1.DataImportCron {
cron.Annotations[controller.AnnSourceDesiredDigest] = digest
cc.AddAnnotation(cron, controller.AnnSourceDesiredDigest, digest)
return cron
}
}
Expand Down Expand Up @@ -103,7 +106,7 @@ var _ = Describe("DataImportCron", func() {
By("Set desired digest to nonexisting one")

//get and update!!!
retryOnceOnErr(updateDataImportCron(f.CdiClient, ns, cronName, updateDigest("sha256:12345678900987654321"))).Should(BeNil())
retryOnceOnErr(updateDataImportCron(f.CdiClient, ns, cronName, updateDigest(errorDigest))).Should(BeNil())

By("Wait for CurrentImports update")
Eventually(func() string {
Expand Down Expand Up @@ -235,6 +238,45 @@ var _ = Describe("DataImportCron", func() {
table.Entry("[test_id:8266] succeed deleting error DVs when importing new ones", false, true, 2),
)

It("[test_id:xxxx] Should allow an empty schedule to trigger an external update to the source", func() {
reg, err := getDataVolumeSourceRegistry(f)
Expect(err).ToNot(HaveOccurred())
defer func() {
if err := utils.RemoveInsecureRegistry(f.CrClient, *reg.URL); err != nil {
_, _ = fmt.Fprintf(GinkgoWriter, "failed to remove registry; %v", err)
}
}()

By("Create DataImportCron with empty schedule")
cron = utils.NewDataImportCron(cronName, "5Gi", emptySchedule, dataSourceName, importsToKeep, *reg)
retentionPolicy := cdiv1.DataImportCronRetainNone
cron.Spec.RetentionPolicy = &retentionPolicy

cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Create(context.TODO(), cron, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())

By("Set desired digest to nonexisting one")
//FIXME: explicitly create the poller pod to update the DIC digest, instead of using this magic digest
retryOnceOnErr(updateDataImportCron(f.CdiClient, ns, cronName, updateDigest("sha256:63c30e7080344125452f9b2889c8ed40815ab1d7053e8024ad6032c616c97d8d"))).Should(BeNil())

waitForConditions(corev1.ConditionFalse, corev1.ConditionTrue)
By("Verify CurrentImports update")
currentImportDv := cron.Status.CurrentImports[0].DataVolumeName
Expect(currentImportDv).ToNot(BeEmpty())

By(fmt.Sprintf("Verify pvc was created %s", currentImportDv))
_, err = utils.WaitForPVC(f.K8sClient, ns, currentImportDv)
Expect(err).ToNot(HaveOccurred())

By("Wait for import completion")
err = utils.WaitForDataVolumePhase(f, ns, cdiv1.Succeeded, currentImportDv)
Expect(err).ToNot(HaveOccurred(), "Datavolume not in phase succeeded in time")

By("Verify cronjob was not created")
_, err = f.K8sClient.BatchV1().CronJobs(f.CdiInstallNs).Get(context.TODO(), controller.GetCronJobName(cron), metav1.GetOptions{})
Expect(errors.IsNotFound(err)).To(BeTrue())
})

It("[test_id:7406] succeed garbage collecting old PVCs when importing new ones", func() {
reg, err := getDataVolumeSourceRegistry(f)
Expect(err).ToNot(HaveOccurred())
Expand Down

0 comments on commit 029bebc

Please sign in to comment.