Skip to content

Commit

Permalink
Fixes for controller-runtime rebase 0.8.2->0.9.2 (#73)
Browse files Browse the repository at this point in the history
Signed-off-by: Oren Shomron <shomron@gmail.com>
  • Loading branch information
shomron authored and ritazh committed Sep 10, 2021
1 parent 519bcc0 commit d390a0a
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,30 @@ func New(config *rest.Config, opts cache.Options) (cache.Cache, error) {
return &dynamicInformerCache{InformersMap: im}, nil
}

// BuilderWithOptions returns a Cache constructor that will build the a cache
// honoring the options argument, this is useful to specify options like
// SelectorsByObject
// WARNING: if SelectorsByObject is specified. filtered out resources are not
// returned.
func BuilderWithOptions(options cache.Options) cache.NewCacheFunc {
return func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
if opts.Scheme == nil {
opts.Scheme = options.Scheme
}
if opts.Mapper == nil {
opts.Mapper = options.Mapper
}
if opts.Resync == nil {
opts.Resync = options.Resync
}
if opts.Namespace == "" {
opts.Namespace = options.Namespace
}
opts.SelectorsByObject = options.SelectorsByObject
return New(config, opts)
}
}

func defaultOpts(config *rest.Config, opts cache.Options) (cache.Options, error) {
// Use the default Kubernetes Scheme if unset
if opts.Scheme == nil {
Expand All @@ -59,7 +83,7 @@ func defaultOpts(config *rest.Config, opts cache.Options) (cache.Options, error)
// Construct a new Mapper if unset
if opts.Mapper == nil {
var err error
opts.Mapper, err = apiutil.NewDiscoveryRESTMapper(config)
opts.Mapper, err = apiutil.NewDynamicRESTMapper(config)
if err != nil {
log.WithName("setup").Error(err, "Failed to get API Group-Resources")
return opts, fmt.Errorf("could not create RESTMapper from config")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca

It("should be able to restrict cache to a namespace", func() {
By("creating a namespaced cache")
namespacedCache, err := cache.New(cfg, cache.Options{Namespace: testNamespaceOne})
namespacedCache, err := dynamiccache.New(cfg, cache.Options{Namespace: testNamespaceOne})
Expect(err).NotTo(HaveOccurred())

By("running the cache and waiting for it to sync")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ import (
)

var (
_ cache.Informers = &dynamicInformerCache{}
_ client.Reader = &dynamicInformerCache{}
_ cache.Cache = &dynamicInformerCache{}
_ cache.Informers = &dynamicInformerCache{}
_ client.Reader = &dynamicInformerCache{}
_ cache.Cache = &dynamicInformerCache{}
)

// ErrCacheNotStarted is returned when trying to read from the cache that wasn't started.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ package dynamiccache_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/open-policy-agent/gatekeeper/third_party/sigs.k8s.io/controller-runtime/pkg/dynamiccache"

"k8s.io/client-go/rest"

"github.com/open-policy-agent/gatekeeper/third_party/sigs.k8s.io/controller-runtime/pkg/dynamiccache"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,17 @@ type specificInformersMap struct {
selectors SelectorsByGVK
}

// Start starts the informer managed by a MapEntry.
// Blocks until the informer stops. The informer can be stopped
// either individually (via the entry's stop channel) or globally
// via the provided stop argument.
func (e *MapEntry) Start(stop <-chan struct{}) {
// Stop on either the whole map stopping or just this informer being removed.
internalStop, cancel := eitherChan(stop, e.stop)
defer cancel()
e.Informer.Run(internalStop)
}

// Start calls Run on each of the informers and sets started to true. Blocks on the context.
// It doesn't return start because it can't return an error, and it's not a runnable directly.
func (ip *specificInformersMap) Start(ctx context.Context) {
Expand All @@ -148,8 +159,8 @@ func (ip *specificInformersMap) Start(ctx context.Context) {
ip.stop = ctx.Done()

// Start each informer
for _, informer := range ip.informersByGVK {
go informer.Informer.Run(ctx.Done())
for _, entry := range ip.informersByGVK {
go entry.Start(ctx.Done())
}

// Set started to true so we immediately start any informers added later.
Expand Down Expand Up @@ -199,8 +210,12 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion

if started && !i.Informer.HasSynced() {
// Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) {
return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
// Cancel for context, informer stopping, or entire map stopping.
syncStop, cancel := mergeChan(ctx.Done(), i.stop, ip.stop)
defer cancel()
if !cache.WaitForCacheSync(syncStop, i.Informer.HasSynced) {
// Return entry even on timeout - caller may have intended a non-blocking fetch.
return started, i, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
}
}

Expand Down Expand Up @@ -241,14 +256,15 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
i := &MapEntry{
Informer: ni,
Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk, scopeName: rm.Scope.Name()},
stop: make(chan struct{}),
}
ip.informersByGVK[gvk] = i

// Start the Informer if need by
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
// can you add eventhandlers?
if ip.started {
go i.Informer.Run(ip.stop)
go i.Start(ip.stop)
}
return i, ip.started, nil
}
Expand All @@ -266,7 +282,6 @@ func (ip *specificInformersMap) Remove(gvk schema.GroupVersionKind) {
delete(ip.informersByGVK, gvk)
}


// newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer.
func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
Expand Down Expand Up @@ -401,8 +416,57 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM
// hammer the apiserver with list requests simultaneously.
func resyncPeriod(resync time.Duration) func() time.Duration {
return func() time.Duration {
/* #nosec */
// using math/rand insted of crypto/rand will cause G404 issue while using gosec
// the factor will fall into [0.9, 1.1)
factor := rand.Float64()/5.0 + 0.9 //nolint:gosec
return time.Duration(float64(resync.Nanoseconds()) * factor)
}
}

// eitherChan returns a channel that is closed when either of the input channels are signaled.
// The caller must call the returned CancelFunc to ensure no resources are leaked.
func eitherChan(a, b <-chan struct{}) (<-chan struct{}, context.CancelFunc) {
var once sync.Once
out := make(chan struct{})
cancel := make(chan struct{})
cancelFunc := func() {
once.Do(func() {
close(cancel)
})
}
go func() {
defer close(out)
select {
case <-a:
case <-b:
case <-cancel:
}
}()

return out, cancelFunc
}

// mergeChan returns a channel that is closed when any of the input channels are signaled.
// The caller must call the returned CancelFunc to ensure no resources are leaked.
func mergeChan(a, b, c <-chan struct{}) (<-chan struct{}, context.CancelFunc) {
var once sync.Once
out := make(chan struct{})
cancel := make(chan struct{})
cancelFunc := func() {
once.Do(func() {
close(cancel)
})
}
go func() {
defer close(out)
select {
case <-a:
case <-b:
case <-c:
case <-cancel:
}
}()

return out, cancelFunc
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"fmt"
"time"

"github.com/open-policy-agent/gatekeeper/third_party/sigs.k8s.io/controller-runtime/pkg/dynamiccache/internal"
"github.com/open-policy-agent/gatekeeper/third_party/sigs.k8s.io/controller-runtime/pkg/internal/objectutil"
corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -93,7 +93,7 @@ func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object

// If the object is clusterscoped, get the informer from clusterCache,
// if not use the namespaced caches.
isNamespaced, err := internal.IsAPINamespaced(obj, c.Scheme, c.RESTMapper)
isNamespaced, err := objectutil.IsAPINamespaced(obj, c.Scheme, c.RESTMapper)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -123,7 +123,7 @@ func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema

// If the object is clusterscoped, get the informer from clusterCache,
// if not use the namespaced caches.
isNamespaced, err := internal.IsAPINamespacedWithGVK(gvk, c.Scheme, c.RESTMapper)
isNamespaced, err := objectutil.IsAPINamespacedWithGVK(gvk, c.Scheme, c.RESTMapper)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -187,7 +187,7 @@ func (c *multiNamespaceCache) WaitForCacheSync(ctx context.Context) bool {
}

func (c *multiNamespaceCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
isNamespaced, err := internal.IsAPINamespaced(obj, c.Scheme, c.RESTMapper)
isNamespaced, err := objectutil.IsAPINamespaced(obj, c.Scheme, c.RESTMapper)
if err != nil {
return nil //nolint:nilerr
}
Expand All @@ -205,7 +205,7 @@ func (c *multiNamespaceCache) IndexField(ctx context.Context, obj client.Object,
}

func (c *multiNamespaceCache) Get(ctx context.Context, key client.ObjectKey, obj client.Object) error {
isNamespaced, err := internal.IsAPINamespaced(obj, c.Scheme, c.RESTMapper)
isNamespaced, err := objectutil.IsAPINamespaced(obj, c.Scheme, c.RESTMapper)
if err != nil {
return err
}
Expand All @@ -227,7 +227,7 @@ func (c *multiNamespaceCache) List(ctx context.Context, list client.ObjectList,
listOpts := client.ListOptions{}
listOpts.ApplyOptions(opts)

isNamespaced, err := internal.IsAPINamespaced(list, c.Scheme, c.RESTMapper)
isNamespaced, err := objectutil.IsAPINamespaced(list, c.Scheme, c.RESTMapper)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package internal
package objectutil

import (
"errors"
Expand Down

0 comments on commit d390a0a

Please sign in to comment.