Skip to content

Commit

Permalink
preserve failed unpack jobs, enforce minimum interval between failing…
Browse files Browse the repository at this point in the history
… unpack jobs

Signed-off-by: Ankita Thomas <ankithom@redhat.com>
  • Loading branch information
ankitathomas committed Aug 30, 2023
1 parent 0610898 commit a9c554a
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 15 deletions.
80 changes: 70 additions & 10 deletions pkg/controller/bundle/bundle_unpacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ const (
// e.g 1m30s
BundleUnpackTimeoutAnnotationKey = "operatorframework.io/bundle-unpack-timeout"
BundleUnpackPodLabel = "job-name"

// BundleUnpackRetryMinimumIntervalAnnotationKey sets a minimum interval to wait before
// attempting to recreate a failed unpack job for a bundle.
BundleUnpackRetryMinimumIntervalAnnotationKey = "operatorframework.io/bundle-unpack-min-retry-interval"

// bundleUnpackRefLabel is used to filter for all unpack jobs for a specific bundle.
bundleUnpackRefLabel = "operatorframework.io/bundle-unpack-ref"
)

type BundleUnpackResult struct {
Expand Down Expand Up @@ -239,6 +246,7 @@ func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string
}
job.SetNamespace(cmRef.Namespace)
job.SetName(cmRef.Name)
job.SetLabels(map[string]string{bundleUnpackRefLabel: cmRef.Name})
job.SetOwnerReferences([]metav1.OwnerReference{ownerRef(cmRef)})
if c.runAsUser > 0 {
job.Spec.Template.Spec.SecurityContext.RunAsUser = &c.runAsUser
Expand Down Expand Up @@ -279,7 +287,7 @@ func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . Unpacker

type Unpacker interface {
UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout time.Duration) (result *BundleUnpackResult, err error)
UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout, retryInterval time.Duration) (result *BundleUnpackResult, err error)
}

type ConfigMapUnpacker struct {
Expand Down Expand Up @@ -440,7 +448,7 @@ const (
NotUnpackedMessage = "bundle contents have not yet been persisted to installplan status"
)

func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout time.Duration) (result *BundleUnpackResult, err error) {
func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout, retryInterval time.Duration) (result *BundleUnpackResult, err error) {
result = newBundleUnpackResult(lookup)

// if bundle lookup failed condition already present, then there is nothing more to do
Expand Down Expand Up @@ -502,7 +510,7 @@ func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup,
secrets = append(secrets, corev1.LocalObjectReference{Name: secretName})
}
var job *batchv1.Job
job, err = c.ensureJob(cmRef, result.Path, secrets, timeout)
job, err = c.ensureJob(cmRef, result.Path, secrets, timeout, retryInterval)
if err != nil || job == nil {
// ensureJob can return nil if the job present does not match the expected job (spec and ownerefs)
// The current job is deleted in that case so UnpackBundle needs to be retried
Expand Down Expand Up @@ -641,7 +649,7 @@ func (c *ConfigMapUnpacker) ensureConfigmap(csRef *corev1.ObjectReference, name
return
}

func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference, timeout time.Duration) (job *batchv1.Job, err error) {
func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference, timeout time.Duration, unpackRetryInterval time.Duration) (job *batchv1.Job, err error) {
fresh := c.job(cmRef, bundlePath, secrets, timeout)
job, err = c.jobLister.Jobs(fresh.GetNamespace()).Get(fresh.GetName())
if err != nil {
Expand All @@ -651,13 +659,40 @@ func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath

return
}
// Cleanup old unpacking job and retry
if _, isFailed := getCondition(job, batchv1.JobFailed); isFailed {
err = c.client.BatchV1().Jobs(job.GetNamespace()).Delete(context.TODO(), job.GetName(), metav1.DeleteOptions{})
if err == nil {
job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{})

// only check for retries if an unpackRetryInterval is specified
if unpackRetryInterval > 0 {
if failedCond, isFailed := getCondition(job, batchv1.JobFailed); isFailed {
lastFailureTime := failedCond.LastTransitionTime.Time
// Look for other unpack jobs for the same bundle
var jobs []*batchv1.Job
jobs, err = c.jobLister.Jobs(fresh.GetNamespace()).List(k8slabels.ValidatedSetSelector{bundleUnpackRefLabel: cmRef.Name})
if err != nil {
return
}

var failed bool
var cond *batchv1.JobCondition
for _, j := range jobs {
cond, failed = getCondition(j, batchv1.JobFailed)
if !failed {
// found an in-progress unpack attempt
job = j
break
}
if cond != nil && lastFailureTime.Before(cond.LastTransitionTime.Time) {
lastFailureTime = cond.LastTransitionTime.Time
}
}

if failed {
if time.Now().After(lastFailureTime.Add(unpackRetryInterval)) {
fresh.SetName(fmt.Sprintf("%s-%d", fresh.GetName(), len(jobs)))
job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{})
return
}
}
}
return
}

if equality.Semantic.DeepDerivative(fresh.GetOwnerReferences(), job.GetOwnerReferences()) && equality.Semantic.DeepDerivative(fresh.Spec, job.Spec) {
Expand Down Expand Up @@ -825,3 +860,28 @@ func OperatorGroupBundleUnpackTimeout(ogLister v1listers.OperatorGroupNamespaceL

return d, nil
}

// OperatorGroupBundleUnpackRetryInterval returns bundle unpack retry interval from annotation if specified.
// If the retry annotation is not set, return retry = 0 which is subsequently ignored. This interval, if > 0,
// determines the minimum interval between recreating a failed unpack job.
func OperatorGroupBundleUnpackRetryInterval(ogLister v1listers.OperatorGroupNamespaceLister) (time.Duration, error) {
ogs, err := ogLister.List(k8slabels.Everything())
if err != nil {
return 0, err
}
if len(ogs) != 1 {
return 0, fmt.Errorf("found %d operatorGroups, expected 1", len(ogs))
}

timeoutStr, ok := ogs[0].GetAnnotations()[BundleUnpackRetryMinimumIntervalAnnotationKey]
if !ok {
return 0, nil
}

d, err := time.ParseDuration(timeoutStr)
if err != nil {
return 0, fmt.Errorf("failed to parse unpack timeout annotation(%s: %s): %w", BundleUnpackRetryMinimumIntervalAnnotationKey, timeoutStr, err)
}

return d, nil
}
2 changes: 1 addition & 1 deletion pkg/controller/bundle/bundle_unpacker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1654,7 +1654,7 @@ func TestConfigMapUnpacker(t *testing.T) {
)
require.NoError(t, err)

res, err := unpacker.UnpackBundle(tt.args.lookup, tt.args.annotationTimeout)
res, err := unpacker.UnpackBundle(tt.args.lookup, tt.args.annotationTimeout, 0)
require.Equal(t, tt.expected.err, err)

if tt.expected.res == nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/bundle/bundlefakes/fake_unpacker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 8 additions & 3 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,11 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
return err
}

minUnpackRetryInterval, err := bundle.OperatorGroupBundleUnpackRetryInterval(ogLister)
if err != nil {
return err
}

// TODO: parallel
maxGeneration := 0
subscriptionUpdated := false
Expand Down Expand Up @@ -1044,7 +1049,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
logger.Debug("unpacking bundles")

var unpacked bool
unpacked, steps, bundleLookups, err = o.unpackBundles(namespace, steps, bundleLookups, unpackTimeout)
unpacked, steps, bundleLookups, err = o.unpackBundles(namespace, steps, bundleLookups, unpackTimeout, minUnpackRetryInterval)
if err != nil {
// If the error was fatal capture and fail
if olmerrors.IsFatal(err) {
Expand Down Expand Up @@ -1501,7 +1506,7 @@ type UnpackedBundleReference struct {
Properties string `json:"properties"`
}

func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.Step, bundleLookups []v1alpha1.BundleLookup, unpackTimeout time.Duration) (bool, []*v1alpha1.Step, []v1alpha1.BundleLookup, error) {
func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.Step, bundleLookups []v1alpha1.BundleLookup, unpackTimeout, minUnpackRetryInterval time.Duration) (bool, []*v1alpha1.Step, []v1alpha1.BundleLookup, error) {
unpacked := true

outBundleLookups := make([]v1alpha1.BundleLookup, len(bundleLookups))
Expand All @@ -1516,7 +1521,7 @@ func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.
var errs []error
for i := 0; i < len(outBundleLookups); i++ {
lookup := outBundleLookups[i]
res, err := o.bundleUnpacker.UnpackBundle(&lookup, unpackTimeout)
res, err := o.bundleUnpacker.UnpackBundle(&lookup, unpackTimeout, minUnpackRetryInterval)
if err != nil {
errs = append(errs, err)
continue
Expand Down

0 comments on commit a9c554a

Please sign in to comment.