Skip to content

Commit

Permalink
Make CRDs built and aggregated lazily for oasv2
Browse files Browse the repository at this point in the history
  • Loading branch information
Jefftree committed Jul 18, 2023
1 parent b2a9c06 commit 735be02
Show file tree
Hide file tree
Showing 3 changed files with 639 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@ package openapi

import (
"fmt"
"reflect"
"sync"
"time"

"github.com/google/uuid"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/kube-openapi/pkg/cached"
"k8s.io/kube-openapi/pkg/handler"
"k8s.io/kube-openapi/pkg/validation/spec"

Expand All @@ -49,21 +50,69 @@ type Controller struct {

queue workqueue.RateLimitingInterface

staticSpec *spec.Swagger
staticSpec *spec.Swagger

openAPIService *handler.OpenAPIService

// specs per version and per CRD name
lock sync.Mutex
crdSpecs map[string]map[string]*spec.Swagger
// specs by name. The specs are lazily constructed on request.
// The lock is for the map only.
lock sync.Mutex
specsByName map[string]*specCache
}

// specCache holds the merged version spec for a CRD as well as the CRD object.
// The spec is created lazily from the CRD object on request.
// The mergedVersionSpec is only created on instantiation and is never
// changed. crdCache is a cached.Replaceable and updates are thread
// safe. Thus, no lock is needed to protect this struct.
type specCache struct {
crdCache cached.Replaceable[*apiextensionsv1.CustomResourceDefinition]
mergedVersionSpec cached.Data[*spec.Swagger]
}

func (s *specCache) update(crd *apiextensionsv1.CustomResourceDefinition) {
s.crdCache.Replace(cached.NewResultOK(crd, generateCRDHash(crd)))
}

func createSpecCache(crd *apiextensionsv1.CustomResourceDefinition) *specCache {
s := specCache{}
s.update(crd)

s.mergedVersionSpec = cached.NewTransformer[*apiextensionsv1.CustomResourceDefinition](func(result cached.Result[*apiextensionsv1.CustomResourceDefinition]) cached.Result[*spec.Swagger] {
if result.Err != nil {
// This should never happen, but return the err if it does.
return cached.NewResultErr[*spec.Swagger](result.Err)
}
crd := result.Data
mergeSpec := &spec.Swagger{}
for _, v := range crd.Spec.Versions {
if !v.Served {
continue
}
s, err := builder.BuildOpenAPIV2(crd, v.Name, builder.Options{V2: true})
// Defaults must be pruned here for CRDs to cleanly merge with the static
// spec that already has defaults pruned
if err != nil {
return cached.NewResultErr[*spec.Swagger](err)
}
s.Definitions = handler.PruneDefaults(s.Definitions)
mergeSpec, err = builder.MergeSpecs(mergeSpec, s)
if err != nil {
return cached.NewResultErr[*spec.Swagger](err)
}
}
return cached.NewResultOK(mergeSpec, generateCRDHash(crd))
}, &s.crdCache)
return &s
}

// NewController creates a new Controller with input CustomResourceDefinition informer
func NewController(crdInformer informers.CustomResourceDefinitionInformer) *Controller {
c := &Controller{
crdLister: crdInformer.Lister(),
crdsSynced: crdInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd_openapi_controller"),
crdSpecs: map[string]map[string]*spec.Swagger{},
crdLister: crdInformer.Lister(),
crdsSynced: crdInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd_openapi_controller"),
specsByName: map[string]*specCache{},
}

crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -102,18 +151,9 @@ func (c *Controller) Run(staticSpec *spec.Swagger, openAPIService *handler.OpenA
if !apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) {
continue
}
newSpecs, changed, err := buildVersionSpecs(crd, nil)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to build OpenAPI spec of CRD %s: %v", crd.Name, err))
} else if !changed {
continue
}
c.crdSpecs[crd.Name] = newSpecs
}
if err := c.updateSpecLocked(); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to initially create OpenAPI spec for CRDs: %v", err))
return
c.specsByName[crd.Name] = createSpecCache(crd)
}
c.updateSpecLocked()

// only start one worker thread since its a slow moving API
go wait.Until(c.runWorker, time.Second, stopCh)
Expand Down Expand Up @@ -164,76 +204,59 @@ func (c *Controller) sync(name string) error {

// do we have to remove all specs of this CRD?
if errors.IsNotFound(err) || !apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) {
if _, found := c.crdSpecs[name]; !found {
if _, found := c.specsByName[name]; !found {
return nil
}
delete(c.crdSpecs, name)
delete(c.specsByName, name)
klog.V(2).Infof("Updating CRD OpenAPI spec because %s was removed", name)
regenerationCounter.With(map[string]string{"crd": name, "reason": "remove"})
return c.updateSpecLocked()
c.updateSpecLocked()
return nil
}

// compute CRD spec and see whether it changed
oldSpecs, updated := c.crdSpecs[crd.Name]
newSpecs, changed, err := buildVersionSpecs(crd, oldSpecs)
if err != nil {
return err
}
if !changed {
// If CRD spec already exists, update the CRD.
// specCache.update() includes the ETag so an update on a spec
// resulting in the same ETag will be a noop.
s, exists := c.specsByName[crd.Name]
if exists {
s.update(crd)
klog.V(2).Infof("Updating CRD OpenAPI spec because %s changed", name)
regenerationCounter.With(map[string]string{"crd": name, "reason": "update"})
return nil
}

// update specs of this CRD
c.crdSpecs[crd.Name] = newSpecs
c.specsByName[crd.Name] = createSpecCache(crd)
klog.V(2).Infof("Updating CRD OpenAPI spec because %s changed", name)
reason := "add"
if updated {
reason = "update"
}
regenerationCounter.With(map[string]string{"crd": name, "reason": reason})
return c.updateSpecLocked()
regenerationCounter.With(map[string]string{"crd": name, "reason": "add"})
c.updateSpecLocked()
return nil
}

func buildVersionSpecs(crd *apiextensionsv1.CustomResourceDefinition, oldSpecs map[string]*spec.Swagger) (map[string]*spec.Swagger, bool, error) {
newSpecs := map[string]*spec.Swagger{}
anyChanged := false
for _, v := range crd.Spec.Versions {
if !v.Served {
continue
}
spec, err := builder.BuildOpenAPIV2(crd, v.Name, builder.Options{V2: true})
// Defaults must be pruned here for CRDs to cleanly merge with the static
// spec that already has defaults pruned
spec.Definitions = handler.PruneDefaults(spec.Definitions)
if err != nil {
return nil, false, err
}
newSpecs[v.Name] = spec
if oldSpecs[v.Name] == nil || !reflect.DeepEqual(oldSpecs[v.Name], spec) {
anyChanged = true
}
}
if !anyChanged && len(oldSpecs) == len(newSpecs) {
return newSpecs, false, nil
// updateSpecLocked updates the cached spec graph.
func (c *Controller) updateSpecLocked() {
specList := make([]cached.Data[*spec.Swagger], 0, len(c.specsByName))
for crd := range c.specsByName {
specList = append(specList, c.specsByName[crd].mergedVersionSpec)
}

return newSpecs, true, nil
}

// updateSpecLocked aggregates all OpenAPI specs and updates openAPIService.
// It is not thread-safe. The caller is responsible to hold proper lock (Controller.lock).
func (c *Controller) updateSpecLocked() error {
crdSpecs := []*spec.Swagger{}
for _, versionSpecs := range c.crdSpecs {
for _, s := range versionSpecs {
crdSpecs = append(crdSpecs, s)
cache := cached.NewListMerger(func(results []cached.Result[*spec.Swagger]) cached.Result[*spec.Swagger] {
localCRDSpec := make([]*spec.Swagger, 0, len(results))
for k := range results {
if results[k].Err == nil {
localCRDSpec = append(localCRDSpec, results[k].Data)
}
}
}
mergedSpec, err := builder.MergeSpecs(c.staticSpec, crdSpecs...)
if err != nil {
return fmt.Errorf("failed to merge specs: %v", err)
}
return c.openAPIService.UpdateSpec(mergedSpec)
mergedSpec, err := builder.MergeSpecs(c.staticSpec, localCRDSpec...)
if err != nil {
return cached.NewResultErr[*spec.Swagger](fmt.Errorf("failed to merge specs: %v", err))
}
// A UUID is returned for the etag because we will only
// create a new merger when a CRD has changed. A hash based
// etag is more expensive because the CRDs are not
// premarshalled.
return cached.NewResultOK(mergedSpec, uuid.New().String())
}, specList)
c.openAPIService.UpdateSpecLazy(cache)
}

func (c *Controller) addCustomResourceDefinition(obj interface{}) {
Expand Down Expand Up @@ -269,3 +292,7 @@ func (c *Controller) deleteCustomResourceDefinition(obj interface{}) {
func (c *Controller) enqueue(obj *apiextensionsv1.CustomResourceDefinition) {
c.queue.Add(obj.Name)
}

func generateCRDHash(crd *apiextensionsv1.CustomResourceDefinition) string {
return fmt.Sprintf("%s,%d", crd.UID, crd.Generation)
}
Loading

0 comments on commit 735be02

Please sign in to comment.