Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(csv): CSV update process optimization regression #407

Merged
merged 9 commits into from
Aug 7, 2018
57 changes: 25 additions & 32 deletions pkg/controller/operators/olm/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -105,16 +104,16 @@ func NewOperator(kubeconfig string, wakeupInterval time.Duration, annotations ma
return op, nil
}

func (a *Operator) requeueCSV(csv *v1alpha1.ClusterServiceVersion) {
k, err := cache.DeletionHandlingMetaNamespaceKeyFunc(csv)
if err != nil {
log.Infof("creating key failed: %s", err)
return
}
log.Infof("requeueing %s", csv.SelfLink)
a.csvQueue.AddRateLimited(k)
return
}
//func (a *Operator) requeueCSV(csv *v1alpha1.ClusterServiceVersion) {
// k, err := cache.DeletionHandlingMetaNamespaceKeyFunc(csv)
// if err != nil {
// log.Infof("creating key failed: %s", err)
// return
// }
// log.Infof("requeueing %s", csv.SelfLink)
// a.csvQueue.AddRateLimited(k)
// return
//}

// syncClusterServiceVersion is the method that gets called when we see a CSV event in the cluster
func (a *Operator) syncClusterServiceVersion(obj interface{}) (syncError error) {
Expand Down Expand Up @@ -201,7 +200,7 @@ func (a *Operator) transitionCSVState(in v1alpha1.ClusterServiceVersion) (out *v
}

out.SetPhase(v1alpha1.CSVPhaseInstalling, v1alpha1.CSVReasonInstallSuccessful, "waiting for install components to report healthy")
a.requeueCSV(out)
//a.requeueCSV(out)
return
case v1alpha1.CSVPhaseInstalling:
installer, strategy, _ := a.parseStrategiesAndUpdateStatus(out)
Expand Down Expand Up @@ -236,15 +235,17 @@ func (a *Operator) transitionCSVState(in v1alpha1.ClusterServiceVersion) (out *v

// if we can find a newer version that's successfully installed, we're safe to mark all intermediates
for _, csv := range a.findIntermediatesForDeletion(out) {
// TODO fix this
// we only mark them in this step, in case some get deleted but others fail and break the replacement chain
csv.SetPhase(v1alpha1.CSVPhaseDeleting, v1alpha1.CSVReasonReplaced, "has been replaced by a newer ClusterServiceVersion that has successfully installed.")
// ignore errors and success here; this step is just an optimization to speed up GC
a.client.OperatorsV1alpha1().ClusterServiceVersions(csv.GetNamespace()).UpdateStatus(csv)
}

// if there's no newer version, requeue for processing (likely will be GCable before resync)
a.requeueCSV(out)
//a.requeueCSV(out)
case v1alpha1.CSVPhaseDeleting:
syncError := a.OpClient.DeleteCustomResource(v1alpha1.GroupName, v1alpha1.GroupVersion, out.GetNamespace(), v1alpha1.ClusterServiceVersionKind, out.GetName())
foreground := metav1.DeletePropagationForeground
syncError = a.client.OperatorsV1alpha1().ClusterServiceVersions(out.GetNamespace()).Delete(out.GetName(), &metav1.DeleteOptions{PropagationPolicy: &foreground})
if syncError != nil {
logger.Debugf("unable to get delete csv marked for deletion: %s", syncError.Error())
}
Expand Down Expand Up @@ -282,15 +283,12 @@ func (a *Operator) findIntermediatesForDeletion(csv *v1alpha1.ClusterServiceVers

// csvsInNamespace finds all CSVs in a namespace
func (a *Operator) csvsInNamespace(namespace string) (csvs []*v1alpha1.ClusterServiceVersion) {
csvsInNamespace, err := a.OpClient.ListCustomResource(v1alpha1.GroupName, v1alpha1.GroupVersion, namespace, v1alpha1.ClusterServiceVersionKind)
// TODO: read from indexer
csvsInNamespace, err := a.client.OperatorsV1alpha1().ClusterServiceVersions(namespace).List(metav1.ListOptions{})
if err != nil {
return nil
}
for _, csvUnst := range csvsInNamespace.Items {
csv := v1alpha1.ClusterServiceVersion{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(csvUnst.UnstructuredContent(), &csv); err != nil {
continue
}
for _, csv := range csvsInNamespace.Items {
csvs = append(csvs, &csv)
}
return
Expand All @@ -308,7 +306,7 @@ func (a *Operator) checkReplacementsAndUpdateStatus(csv *v1alpha1.ClusterService
csv.SetPhase(v1alpha1.CSVPhaseReplacing, v1alpha1.CSVReasonBeingReplaced, msg)

// requeue so that we quickly pick up on replacement status changes
a.requeueCSV(csv)
//a.requeueCSV(csv)

return fmt.Errorf("replacing")
}
Expand All @@ -334,7 +332,7 @@ func (a *Operator) updateInstallStatus(csv *v1alpha1.ClusterServiceVersion, inst
// if there's an error checking install that shouldn't fail the strategy, requeue with message
if strategyErr != nil {
csv.SetPhase(v1alpha1.CSVPhaseInstalling, requeueConditionReason, fmt.Sprintf("installing: %s", strategyErr))
a.requeueCSV(csv)
//a.requeueCSV(csv)
return strategyErr
}

Expand All @@ -359,7 +357,7 @@ func (a *Operator) parseStrategiesAndUpdateStatus(csv *v1alpha1.ClusterServiceVe
}
if previousStrategy != nil {
// check for status changes if we know we're replacing a CSV
a.requeueCSV(previousCSV)
//a.requeueCSV(previousCSV)
}

strName := strategy.GetStrategyName()
Expand Down Expand Up @@ -443,20 +441,15 @@ func (a *Operator) isBeingReplaced(in *v1alpha1.ClusterServiceVersion, csvsInNam
return
}

func (a *Operator) isReplacing(in *v1alpha1.ClusterServiceVersion) (previous *v1alpha1.ClusterServiceVersion) {
func (a *Operator) isReplacing(in *v1alpha1.ClusterServiceVersion) *v1alpha1.ClusterServiceVersion {
log.Debugf("checking if csv is replacing an older version")
if in.Spec.Replaces == "" {
return nil
}
oldCSVUnst, err := a.OpClient.GetCustomResource(v1alpha1.GroupName, v1alpha1.GroupVersion, in.GetNamespace(), v1alpha1.ClusterServiceVersionKind, in.Spec.Replaces)
previous, err := a.client.OperatorsV1alpha1().ClusterServiceVersions(in.GetNamespace()).Get(in.Spec.Replaces, metav1.GetOptions{})
if err != nil {
log.Debugf("unable to get previous csv: %s", err.Error())
return nil
}
p := v1alpha1.ClusterServiceVersion{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(oldCSVUnst.UnstructuredContent(), &p); err != nil {
log.Debugf("unable to parse previous csv: %s", err.Error())
return nil
}
return &p
return previous
}
Loading