Skip to content

Commit

Permalink
feat: support v1 CRDS in OLM. Simply InstallPlan Execution via new
Browse files Browse the repository at this point in the history
Stepper interface.
  • Loading branch information
exdx committed Apr 6, 2020
1 parent 3ba5db8 commit 7e377cf
Show file tree
Hide file tree
Showing 15 changed files with 981 additions and 160 deletions.
233 changes: 119 additions & 114 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
v1ext "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
v1beta1ext "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apiextensions-apiserver/pkg/apiserver/validation"
extinf "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -53,7 +53,6 @@ import (
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/scoped"
sharedtime "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/time"
"github.com/operator-framework/operator-lifecycle-manager/pkg/metrics"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
)

const (
Expand Down Expand Up @@ -322,7 +321,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
}

// Register CustomResourceDefinition QueueInformer
crdInformer := extinf.NewSharedInformerFactory(op.opClient.ApiextensionsV1beta1Interface(), resyncPeriod()).Apiextensions().V1beta1().CustomResourceDefinitions()
crdInformer := extinf.NewSharedInformerFactory(op.opClient.ApiextensionsInterface(), resyncPeriod()).Apiextensions().V1beta1().CustomResourceDefinitions()
op.lister.APIExtensionsV1beta1().RegisterCustomResourceDefinitionLister(crdInformer.Lister())
crdQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand Down Expand Up @@ -1299,7 +1298,7 @@ func (o *Operator) ResolvePlan(plan *v1alpha1.InstallPlan) error {
return nil
}

func getCRDVersionsMap(crd *v1beta1ext.CustomResourceDefinition) map[string]struct{} {
func getCRDV1Beta1VersionsMap(crd *v1beta1ext.CustomResourceDefinition) map[string]struct{} {
versionsMap := map[string]struct{}{}

for _, version := range crd.Spec.Versions {
Expand All @@ -1312,9 +1311,18 @@ func getCRDVersionsMap(crd *v1beta1ext.CustomResourceDefinition) map[string]stru
return versionsMap
}

func getCRDV1VersionsMap(crd *v1ext.CustomResourceDefinition) map[string]struct{} {
versionsMap := map[string]struct{}{}

for _, version := range crd.Spec.Versions {
versionsMap[version.Name] = struct{}{}
}
return versionsMap
}

// Ensure all existing served versions are present in new CRD
func ensureCRDVersions(oldCRD *v1beta1ext.CustomResourceDefinition, newCRD *v1beta1ext.CustomResourceDefinition) error {
newCRDVersions := getCRDVersionsMap(newCRD)
func ensureCRDV1Beta1Versions(oldCRD *v1beta1ext.CustomResourceDefinition, newCRD *v1beta1ext.CustomResourceDefinition) error {
newCRDVersions := getCRDV1Beta1VersionsMap(newCRD)

for _, oldVersion := range oldCRD.Spec.Versions {
if oldVersion.Served {
Expand All @@ -1333,9 +1341,24 @@ func ensureCRDVersions(oldCRD *v1beta1ext.CustomResourceDefinition, newCRD *v1be
return nil
}

// Validate all existing served versions against new CRD's validation (if changed)
func (o *Operator) validateCustomResourceDefinition(oldCRD *v1beta1ext.CustomResourceDefinition, newCRD *v1beta1ext.CustomResourceDefinition) error {
o.logger.Debugf("Comparing %#v to %#v", oldCRD.Spec.Validation, newCRD.Spec.Validation)
// Ensure all existing served versions are present in new CRD
func ensureCRDV1Versions(oldCRD *v1ext.CustomResourceDefinition, newCRD *v1ext.CustomResourceDefinition) error {
newCRDVersions := getCRDV1VersionsMap(newCRD)

for _, oldVersion := range oldCRD.Spec.Versions {
if oldVersion.Served {
_, ok := newCRDVersions[oldVersion.Name]
if !ok {
return fmt.Errorf("New CRD (%s) must contain existing served versions (%s)", oldCRD.Name, oldVersion.Name)
}
}
}
return nil
}

// Validate all existing served versions against new v1beta CRD's validation (if changed)
func validateV1BetaCustomResourceDefinition(dynamicClient dynamic.Interface, oldCRD *v1beta1ext.CustomResourceDefinition, newCRD *v1beta1ext.CustomResourceDefinition) error {
logrus.Debugf("Comparing %#v to %#v", oldCRD.Spec.Validation, newCRD.Spec.Validation)
// If validation schema is unchanged, return right away
if reflect.DeepEqual(oldCRD.Spec.Validation, newCRD.Spec.Validation) {
return nil
Expand All @@ -1347,7 +1370,7 @@ func (o *Operator) validateCustomResourceDefinition(oldCRD *v1beta1ext.CustomRes
for _, version := range oldCRD.Spec.Versions {
if !version.Served {
gvr := schema.GroupVersionResource{Group: oldCRD.Spec.Group, Version: version.Name, Resource: oldCRD.Spec.Names.Plural}
err := o.validateExistingCRs(gvr, convertedCRD)
err := validateExistingCRs(dynamicClient, gvr, convertedCRD)
if err != nil {
return err
}
Expand All @@ -1356,17 +1379,53 @@ func (o *Operator) validateCustomResourceDefinition(oldCRD *v1beta1ext.CustomRes

if oldCRD.Spec.Version != "" {
gvr := schema.GroupVersionResource{Group: oldCRD.Spec.Group, Version: oldCRD.Spec.Version, Resource: oldCRD.Spec.Names.Plural}
err := o.validateExistingCRs(gvr, convertedCRD)
err := validateExistingCRs(dynamicClient, gvr, convertedCRD)
if err != nil {
return err
}
}
o.logger.Debugf("Successfully validated CRD %s\n", newCRD.Name)
logrus.Debugf("Successfully validated CRD %s\n", newCRD.Name)
return nil
}

// Validate all existing served versions against new CRD's validation (if changed)
func validateV1CustomResourceDefinition(dynamicClient dynamic.Interface, oldCRD *v1ext.CustomResourceDefinition, newCRD *v1ext.CustomResourceDefinition) error {
logrus.Debugf("Comparing %#v to %#v", oldCRD.Spec.Versions, newCRD.Spec.Versions)

// If validation schema is unchanged, return right away
newestSchema := newCRD.Spec.Versions[len(newCRD.Spec.Versions)-1].Schema
for i, oldVersion := range oldCRD.Spec.Versions {
if !reflect.DeepEqual(oldVersion.Schema, newestSchema) {
break
}
if i == len(oldCRD.Spec.Versions)-1 {
// we are on the last iteration
// schema has not changed between versions at this point.
return nil
}
}

convertedCRD := &apiextensions.CustomResourceDefinition{}
if err := v1ext.Convert_v1_CustomResourceDefinition_To_apiextensions_CustomResourceDefinition(newCRD, convertedCRD, nil); err != nil {
return err
}
for _, version := range oldCRD.Spec.Versions {
if !version.Served {
gvr := schema.GroupVersionResource{Group: oldCRD.Spec.Group, Version: version.Name, Resource: oldCRD.Spec.Names.Plural}
err := validateExistingCRs(dynamicClient, gvr, convertedCRD)
if err != nil {
return err
}
}
}

logrus.Debugf("Successfully validated CRD %s\n", newCRD.Name)
return nil
}

func (o *Operator) validateExistingCRs(gvr schema.GroupVersionResource, newCRD *apiextensions.CustomResourceDefinition) error {
crList, err := o.dynamicClient.Resource(gvr).List(metav1.ListOptions{})
func validateExistingCRs(dynamicClient dynamic.Interface, gvr schema.GroupVersionResource, newCRD *apiextensions.CustomResourceDefinition) error {
// make dynamic client
crList, err := dynamicClient.Resource(gvr).List(metav1.ListOptions{})
if err != nil {
return fmt.Errorf("error listing resources in GroupVersionResource %#v: %s", gvr, err)
}
Expand All @@ -1388,14 +1447,36 @@ func (o *Operator) validateExistingCRs(gvr schema.GroupVersionResource, newCRD *
// The function may not always succeed as storedVersions requires at least one
// version. If there is only stored version, it won't be removed until a new
// stored version is added.
func removeDeprecatedStoredVersions(oldCRD *v1beta1ext.CustomResourceDefinition, newCRD *v1beta1ext.CustomResourceDefinition) []string {
func removeDeprecatedV1Beta1StoredVersions(oldCRD *v1beta1ext.CustomResourceDefinition, newCRD *v1beta1ext.CustomResourceDefinition) []string {
// StoredVersions requires to have at least one version.
if len(oldCRD.Status.StoredVersions) <= 1 {
return nil
}

newStoredVersions := []string{}
newCRDVersions := getCRDV1Beta1VersionsMap(newCRD)
for _, v := range oldCRD.Status.StoredVersions {
_, ok := newCRDVersions[v]
if ok {
newStoredVersions = append(newStoredVersions, v)
}
}

if len(newStoredVersions) < 1 {
return nil
} else {
return newStoredVersions
}
}

func removeDeprecatedV1StoredVersions(oldCRD *v1ext.CustomResourceDefinition, newCRD *v1ext.CustomResourceDefinition) []string {
// StoredVersions requires to have at least one version.
if len(oldCRD.Status.StoredVersions) <= 1 {
return nil
}

newStoredVersions := []string{}
newCRDVersions := getCRDVersionsMap(newCRD)
newCRDVersions := getCRDV1VersionsMap(newCRD)
for _, v := range oldCRD.Status.StoredVersions {
_, ok := newCRDVersions[v]
if ok {
Expand Down Expand Up @@ -1438,111 +1519,35 @@ func (o *Operator) ExecutePlan(plan *v1alpha1.InstallPlan) error {
ensurer := newStepEnsurer(kubeclient, crclient, dynamicClient)

for i, step := range plan.Status.Plan {
b := newBuilder(kubeclient, dynamicClient, o.csvProvidedAPIsIndexer)
doStep := true
s, err := b.step(step.Status, step.Resource.Manifest, step.Resource.Name)
if err != nil {
if _, ok := err.(*notSupportedStepperErr); ok {
// stepper not implemented for this type yet
// stepper currently only implemented for CRD types
doStep = false
} else {
return err
}
}
if doStep {
status, err := s.Status()
if err != nil {
return err
}
plan.Status.Plan[i].Status = status
return nil
}

switch step.Status {
case v1alpha1.StepStatusPresent, v1alpha1.StepStatusCreated:
continue
case v1alpha1.StepStatusWaitingForAPI:
switch step.Resource.Kind {
case crdKind:
crd, err := o.opClient.ApiextensionsV1beta1Interface().ApiextensionsV1beta1().CustomResourceDefinitions().Get(step.Resource.Name, metav1.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
plan.Status.Plan[i].Status = v1alpha1.StepStatusNotPresent
} else {
return errorwrap.Wrapf(err, "error finding the %s CRD", crd.Name)
}
continue
}

established, namesAccepted := false, false
for _, cdt := range crd.Status.Conditions {
switch cdt.Type {
case v1beta1.Established:
if cdt.Status == v1beta1.ConditionTrue {
established = true
}
case v1beta1.NamesAccepted:
if cdt.Status == v1beta1.ConditionTrue {
namesAccepted = true
}
}
}

if established && namesAccepted {
plan.Status.Plan[i].Status = v1alpha1.StepStatusCreated
}
continue
}
continue
case v1alpha1.StepStatusUnknown, v1alpha1.StepStatusNotPresent:
o.logger.WithFields(logrus.Fields{"kind": step.Resource.Kind, "name": step.Resource.Name}).Debug("execute resource")
switch step.Resource.Kind {
case crdKind:
// Marshal the manifest into a CRD instance.
var crd v1beta1ext.CustomResourceDefinition
err := json.Unmarshal([]byte(step.Resource.Manifest), &crd)
if err != nil {
return errorwrap.Wrapf(err, "error parsing step manifest: %s", step.Resource.Name)
}

// TODO: check that names are accepted
// Attempt to create the CRD.
_, err = o.opClient.ApiextensionsV1beta1Interface().ApiextensionsV1beta1().CustomResourceDefinitions().Create(&crd)
if k8serrors.IsAlreadyExists(err) {
currentCRD, _ := o.lister.APIExtensionsV1beta1().CustomResourceDefinitionLister().Get(crd.GetName())
// Compare 2 CRDs to see if it needs to be updatetd
if !(reflect.DeepEqual(crd.Spec.Version, currentCRD.Spec.Version) &&
reflect.DeepEqual(crd.Spec.Versions, currentCRD.Spec.Versions) &&
reflect.DeepEqual(crd.Spec.Validation, currentCRD.Spec.Validation)) {
// Verify CRD ownership, only attempt to update if
// CRD has only one owner
// Example: provided=database.coreos.com/v1alpha1/EtcdCluster
matchedCSV, err := index.CRDProviderNames(o.csvProvidedAPIsIndexer, crd)
if err != nil {
return errorwrap.Wrapf(err, "error find matched CSV: %s", step.Resource.Name)
}
crd.SetResourceVersion(currentCRD.GetResourceVersion())
if len(matchedCSV) == 1 {
o.logger.Debugf("Found one owner for CRD %v", crd)
} else if len(matchedCSV) > 1 {
o.logger.Debugf("Found multiple owners for CRD %v", crd)

err := ensureCRDVersions(currentCRD, &crd)
if err != nil {
return errorwrap.Wrapf(err, "error missing existing CRD version(s) in new CRD: %s", step.Resource.Name)
}

if err = o.validateCustomResourceDefinition(currentCRD, &crd); err != nil {
return errorwrap.Wrapf(err, "error validating existing CRs agains new CRD's schema: %s", step.Resource.Name)
}
}
// Remove deprecated version in CRD storedVersions
storeVersions := removeDeprecatedStoredVersions(currentCRD, &crd)
if storeVersions != nil {
currentCRD.Status.StoredVersions = storeVersions
resultCRD, err := o.opClient.ApiextensionsV1beta1Interface().ApiextensionsV1beta1().CustomResourceDefinitions().UpdateStatus(currentCRD)
if err != nil {
return errorwrap.Wrapf(err, "error updating CRD's status: %s", step.Resource.Name)
}
crd.SetResourceVersion(resultCRD.GetResourceVersion())
}
// Update CRD to new version
_, err = o.opClient.ApiextensionsV1beta1Interface().ApiextensionsV1beta1().CustomResourceDefinitions().Update(&crd)
if err != nil {
return errorwrap.Wrapf(err, "error updating CRD: %s", step.Resource.Name)
}
}
// If it already existed, mark the step as Present.
plan.Status.Plan[i].Status = v1alpha1.StepStatusPresent
continue
} else if err != nil {
// Unexpected error creating the CRD.
return err
} else {
// If no error occured, make sure to wait for the API to become available.
plan.Status.Plan[i].Status = v1alpha1.StepStatusWaitingForAPI
continue
}

case v1alpha1.ClusterServiceVersionKind:
// Marshal the manifest into a CSV instance.
var csv v1alpha1.ClusterServiceVersion
Expand Down
Loading

0 comments on commit 7e377cf

Please sign in to comment.