Skip to content

Commit

Permalink
handler,predicate: pause object reconciliation by annotation (#60)
Browse files Browse the repository at this point in the history
* handler,predicate: add NewPause(key) to pause a controller
by event handler or predicate via an annotation with key
string key, respectively

internal/annotation: add generic library for creating
predicates and event handlers for arbitrary annotation keys

Signed-off-by: Eric Stroczynski <ericstroczynski@gmail.com>

* handler: create test registry for instrumented handler

Signed-off-by: Eric Stroczynski <ericstroczynski@gmail.com>
  • Loading branch information
Eric Stroczynski committed Jul 23, 2021
1 parent ec654a6 commit 3986790
Show file tree
Hide file tree
Showing 9 changed files with 1,057 additions and 7 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/operator-framework/operator-lib
go 1.15

require (
github.com/go-logr/logr v0.4.0
github.com/onsi/ginkgo v1.16.4
github.com/onsi/gomega v1.13.0
github.com/operator-framework/api v0.10.0
Expand Down
66 changes: 66 additions & 0 deletions handler/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2021 The Operator-SDK Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package handler_test

import (
"context"
"os"

"github.com/operator-framework/operator-lib/handler"
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// This example applies the Pause handler to all incoming Pod events on a Pod controller.
func ExampleNewPause() {
cfg, err := config.GetConfig()
if err != nil {
os.Exit(1)
}

mgr, err := manager.New(cfg, manager.Options{})
if err != nil {
os.Exit(1)
}

c, err := controller.NewUnmanaged("pod", mgr, controller.Options{
Reconciler: reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) {
return reconcile.Result{}, nil
}),
})
if err != nil {
os.Exit(1)
}

// Filter out Pods with the "my.app/paused: true" annotation.
pause, err := handler.NewPause("my.app/paused")
if err != nil {
os.Exit(1)
}
if err := c.Watch(&source.Kind{Type: &v1.Pod{}}, pause); err != nil {
os.Exit(1)
}

<-mgr.Elected()

if err := c.Start(signals.SetupSignalHandler()); err != nil {
os.Exit(1)
}
}
18 changes: 11 additions & 7 deletions handler/instrumented_enqueue_object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,29 @@
package handler

import (
"github.com/operator-framework/operator-lib/handler/internal/metrics"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"k8s.io/client-go/util/workqueue"
)

var _ = Describe("InstrumentedEnqueueRequestForObject", func() {
var q workqueue.RateLimitingInterface
var instance InstrumentedEnqueueRequestForObject
var pod *corev1.Pod

registry := prometheus.NewRegistry()
registry.MustRegister(metrics.ResourceCreatedAt)

BeforeEach(func() {
q = controllertest.Queue{Interface: workqueue.New()}
instance = InstrumentedEnqueueRequestForObject{}
Expand Down Expand Up @@ -69,7 +73,7 @@ var _ = Describe("InstrumentedEnqueueRequestForObject", func() {
}))

// verify metrics
gauges, err := metrics.Registry.Gather()
gauges, err := registry.Gather()
Expect(err).NotTo(HaveOccurred())
Expect(len(gauges)).To(Equal(1))
assertMetrics(gauges[0], 1, []*corev1.Pod{pod})
Expand Down Expand Up @@ -104,7 +108,7 @@ var _ = Describe("InstrumentedEnqueueRequestForObject", func() {
}))

// verify metrics
gauges, err := metrics.Registry.Gather()
gauges, err := registry.Gather()
Expect(err).NotTo(HaveOccurred())
Expect(len(gauges)).To(Equal(0))
})
Expand All @@ -129,7 +133,7 @@ var _ = Describe("InstrumentedEnqueueRequestForObject", func() {
}))

// verify metrics
gauges, err := metrics.Registry.Gather()
gauges, err := registry.Gather()
Expect(err).NotTo(HaveOccurred())
Expect(len(gauges)).To(Equal(0))
})
Expand Down Expand Up @@ -164,7 +168,7 @@ var _ = Describe("InstrumentedEnqueueRequestForObject", func() {
}))

// verify metrics
gauges, err := metrics.Registry.Gather()
gauges, err := registry.Gather()
Expect(err).NotTo(HaveOccurred())
Expect(len(gauges)).To(Equal(1))
assertMetrics(gauges[0], 2, []*corev1.Pod{newpod, pod})
Expand Down
36 changes: 36 additions & 0 deletions handler/pause.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2021 The Operator-SDK Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package handler

import (
"github.com/operator-framework/operator-lib/internal/annotation"

"sigs.k8s.io/controller-runtime/pkg/handler"
)

// NewPause returns an event handler that filters out objects with a truthy "paused" annotation.
// When an annotation with key string key is present on an object and has a truthy value, ex. "true",
// the watch constructed with this event handler will not add events for that object to the queue.
// Key string key must be a valid annotation key.
//
// A note on security: since users that can CRUD a particular API can apply or remove annotations with
// default cluster admission controllers, this same set of users can therefore start or stop reconciliation
// of objects via this pause mechanism. If this is a concern, configure an admission webhook to enforce
// a stricter annotation modification policy. See AdmissionReview configuration for user info available
// to a webhook:
// https://kubernetes.io/docs/reference/access-authn-authz/extensible-admission-controllers/#request
func NewPause(key string) (handler.EventHandler, error) {
return annotation.NewFalsyEventHandler(key, annotation.Options{Log: log})
}
210 changes: 210 additions & 0 deletions internal/annotation/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
// Copyright 2021 The Operator-SDK Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package annotation contains event handler and predicate builders for annotations.
// There are two types of builders:
//
// - Falsy builders result in objects being queued if the annotation is not present OR contains a falsy value.
// - Truthy builders are the falsy complement: objects will be enqueued if the annotation is present AND contains a truthy value.
//
// Truthiness/falsiness is determined by Go's strconv.ParseBool().
package annotation

import (
"fmt"
"strconv"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/validation"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// Options configures a filter.
type Options struct {
Log logr.Logger

// Internally set.
truthy bool
}

// NewFalsyPredicate returns a predicate that passes objects
// that do not have annotation with key string key or whose value is falsy.
func NewFalsyPredicate(key string, opts Options) (predicate.Predicate, error) {
opts.truthy = false
return newFilter(key, opts)
}

// NewFalsyEventHandler returns an event handler that enqueues objects
// that do not have annotation with key string key or whose value is falsy.
func NewFalsyEventHandler(key string, opts Options) (handler.EventHandler, error) {
opts.truthy = false
return newEventHandler(key, opts)
}

// NewTruthyPredicate returns a predicate that passes objects
// that do have annotation with key string key and whose value is truthy.
func NewTruthyPredicate(key string, opts Options) (predicate.Predicate, error) {
opts.truthy = true
return newFilter(key, opts)
}

// NewTruthyEventHandler returns an event handler that enqueues objects
// that do have annotation with key string key and whose value is truthy.
func NewTruthyEventHandler(key string, opts Options) (handler.EventHandler, error) {
opts.truthy = true
return newEventHandler(key, opts)
}

func defaultOptions(opts *Options) {
if opts.Log == nil {
opts.Log = logf.Log
}
}

// newEventHandler returns a filter for use as an event handler.
func newEventHandler(key string, opts Options) (handler.EventHandler, error) {
f, err := newFilter(key, opts)
if err != nil {
return nil, err
}

f.hdlr = &handler.EnqueueRequestForObject{}
return handler.Funcs{
CreateFunc: func(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
if f.Create(evt) {
f.hdlr.Create(evt, q)
}
},
UpdateFunc: func(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
if f.Update(evt) {
f.hdlr.Update(evt, q)
}
},
DeleteFunc: func(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
if f.Delete(evt) {
f.hdlr.Delete(evt, q)
}
},
GenericFunc: func(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
if f.Generic(evt) {
f.hdlr.Generic(evt, q)
}
},
}, nil
}

// newFilter returns a filter for use as a predicate.
func newFilter(key string, opts Options) (*filter, error) {
defaultOptions(&opts)

// Make sure the annotation key and eventual value are valid together.
if err := validateAnnotation(key, opts.truthy); err != nil {
return nil, err
}

f := filter{}
f.key = key
// Falsy filters return true in all cases except when the annotation is present and true.
// Truthy filters only return true when the annotation is present and true.
f.ret = !opts.truthy
f.log = opts.Log.WithName("pause")
return &f, nil
}

func validateAnnotation(key string, truthy bool) error {
fldPath := field.NewPath("metadata", "annotations")
annotation := map[string]string{key: fmt.Sprintf("%v", truthy)}
return validation.ValidateAnnotations(annotation, fldPath).ToAggregate()
}

// filter implements a filter for objects with a truthy "paused" annotation (see Key).
// When this annotation is removed or value does not evaluate to "true",
// the controller will see events from these objects again.
type filter struct {
key string
ret bool
log logr.Logger
hdlr *handler.EnqueueRequestForObject
}

// Create implements predicate.Predicate.Create().
func (f *filter) Create(evt event.CreateEvent) bool {
if evt.Object == nil {
if f.hdlr == nil {
f.log.Error(nil, "CreateEvent received with no metadata", "event", evt)
}
return f.ret
}
return f.run(evt.Object)
}

// Update implements predicate.Predicate.Update().
func (f *filter) Update(evt event.UpdateEvent) bool {
if evt.ObjectNew != nil {
return f.run(evt.ObjectNew)
} else if evt.ObjectOld != nil {
return f.run(evt.ObjectOld)
}
if f.hdlr == nil {
f.log.Error(nil, "UpdateEvent received with no metadata", "event", evt)
}
return f.ret
}

// Delete implements predicate.Predicate.Delete().
func (f *filter) Delete(evt event.DeleteEvent) bool {
if evt.Object == nil {
if f.hdlr == nil {
f.log.Error(nil, "DeleteEvent received with no metadata", "event", evt)
}
return f.ret
}
return f.run(evt.Object)
}

// Generic implements predicate.Predicate.Generic().
func (f *filter) Generic(evt event.GenericEvent) bool {
if evt.Object == nil {
if f.hdlr == nil {
f.log.Error(nil, "GenericEvent received with no metadata", "event", evt)
}
return f.ret
}
return f.run(evt.Object)
}

func (f *filter) run(obj client.Object) bool {
annotations := obj.GetAnnotations()
if len(annotations) == 0 {
return f.ret
}
annoStr, hasAnno := annotations[f.key]
if !hasAnno {
return f.ret
}
annoBool, err := strconv.ParseBool(annoStr)
if err != nil {
f.log.Error(err, "Bad annotation value", "key", f.key, "value", annoStr)
return f.ret
}
// If the filter is falsy (f.ret == true) and value is false, then the object passes the filter.
// If the filter is truthy (f.ret == false) and value is true, then the object passes the filter.
return !annoBool == f.ret
}
Loading

0 comments on commit 3986790

Please sign in to comment.