From a9c554ae4123f5bff0937bdebad9af10fa247786 Mon Sep 17 00:00:00 2001 From: Ankita Thomas Date: Tue, 29 Aug 2023 15:39:44 -0400 Subject: [PATCH] preserve failed unpack jobs, enforce minimum interval between failing unpack jobs Signed-off-by: Ankita Thomas --- pkg/controller/bundle/bundle_unpacker.go | 80 ++++++++++++++++--- pkg/controller/bundle/bundle_unpacker_test.go | 2 +- .../bundle/bundlefakes/fake_unpacker.go | 2 +- pkg/controller/operators/catalog/operator.go | 11 ++- 4 files changed, 80 insertions(+), 15 deletions(-) diff --git a/pkg/controller/bundle/bundle_unpacker.go b/pkg/controller/bundle/bundle_unpacker.go index c583aee780b..becf79c07c8 100644 --- a/pkg/controller/bundle/bundle_unpacker.go +++ b/pkg/controller/bundle/bundle_unpacker.go @@ -41,6 +41,13 @@ const ( // e.g 1m30s BundleUnpackTimeoutAnnotationKey = "operatorframework.io/bundle-unpack-timeout" BundleUnpackPodLabel = "job-name" + + // BundleUnpackRetryMinimumIntervalAnnotationKey sets a minimum interval to wait before + // attempting to recreate a failed unpack job for a bundle. + BundleUnpackRetryMinimumIntervalAnnotationKey = "operatorframework.io/bundle-unpack-min-retry-interval" + + // bundleUnpackRefLabel is used to filter for all unpack jobs for a specific bundle. + bundleUnpackRefLabel = "operatorframework.io/bundle-unpack-ref" ) type BundleUnpackResult struct { @@ -239,6 +246,7 @@ func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string } job.SetNamespace(cmRef.Namespace) job.SetName(cmRef.Name) + job.SetLabels(map[string]string{bundleUnpackRefLabel: cmRef.Name}) job.SetOwnerReferences([]metav1.OwnerReference{ownerRef(cmRef)}) if c.runAsUser > 0 { job.Spec.Template.Spec.SecurityContext.RunAsUser = &c.runAsUser @@ -279,7 +287,7 @@ func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . 
Unpacker type Unpacker interface { - UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout time.Duration) (result *BundleUnpackResult, err error) + UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout, retryInterval time.Duration) (result *BundleUnpackResult, err error) } type ConfigMapUnpacker struct { @@ -440,7 +448,7 @@ const ( NotUnpackedMessage = "bundle contents have not yet been persisted to installplan status" ) -func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout time.Duration) (result *BundleUnpackResult, err error) { +func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout, retryInterval time.Duration) (result *BundleUnpackResult, err error) { result = newBundleUnpackResult(lookup) // if bundle lookup failed condition already present, then there is nothing more to do @@ -502,7 +510,7 @@ func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, secrets = append(secrets, corev1.LocalObjectReference{Name: secretName}) } var job *batchv1.Job - job, err = c.ensureJob(cmRef, result.Path, secrets, timeout) + job, err = c.ensureJob(cmRef, result.Path, secrets, timeout, retryInterval) if err != nil || job == nil { // ensureJob can return nil if the job present does not match the expected job (spec and ownerefs) // The current job is deleted in that case so UnpackBundle needs to be retried @@ -641,7 +649,7 @@ func (c *ConfigMapUnpacker) ensureConfigmap(csRef *corev1.ObjectReference, name return } -func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference, timeout time.Duration) (job *batchv1.Job, err error) { +func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference, timeout time.Duration, unpackRetryInterval time.Duration) (job *batchv1.Job, err error) { fresh := c.job(cmRef, bundlePath, secrets, timeout) job, err = 
c.jobLister.Jobs(fresh.GetNamespace()).Get(fresh.GetName()) if err != nil { @@ -651,13 +659,40 @@ func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath return } - // Cleanup old unpacking job and retry - if _, isFailed := getCondition(job, batchv1.JobFailed); isFailed { - err = c.client.BatchV1().Jobs(job.GetNamespace()).Delete(context.TODO(), job.GetName(), metav1.DeleteOptions{}) - if err == nil { - job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{}) + + // only check for retries if an unpackRetryInterval is specified + if unpackRetryInterval > 0 { + if failedCond, isFailed := getCondition(job, batchv1.JobFailed); isFailed { + lastFailureTime := failedCond.LastTransitionTime.Time + // Look for other unpack jobs for the same bundle + var jobs []*batchv1.Job + jobs, err = c.jobLister.Jobs(fresh.GetNamespace()).List(k8slabels.ValidatedSetSelector{bundleUnpackRefLabel: cmRef.Name}) + if err != nil { + return + } + + var failed bool + var cond *batchv1.JobCondition + for _, j := range jobs { + cond, failed = getCondition(j, batchv1.JobFailed) + if !failed { + // found an in-progress unpack attempt + job = j + break + } + if cond != nil && lastFailureTime.Before(cond.LastTransitionTime.Time) { + lastFailureTime = cond.LastTransitionTime.Time + } + } + + if failed { + if time.Now().After(lastFailureTime.Add(unpackRetryInterval)) { + fresh.SetName(fmt.Sprintf("%s-%d", fresh.GetName(), len(jobs))) + job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{}) + return + } + } } - return } if equality.Semantic.DeepDerivative(fresh.GetOwnerReferences(), job.GetOwnerReferences()) && equality.Semantic.DeepDerivative(fresh.Spec, job.Spec) { @@ -825,3 +860,28 @@ func OperatorGroupBundleUnpackTimeout(ogLister v1listers.OperatorGroupNamespaceL return d, nil } + +// OperatorGroupBundleUnpackRetryInterval returns bundle unpack retry interval from 
the OperatorGroup annotation, if specified. +// If the retry annotation is not set, it returns 0, which disables the retry check. This interval, if > 0, +// is the minimum time to wait after an unpack job fails before a new one is created. +func OperatorGroupBundleUnpackRetryInterval(ogLister v1listers.OperatorGroupNamespaceLister) (time.Duration, error) { + ogs, err := ogLister.List(k8slabels.Everything()) + if err != nil { + return 0, err + } + if len(ogs) != 1 { + return 0, fmt.Errorf("found %d operatorGroups, expected 1", len(ogs)) + } + + timeoutStr, ok := ogs[0].GetAnnotations()[BundleUnpackRetryMinimumIntervalAnnotationKey] + if !ok { + return 0, nil + } + + d, err := time.ParseDuration(timeoutStr) + if err != nil { + return 0, fmt.Errorf("failed to parse unpack timeout annotation(%s: %s): %w", BundleUnpackRetryMinimumIntervalAnnotationKey, timeoutStr, err) + } + + return d, nil +} diff --git a/pkg/controller/bundle/bundle_unpacker_test.go b/pkg/controller/bundle/bundle_unpacker_test.go index ffb006d45c5..dffcbe6a02e 100644 --- a/pkg/controller/bundle/bundle_unpacker_test.go +++ b/pkg/controller/bundle/bundle_unpacker_test.go @@ -1654,7 +1654,7 @@ func TestConfigMapUnpacker(t *testing.T) { ) require.NoError(t, err) - res, err := unpacker.UnpackBundle(tt.args.lookup, tt.args.annotationTimeout) + res, err := unpacker.UnpackBundle(tt.args.lookup, tt.args.annotationTimeout, 0) require.Equal(t, tt.expected.err, err) if tt.expected.res == nil { diff --git a/pkg/controller/bundle/bundlefakes/fake_unpacker.go b/pkg/controller/bundle/bundlefakes/fake_unpacker.go index 56b666c5425..baf0f7bdd7b 100644 --- a/pkg/controller/bundle/bundlefakes/fake_unpacker.go +++ b/pkg/controller/bundle/bundlefakes/fake_unpacker.go @@ -28,7 +28,7 @@ type FakeUnpacker struct { invocationsMutex sync.RWMutex } -func (fake *FakeUnpacker) UnpackBundle(arg1 *v1alpha1.BundleLookup, arg2 time.Duration) (*bundle.BundleUnpackResult, error) { +func (fake *FakeUnpacker) UnpackBundle(arg1 *v1alpha1.BundleLookup, arg2, arg3
time.Duration) (*bundle.BundleUnpackResult, error) { fake.unpackBundleMutex.Lock() ret, specificReturn := fake.unpackBundleReturnsOnCall[len(fake.unpackBundleArgsForCall)] fake.unpackBundleArgsForCall = append(fake.unpackBundleArgsForCall, struct { diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index aa7eb9668b7..e32f9d66026 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -949,6 +949,11 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error { return err } + minUnpackRetryInterval, err := bundle.OperatorGroupBundleUnpackRetryInterval(ogLister) + if err != nil { + return err + } + // TODO: parallel maxGeneration := 0 subscriptionUpdated := false @@ -1044,7 +1049,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error { logger.Debug("unpacking bundles") var unpacked bool - unpacked, steps, bundleLookups, err = o.unpackBundles(namespace, steps, bundleLookups, unpackTimeout) + unpacked, steps, bundleLookups, err = o.unpackBundles(namespace, steps, bundleLookups, unpackTimeout, minUnpackRetryInterval) if err != nil { // If the error was fatal capture and fail if olmerrors.IsFatal(err) { @@ -1501,7 +1506,7 @@ type UnpackedBundleReference struct { Properties string `json:"properties"` } -func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.Step, bundleLookups []v1alpha1.BundleLookup, unpackTimeout time.Duration) (bool, []*v1alpha1.Step, []v1alpha1.BundleLookup, error) { +func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.Step, bundleLookups []v1alpha1.BundleLookup, unpackTimeout, minUnpackRetryInterval time.Duration) (bool, []*v1alpha1.Step, []v1alpha1.BundleLookup, error) { unpacked := true outBundleLookups := make([]v1alpha1.BundleLookup, len(bundleLookups)) @@ -1516,7 +1521,7 @@ func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1. 
var errs []error for i := 0; i < len(outBundleLookups); i++ { lookup := outBundleLookups[i] - res, err := o.bundleUnpacker.UnpackBundle(&lookup, unpackTimeout) + res, err := o.bundleUnpacker.UnpackBundle(&lookup, unpackTimeout, minUnpackRetryInterval) if err != nil { errs = append(errs, err) continue