Skip to content

Commit

Permalink
feat: support v1 CRDS in OLM. Simplify InstallPlan Execution via new
Browse files Browse the repository at this point in the history
Stepper interface. All CRDs will be converted and handled at the v1
APIVersion.
  • Loading branch information
exdx committed Apr 17, 2020
1 parent f3f46c8 commit d7f5cfe
Show file tree
Hide file tree
Showing 23 changed files with 1,074 additions and 597 deletions.
187 changes: 51 additions & 136 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ import (
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
v1beta1ext "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apiextensions-apiserver/pkg/apiserver/validation"
extinf "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -53,7 +52,6 @@ import (
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/scoped"
sharedtime "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/time"
"github.com/operator-framework/operator-lifecycle-manager/pkg/metrics"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
)

const (
Expand All @@ -62,6 +60,7 @@ const (
clusterRoleKind = "ClusterRole"
clusterRoleBindingKind = "ClusterRoleBinding"
configMapKind = "ConfigMap"
csvKind = "ClusterServiceVersion"
serviceAccountKind = "ServiceAccount"
serviceKind = "Service"
roleKind = "Role"
Expand Down Expand Up @@ -323,8 +322,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
}

// Register CustomResourceDefinition QueueInformer
crdInformer := extinf.NewSharedInformerFactory(op.opClient.ApiextensionsV1beta1Interface(), resyncPeriod()).Apiextensions().V1beta1().CustomResourceDefinitions()
op.lister.APIExtensionsV1beta1().RegisterCustomResourceDefinitionLister(crdInformer.Lister())
crdInformer := extinf.NewSharedInformerFactory(op.opClient.ApiextensionsInterface(), resyncPeriod()).Apiextensions().V1().CustomResourceDefinitions()
op.lister.APIExtensionsV1().RegisterCustomResourceDefinitionLister(crdInformer.Lister())
crdQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithLogger(op.logger),
Expand Down Expand Up @@ -1300,22 +1299,18 @@ func (o *Operator) ResolvePlan(plan *v1alpha1.InstallPlan) error {
return nil
}

func getCRDVersionsMap(crd *v1beta1ext.CustomResourceDefinition) map[string]struct{} {
func GetCRDV1VersionsMap(crd *apiextensionsv1.CustomResourceDefinition) map[string]struct{} {
versionsMap := map[string]struct{}{}

for _, version := range crd.Spec.Versions {
versionsMap[version.Name] = struct{}{}
}
if crd.Spec.Version != "" {
versionsMap[crd.Spec.Version] = struct{}{}
}

return versionsMap
}

// Ensure all existing served versions are present in new CRD
func ensureCRDVersions(oldCRD *v1beta1ext.CustomResourceDefinition, newCRD *v1beta1ext.CustomResourceDefinition) error {
newCRDVersions := getCRDVersionsMap(newCRD)
func EnsureCRDVersions(oldCRD *apiextensionsv1.CustomResourceDefinition, newCRD *apiextensionsv1.CustomResourceDefinition) error {
newCRDVersions := GetCRDV1VersionsMap(newCRD)

for _, oldVersion := range oldCRD.Spec.Versions {
if oldVersion.Served {
Expand All @@ -1325,49 +1320,47 @@ func ensureCRDVersions(oldCRD *v1beta1ext.CustomResourceDefinition, newCRD *v1be
}
}
}
if oldCRD.Spec.Version != "" {
_, ok := newCRDVersions[oldCRD.Spec.Version]
if !ok {
return fmt.Errorf("New CRD (%s) must contain existing version (%s)", oldCRD.Name, oldCRD.Spec.Version)
}
}
return nil
}

// Validate all existing served versions against new CRD's validation (if changed)
func (o *Operator) validateCustomResourceDefinition(oldCRD *v1beta1ext.CustomResourceDefinition, newCRD *v1beta1ext.CustomResourceDefinition) error {
o.logger.Debugf("Comparing %#v to %#v", oldCRD.Spec.Validation, newCRD.Spec.Validation)
func validateV1CRDCompatibility(dynamicClient dynamic.Interface, oldCRD *apiextensionsv1.CustomResourceDefinition, newCRD *apiextensionsv1.CustomResourceDefinition) error {
logrus.Debugf("Comparing %#v to %#v", oldCRD.Spec.Versions, newCRD.Spec.Versions)

// If validation schema is unchanged, return right away
if reflect.DeepEqual(oldCRD.Spec.Validation, newCRD.Spec.Validation) {
return nil
newestSchema := newCRD.Spec.Versions[len(newCRD.Spec.Versions)-1].Schema
for i, oldVersion := range oldCRD.Spec.Versions {
if !reflect.DeepEqual(oldVersion.Schema, newestSchema) {
break
}
if i == len(oldCRD.Spec.Versions)-1 {
// we are on the last iteration
// schema has not changed between versions at this point.
return nil
}
}

convertedCRD := &apiextensions.CustomResourceDefinition{}
if err := v1beta1ext.Convert_v1beta1_CustomResourceDefinition_To_apiextensions_CustomResourceDefinition(newCRD, convertedCRD, nil); err != nil {
if err := apiextensionsv1.Convert_v1_CustomResourceDefinition_To_apiextensions_CustomResourceDefinition(newCRD, convertedCRD, nil); err != nil {
return err
}
for _, version := range oldCRD.Spec.Versions {
if !version.Served {
gvr := schema.GroupVersionResource{Group: oldCRD.Spec.Group, Version: version.Name, Resource: oldCRD.Spec.Names.Plural}
err := o.validateExistingCRs(gvr, convertedCRD)
err := validateExistingCRs(dynamicClient, gvr, convertedCRD)
if err != nil {
return err
}
}
}

if oldCRD.Spec.Version != "" {
gvr := schema.GroupVersionResource{Group: oldCRD.Spec.Group, Version: oldCRD.Spec.Version, Resource: oldCRD.Spec.Names.Plural}
err := o.validateExistingCRs(gvr, convertedCRD)
if err != nil {
return err
}
}
o.logger.Debugf("Successfully validated CRD %s\n", newCRD.Name)
logrus.Debugf("Successfully validated CRD %s\n", newCRD.Name)
return nil
}

func (o *Operator) validateExistingCRs(gvr schema.GroupVersionResource, newCRD *apiextensions.CustomResourceDefinition) error {
crList, err := o.dynamicClient.Resource(gvr).List(metav1.ListOptions{})
func validateExistingCRs(dynamicClient dynamic.Interface, gvr schema.GroupVersionResource, newCRD *apiextensions.CustomResourceDefinition) error {
// make dynamic client
crList, err := dynamicClient.Resource(gvr).List(metav1.ListOptions{})
if err != nil {
return fmt.Errorf("error listing resources in GroupVersionResource %#v: %s", gvr, err)
}
Expand All @@ -1389,14 +1382,14 @@ func (o *Operator) validateExistingCRs(gvr schema.GroupVersionResource, newCRD *
// The function may not always succeed as storedVersions requires at least one
// version. If there is only stored version, it won't be removed until a new
// stored version is added.
func removeDeprecatedStoredVersions(oldCRD *v1beta1ext.CustomResourceDefinition, newCRD *v1beta1ext.CustomResourceDefinition) []string {
func removeDeprecatedV1StoredVersions(oldCRD *apiextensionsv1.CustomResourceDefinition, newCRD *apiextensionsv1.CustomResourceDefinition) []string {
// StoredVersions requires to have at least one version.
if len(oldCRD.Status.StoredVersions) <= 1 {
return nil
}

newStoredVersions := []string{}
newCRDVersions := getCRDVersionsMap(newCRD)
newCRDVersions := GetCRDV1VersionsMap(newCRD)
for _, v := range oldCRD.Status.StoredVersions {
_, ok := newCRDVersions[v]
if ok {
Expand Down Expand Up @@ -1437,113 +1430,35 @@ func (o *Operator) ExecutePlan(plan *v1alpha1.InstallPlan) error {
}

ensurer := newStepEnsurer(kubeclient, crclient, dynamicClient)
b := newBuilder(kubeclient, dynamicClient, o.csvProvidedAPIsIndexer)

for i, step := range plan.Status.Plan {
switch step.Status {
case v1alpha1.StepStatusPresent, v1alpha1.StepStatusCreated:
doStep := true
s, err := b.create(step)
if err != nil {
if _, ok := err.(notSupportedStepperErr); ok {
// stepper not implemented for this type yet
// stepper currently only implemented for CRD types
doStep = false
} else {
return err
}
}
if doStep {
status, err := s.Status()
if err != nil {
return err
}
plan.Status.Plan[i].Status = status
continue
case v1alpha1.StepStatusWaitingForAPI:
switch step.Resource.Kind {
case crdKind:
crd, err := o.opClient.ApiextensionsV1beta1Interface().ApiextensionsV1beta1().CustomResourceDefinitions().Get(step.Resource.Name, metav1.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
plan.Status.Plan[i].Status = v1alpha1.StepStatusNotPresent
} else {
return errorwrap.Wrapf(err, "error finding the %s CRD", crd.Name)
}
continue
}

established, namesAccepted := false, false
for _, cdt := range crd.Status.Conditions {
switch cdt.Type {
case v1beta1.Established:
if cdt.Status == v1beta1.ConditionTrue {
established = true
}
case v1beta1.NamesAccepted:
if cdt.Status == v1beta1.ConditionTrue {
namesAccepted = true
}
}
}
}

if established && namesAccepted {
plan.Status.Plan[i].Status = v1alpha1.StepStatusCreated
}
continue
}
switch step.Status {
case v1alpha1.StepStatusPresent, v1alpha1.StepStatusCreated, v1alpha1.StepStatusWaitingForAPI:
continue
case v1alpha1.StepStatusUnknown, v1alpha1.StepStatusNotPresent:
o.logger.WithFields(logrus.Fields{"kind": step.Resource.Kind, "name": step.Resource.Name}).Debug("execute resource")
switch step.Resource.Kind {
case crdKind:
// Marshal the manifest into a CRD instance.
var crd v1beta1ext.CustomResourceDefinition
err := json.Unmarshal([]byte(step.Resource.Manifest), &crd)
if err != nil {
return errorwrap.Wrapf(err, "error parsing step manifest: %s", step.Resource.Name)
}

// TODO: check that names are accepted
// Attempt to create the CRD.
_, err = o.opClient.ApiextensionsV1beta1Interface().ApiextensionsV1beta1().CustomResourceDefinitions().Create(&crd)
if k8serrors.IsAlreadyExists(err) {
currentCRD, _ := o.lister.APIExtensionsV1beta1().CustomResourceDefinitionLister().Get(crd.GetName())
// Compare 2 CRDs to see if it needs to be updatetd
if !(reflect.DeepEqual(crd.Spec.Version, currentCRD.Spec.Version) &&
reflect.DeepEqual(crd.Spec.Versions, currentCRD.Spec.Versions) &&
reflect.DeepEqual(crd.Spec.Validation, currentCRD.Spec.Validation)) {
// Verify CRD ownership, only attempt to update if
// CRD has only one owner
// Example: provided=database.coreos.com/v1alpha1/EtcdCluster
matchedCSV, err := index.CRDProviderNames(o.csvProvidedAPIsIndexer, crd)
if err != nil {
return errorwrap.Wrapf(err, "error find matched CSV: %s", step.Resource.Name)
}
crd.SetResourceVersion(currentCRD.GetResourceVersion())
if len(matchedCSV) == 1 {
o.logger.Debugf("Found one owner for CRD %v", crd)
} else if len(matchedCSV) > 1 {
o.logger.Debugf("Found multiple owners for CRD %v", crd)

err := ensureCRDVersions(currentCRD, &crd)
if err != nil {
return errorwrap.Wrapf(err, "error missing existing CRD version(s) in new CRD: %s", step.Resource.Name)
}

if err = o.validateCustomResourceDefinition(currentCRD, &crd); err != nil {
return errorwrap.Wrapf(err, "error validating existing CRs agains new CRD's schema: %s", step.Resource.Name)
}
}
// Remove deprecated version in CRD storedVersions
storeVersions := removeDeprecatedStoredVersions(currentCRD, &crd)
if storeVersions != nil {
currentCRD.Status.StoredVersions = storeVersions
resultCRD, err := o.opClient.ApiextensionsV1beta1Interface().ApiextensionsV1beta1().CustomResourceDefinitions().UpdateStatus(currentCRD)
if err != nil {
return errorwrap.Wrapf(err, "error updating CRD's status: %s", step.Resource.Name)
}
crd.SetResourceVersion(resultCRD.GetResourceVersion())
}
// Update CRD to new version
_, err = o.opClient.ApiextensionsV1beta1Interface().ApiextensionsV1beta1().CustomResourceDefinitions().Update(&crd)
if err != nil {
return errorwrap.Wrapf(err, "error updating CRD: %s", step.Resource.Name)
}
}
// If it already existed, mark the step as Present.
plan.Status.Plan[i].Status = v1alpha1.StepStatusPresent
continue
} else if err != nil {
// Unexpected error creating the CRD.
return err
} else {
// If no error occured, make sure to wait for the API to become available.
plan.Status.Plan[i].Status = v1alpha1.StepStatusWaitingForAPI
continue
}

case v1alpha1.ClusterServiceVersionKind:
// Marshal the manifest into a CSV instance.
var csv v1alpha1.ClusterServiceVersion
Expand Down
Loading

0 comments on commit d7f5cfe

Please sign in to comment.