diff --git a/pkg/controller/bundle/bundle_unpacker.go b/pkg/controller/bundle/bundle_unpacker.go index 77f2912e87..cfe46187bb 100644 --- a/pkg/controller/bundle/bundle_unpacker.go +++ b/pkg/controller/bundle/bundle_unpacker.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha256" "fmt" + "sort" "strings" "time" @@ -660,47 +661,35 @@ func (c *ConfigMapUnpacker) ensureConfigmap(csRef *corev1.ObjectReference, name func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference, timeout time.Duration, unpackRetryInterval time.Duration) (job *batchv1.Job, err error) { fresh := c.job(cmRef, bundlePath, secrets, timeout) - job, err = c.jobLister.Jobs(fresh.GetNamespace()).Get(fresh.GetName()) + var jobs, toDelete []*batchv1.Job + jobs, err = c.jobLister.Jobs(fresh.GetNamespace()).List(k8slabels.ValidatedSetSelector{bundleUnpackRefLabel: cmRef.Name}) if err != nil { - if apierrors.IsNotFound(err) { - job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{}) - } - return } + if len(jobs) == 0 { + job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{}) + return + } + + maxRetainedJobs := 5 // TODO: make this configurable + job, toDelete = sortUnpackJobs(jobs, maxRetainedJobs) // choose latest or on-failed job attempt // only check for retries if an unpackRetryInterval is specified if unpackRetryInterval > 0 { - if failedCond, isFailed := getCondition(job, batchv1.JobFailed); isFailed { - lastFailureTime := failedCond.LastTransitionTime.Time + if _, isFailed := getCondition(job, batchv1.JobFailed); isFailed { // Look for other unpack jobs for the same bundle - var jobs []*batchv1.Job - jobs, err = c.jobLister.Jobs(fresh.GetNamespace()).List(k8slabels.ValidatedSetSelector{bundleUnpackRefLabel: cmRef.Name}) - if err != nil { - return - } - - var failed bool - var cond *batchv1.JobCondition - for _, j := range jobs { - cond, failed = getCondition(j, batchv1.JobFailed) - if !failed { - // found an in-progress unpack attempt - job = j - break - } - if cond != nil && lastFailureTime.Before(cond.LastTransitionTime.Time) { - lastFailureTime = cond.LastTransitionTime.Time - } - } - - if failed { - if time.Now().After(lastFailureTime.Add(unpackRetryInterval)) { + if cond, failed := getCondition(job, batchv1.JobFailed); failed { + if time.Now().After(cond.LastTransitionTime.Time.Add(unpackRetryInterval)) { fresh.SetName(names.SimpleNameGenerator.GenerateName(fresh.GetName())) job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{}) } - return } + + // cleanup old failed jobs, but don't clean up successful jobs to avoid repeat unpacking + for _, j := range toDelete { + _ = c.client.BatchV1().Jobs(j.GetNamespace()).Delete(context.TODO(), j.GetName(), metav1.DeleteOptions{}) + } + return } } @@ -845,6 +834,37 @@ func getCondition(job *batchv1.Job, conditionType batchv1.JobConditionType) (con return } +func sortUnpackJobs(jobs []*batchv1.Job, maxRetainedJobs int) (latest *batchv1.Job, toDelete []*batchv1.Job) { + if len(jobs) == 0 { + return + } + // sort jobs so that latest job is first + // with preference for non-failed jobs + sort.Slice(jobs, func(i, j int) bool { + condI, failedI := getCondition(jobs[i], batchv1.JobFailed) + condJ, failedJ := getCondition(jobs[j], batchv1.JobFailed) + if failedI != failedJ { + return !failedI // non-failed job goes first + } + return condI.LastTransitionTime.After(condJ.LastTransitionTime.Time) + }) + latest = jobs[0] + if len(jobs) <= maxRetainedJobs { + return + } + if maxRetainedJobs == 0 { + toDelete = jobs[1:] + return + } + + // cleanup old failed jobs, n-1 recent jobs and the oldest job + for i := 0; i < maxRetainedJobs && i+maxRetainedJobs < len(jobs); i++ { + toDelete = append(toDelete, jobs[maxRetainedJobs+i]) + } + + return +} + // OperatorGroupBundleUnpackTimeout returns bundle timeout from annotation if specified. // If the timeout annotation is not set, return timeout < 0 which is subsequently ignored. // This is to overrides the --bundle-unpack-timeout flag value on per-OperatorGroup basis. diff --git a/pkg/controller/bundle/bundle_unpacker_test.go b/pkg/controller/bundle/bundle_unpacker_test.go index 9f0d0652da..a583eb5412 100644 --- a/pkg/controller/bundle/bundle_unpacker_test.go +++ b/pkg/controller/bundle/bundle_unpacker_test.go @@ -441,7 +441,7 @@ func TestConfigMapUnpacker(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: digestHash, Namespace: "ns-a", - Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue}, + Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: digestHash}, OwnerReferences: []metav1.OwnerReference{ { APIVersion: "v1", @@ -712,7 +712,7 @@ func TestConfigMapUnpacker(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: digestHash, Namespace: "ns-a", - Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue}, + Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: digestHash}, OwnerReferences: []metav1.OwnerReference{ { APIVersion: "v1", @@ -978,7 +978,7 @@ func TestConfigMapUnpacker(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: pathHash, Namespace: "ns-a", - Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue}, + Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: pathHash}, OwnerReferences: []metav1.OwnerReference{ { APIVersion: "v1", @@ -1213,7 +1213,7 @@ func TestConfigMapUnpacker(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: pathHash, Namespace: "ns-a", - Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue}, + Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: pathHash}, OwnerReferences: []metav1.OwnerReference{ { APIVersion: "v1", @@ -1459,7 +1459,7 @@ func TestConfigMapUnpacker(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: pathHash, Namespace: "ns-a", - Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue}, + Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: pathHash}, OwnerReferences: []metav1.OwnerReference{ { APIVersion: "v1", @@ -1951,3 +1951,101 @@ func TestOperatorGroupBundleUnpackRetryInterval(t *testing.T) { }) } } + +func TestSortUnpackJobs(t *testing.T) { + // if there is a non-failed job, it should be first + // otherwise, the latest job should be first + //first n-1 jobs and oldest job are preserved + testJob := func(name string, failed bool, ts int64) *batchv1.Job { + conditions := []batchv1.JobCondition{} + if failed { + conditions = append(conditions, batchv1.JobCondition{ + Type: batchv1.JobFailed, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Time{Time: time.Unix(ts, 0)}, + }) + } + return &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: "test"}, + }, + Status: batchv1.JobStatus{ + Conditions: conditions, + }, + } + } + failedJobs := []*batchv1.Job{ + testJob("f-1", true, 1), + testJob("f-2", true, 2), + testJob("f-3", true, 3), + testJob("f-4", true, 4), + testJob("f-5", true, 5), + } + nonFailedJob := testJob("s-1", false, 1) + for _, tc := range []struct { + name string + jobs []*batchv1.Job + maxRetained int + expectedLatest *batchv1.Job + expectedToDelete []*batchv1.Job + }{ + { + name: "no job history", + maxRetained: 0, + jobs: []*batchv1.Job{ + failedJobs[1], + failedJobs[2], + failedJobs[0], + }, + expectedLatest: failedJobs[2], + expectedToDelete: []*batchv1.Job{ + failedJobs[1], + failedJobs[0], + }, + }, { + name: "empty job list", + maxRetained: 1, + }, { + name: "retain oldest", + maxRetained: 1, + jobs: []*batchv1.Job{ + failedJobs[2], + failedJobs[0], + failedJobs[1], + }, + expectedToDelete: []*batchv1.Job{ + failedJobs[1], + }, + expectedLatest: failedJobs[2], + }, { + name: "multiple old jobs", + maxRetained: 2, + jobs: []*batchv1.Job{ + failedJobs[1], + failedJobs[0], + failedJobs[2], + failedJobs[3], + failedJobs[4], + }, + expectedLatest: failedJobs[4], + expectedToDelete: []*batchv1.Job{ + failedJobs[1], + failedJobs[2], + }, + }, { + name: "select non-failed as latest", + maxRetained: 3, + jobs: []*batchv1.Job{ + failedJobs[0], + failedJobs[1], + nonFailedJob, + }, + expectedLatest: nonFailedJob, + }, + } { + latest, toDelete := sortUnpackJobs(tc.jobs, tc.maxRetained) + assert.Equal(t, tc.expectedLatest, latest) + assert.ElementsMatch(t, tc.expectedToDelete, toDelete) + } +} diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index 1842a06746..34fd983886 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -1669,7 +1669,7 @@ type UnpackedBundleReference struct { Properties string `json:"properties"` } -func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.Step, bundleLookups []v1alpha1.BundleLookup, unpackTimeout, minUnpackRetryInterval time.Duration) (bool, []*v1alpha1.Step, []v1alpha1.BundleLookup, error) { +func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.Step, bundleLookups []v1alpha1.BundleLookup, unpackTimeout, unpackRetryInterval time.Duration) (bool, []*v1alpha1.Step, []v1alpha1.BundleLookup, error) { unpacked := true outBundleLookups := make([]v1alpha1.BundleLookup, len(bundleLookups)) @@ -1684,7 +1684,7 @@ func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1. var errs []error for i := 0; i < len(outBundleLookups); i++ { lookup := outBundleLookups[i] - res, err := o.bundleUnpacker.UnpackBundle(&lookup, unpackTimeout, minUnpackRetryInterval) + res, err := o.bundleUnpacker.UnpackBundle(&lookup, unpackTimeout, unpackRetryInterval) if err != nil { errs = append(errs, err) continue diff --git a/test/e2e/registry.go b/test/e2e/registry.go index 5b292b92ce..cb1f4c87e0 100644 --- a/test/e2e/registry.go +++ b/test/e2e/registry.go @@ -54,7 +54,6 @@ func createDockerRegistry(client operatorclient.ClientInterface, namespace strin Port: int32(5000), }, }, - Type: corev1.ServiceTypeNodePort, }, } diff --git a/test/e2e/subscription_e2e_test.go b/test/e2e/subscription_e2e_test.go index 0615eccd52..fdad3895b7 100644 --- a/test/e2e/subscription_e2e_test.go +++ b/test/e2e/subscription_e2e_test.go @@ -2526,6 +2526,11 @@ var _ = Describe("Subscription", func() { }) When("bundle unpack retries are enabled", func() { It("should retry failing unpack jobs", func() { + if ok, err := inKind(c); ok && err == nil { + Skip("This spec fails when run using KIND cluster. See https://github.com/operator-framework/operator-lifecycle-manager/issues/2420 for more details") + } else if err != nil { + Skip("Could not determine whether running in a kind cluster. Skipping.") + } By("Ensuring a registry to host bundle images") local, err := Local(c) Expect(err).NotTo(HaveOccurred(), "cannot determine if test running locally or on CI: %s", err) @@ -2581,20 +2586,14 @@ var _ = Describe("Subscription", func() { } } - // testImage is the name of the image used throughout the test - the image overwritten by skopeo - // the tag is generated randomly and appended to the end of the testImage + // The remote image to be copied onto the local registry srcImage := "quay.io/olmtest/example-operator-bundle:" srcTag := "0.1.0" - bundleImage := fmt.Sprint(registryURL, "/unpack-retry-bundle", ":") + + // on-cluster image ref + bundleImage := registryURL + "/unpack-retry-bundle:" bundleTag := genName("x") - //// hash hashes data with sha256 and returns the hex string. - //func hash(data string) string { - // // A SHA256 hash is 64 characters, which is within the 253 character limit for kube resource names - // h := fmt.Sprintf("%x", sha256.Sum256([]byte(data))) - // - // // Make the hash 63 characters instead to comply with the 63 character limit for labels - // return fmt.Sprintf(h[:len(h)-1]) - //} + unpackRetryCatalog := fmt.Sprintf(` schema: olm.package name: unpack-retry-package