Skip to content

Commit

Permalink
Merge pull request #1174 from DirectXMan12/feature/metadata-only-watch
Browse files Browse the repository at this point in the history
✨ metadata-only watches
  • Loading branch information
k8s-ci-robot committed Oct 20, 2020
2 parents 5c2b42d + 5de9250 commit bae8fdb
Show file tree
Hide file tree
Showing 11 changed files with 1,586 additions and 340 deletions.
57 changes: 48 additions & 9 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import (
"strings"

"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand All @@ -37,6 +39,17 @@ import (
var newController = controller.New
var getGvk = apiutil.GVKForObject

// project represents other forms that the we can use to
// send/receive a given resource (metadata-only, unstructured, etc)
type objectProjection int

const (
// projectAsNormal doesn't change the object from the form given
projectAsNormal objectProjection = iota
// projectAsMetadata turns this into an metadata-only watch
projectAsMetadata
)

// Builder builds a Controller.
type Builder struct {
forInput ForInput
Expand All @@ -57,9 +70,10 @@ func ControllerManagedBy(m manager.Manager) *Builder {

// ForInput represents the information set by For method.
type ForInput struct {
object client.Object
predicates []predicate.Predicate
err error
object client.Object
predicates []predicate.Predicate
objectProjection objectProjection
err error
}

// For defines the type of Object being *reconciled*, and configures the ControllerManagedBy to respond to create / delete /
Expand All @@ -82,8 +96,9 @@ func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder {

// OwnsInput represents the information set by Owns method.
type OwnsInput struct {
object client.Object
predicates []predicate.Predicate
object client.Object
predicates []predicate.Predicate
objectProjection objectProjection
}

// Owns defines types of Objects being *generated* by the ControllerManagedBy, and configures the ControllerManagedBy to respond to
Expand Down Expand Up @@ -188,19 +203,43 @@ func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, erro
return blder.ctrl, nil
}

func (blder *Builder) project(obj client.Object, proj objectProjection) (client.Object, error) {
switch proj {
case projectAsNormal:
return obj, nil
case projectAsMetadata:
metaObj := &metav1.PartialObjectMetadata{}
gvk, err := getGvk(obj, blder.mgr.GetScheme())
if err != nil {
return nil, fmt.Errorf("unable to determine GVK of %T for a metadata-only watch: %w", obj, err)
}
metaObj.SetGroupVersionKind(gvk)
return metaObj, nil
default:
panic(fmt.Sprintf("unexpected projection type %v on type %T, should not be possible since this is an internal field", proj, obj))
}
}

func (blder *Builder) doWatch() error {
// Reconcile type
src := &source.Kind{Type: blder.forInput.object}
typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
err := blder.ctrl.Watch(src, hdler, allPredicates...)
if err != nil {
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
return err
}

// Watches the managed types
for _, own := range blder.ownsInput {
src := &source.Kind{Type: own.object}
typeForSrc, err := blder.project(own.object, own.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForOwner{
OwnerType: blder.forInput.object,
IsController: true,
Expand Down
65 changes: 60 additions & 5 deletions pkg/builder/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@ import (
"github.com/go-logr/logr"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/workqueue"

"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -358,8 +361,60 @@ var _ = Describe("application", func() {
})
})

Describe("watching with projections", func() {
var mgr manager.Manager
BeforeEach(func() {
// use a cache that intercepts requests for fully typed objects to
// ensure we use the projected versions
var err error
mgr, err = manager.New(cfg, manager.Options{NewCache: newNonTypedOnlyCache})
Expect(err).NotTo(HaveOccurred())
})

It("should support watching For & Owns as metadata", func() {
bldr := ControllerManagedBy(mgr).
For(&appsv1.Deployment{}, OnlyMetadata).
Owns(&appsv1.ReplicaSet{}, OnlyMetadata)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
doReconcileTest(ctx, "8", bldr, mgr, true)
})
})
})

// newNonTypedOnlyCache returns a new cache that wraps the normal cache,
// returning an error if normal, typed objects have informers requested.
func newNonTypedOnlyCache(config *rest.Config, opts cache.Options) (cache.Cache, error) {
normalCache, err := cache.New(config, opts)
if err != nil {
return nil, err
}
return &nonTypedOnlyCache{
Cache: normalCache,
}, nil
}

// nonTypedOnlyCache is a cache.Cache that only provides metadata &
// unstructured informers.
type nonTypedOnlyCache struct {
cache.Cache
}

func (c *nonTypedOnlyCache) GetInformer(ctx context.Context, obj client.Object) (cache.Informer, error) {
switch obj.(type) {
case (*metav1.PartialObjectMetadata):
return c.Cache.GetInformer(ctx, obj)
default:
return nil, fmt.Errorf("did not want to provide an informer for normal type %T", obj)
}
}
func (c *nonTypedOnlyCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (cache.Informer, error) {
return nil, fmt.Errorf("don't try to sidestep the restriction on informer types by calling GetInformerForKind")
}

// TODO(directxman12): this function has too many arguments, and the whole
// "nameSuffix" think is a bit of a hack It should be cleaned up significantly by someone with a bit of time
func doReconcileTest(ctx context.Context, nameSuffix string, blder *Builder, mgr manager.Manager, complete bool) {
deployName := "deploy-name-" + nameSuffix
rsName := "rs-name-" + nameSuffix
Expand Down Expand Up @@ -422,8 +477,8 @@ func doReconcileTest(ctx context.Context, nameSuffix string, blder *Builder, mgr
Expect(err).NotTo(HaveOccurred())

By("Waiting for the Deployment Reconcile")
Expect(<-ch).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}}))
Eventually(ch).Should(Receive(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}})))

By("Creating a ReplicaSet")
// Expect a Reconcile when an Owned object is managedObjects.
Expand Down Expand Up @@ -452,8 +507,8 @@ func doReconcileTest(ctx context.Context, nameSuffix string, blder *Builder, mgr
Expect(err).NotTo(HaveOccurred())

By("Waiting for the ReplicaSet Reconcile")
Expect(<-ch).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}}))
Eventually(ch).Should(Receive(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}})))

}

Expand Down
56 changes: 54 additions & 2 deletions pkg/builder/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,71 @@ import (
"fmt"
"os"

logf "sigs.k8s.io/controller-runtime/pkg/log"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func ExampleBuilder_metadata_only() {
logf.SetLogger(zap.New())

var log = logf.Log.WithName("builder-examples")

mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{})
if err != nil {
log.Error(err, "could not create manager")
os.Exit(1)
}

cl := mgr.GetClient()
err = builder.
ControllerManagedBy(mgr). // Create the ControllerManagedBy
For(&appsv1.ReplicaSet{}). // ReplicaSet is the Application API
Owns(&corev1.Pod{}, builder.OnlyMetadata). // ReplicaSet owns Pods created by it, and caches them as metadata only
Complete(reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
// Read the ReplicaSet
rs := &appsv1.ReplicaSet{}
err := cl.Get(ctx, req.NamespacedName, rs)
if err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}

// List the Pods matching the PodTemplate Labels, but only their metadata
var podsMeta metav1.PartialObjectMetadataList
err = cl.List(ctx, &podsMeta, client.InNamespace(req.Namespace), client.MatchingLabels(rs.Spec.Template.Labels))
if err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}

// Update the ReplicaSet
rs.Labels["pod-count"] = fmt.Sprintf("%v", len(podsMeta.Items))
err = cl.Update(ctx, rs)
if err != nil {
return reconcile.Result{}, err
}

return reconcile.Result{}, nil
}))
if err != nil {
log.Error(err, "could not create controller")
os.Exit(1)
}

if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
log.Error(err, "could not start manager")
os.Exit(1)
}
}

// This example creates a simple application ControllerManagedBy that is configured for ReplicaSets and Pods.
//
// * Create a new application for ReplicaSets that manages Pods owned by the ReplicaSet and calls into
Expand Down
33 changes: 33 additions & 0 deletions pkg/builder/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,36 @@ var _ OwnsOption = &Predicates{}
var _ WatchesOption = &Predicates{}

// }}}

// {{{ For & Owns Dual-Type options

// asProjection configures the projection (currently only metadata) on the input.
// Currently only metadata is supported. We might want to expand
// this to arbitrary non-special local projections in the future.
type projectAs objectProjection

// ApplyToFor applies this configuration to the given ForInput options.
func (p projectAs) ApplyToFor(opts *ForInput) {
opts.objectProjection = objectProjection(p)
}

// ApplyToOwns applies this configuration to the given OwnsInput options.
func (p projectAs) ApplyToOwns(opts *OwnsInput) {
opts.objectProjection = objectProjection(p)
}

var (
// OnlyMetadata tells the controller to *only* cache metadata, and to watch
// the the API server in metadata-only form. This is useful when watching
// lots of objects, really big objects, or objects for which you only know
// the the GVK, but not the structure. You'll need to pass
// metav1.PartialObjectMetadata to the client when fetching objects in your
// reconciler, otherwise you'll end up with a duplicate structured or
// unstructured cache.
OnlyMetadata = projectAs(projectAsMetadata)

_ ForOption = OnlyMetadata
_ OwnsOption = OnlyMetadata
)

// }}}
Loading

0 comments on commit bae8fdb

Please sign in to comment.