Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⚠ Remove DelegatedClient, move Options in client.New #2150

Merged
merged 1 commit into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 91 additions & 15 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,22 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
)

// Options are creation options for a Client.
type Options struct {
// Scheme, if provided, will be used to map go structs to GroupVersionKinds
Scheme *runtime.Scheme

// Mapper, if provided, will be used to map GroupVersionKinds to Resources
Mapper meta.RESTMapper

// Cache, if provided, is used to read objects from the cache.
Cache *CacheOptions

// WarningHandler is used to configure the warning handler responsible for
// surfacing and handling warnings messages sent by the API server.
WarningHandler WarningHandlerOptions
}

// WarningHandlerOptions are options for configuring a
// warning handler for the client which is responsible
// for surfacing API Server warnings.
Expand All @@ -50,19 +66,21 @@ type WarningHandlerOptions struct {
AllowDuplicateLogs bool
}

// Options are creation options for a Client.
type Options struct {
// Scheme, if provided, will be used to map go structs to GroupVersionKinds
Scheme *runtime.Scheme

// Mapper, if provided, will be used to map GroupVersionKinds to Resources
Mapper meta.RESTMapper

// Opts is used to configure the warning handler responsible for
// surfacing and handling warnings messages sent by the API server.
Opts WarningHandlerOptions
// CacheOptions are options for creating a cache-backed client.
type CacheOptions struct {
// Reader is a cache-backed reader that will be used to read objects from the cache.
// +required
Reader Reader
// DisableFor is a list of objects that should not be read from the cache.
DisableFor []Object
// Unstructured is a flag that indicates whether the cache-backed client should
// read unstructured objects or lists from the cache.
Unstructured bool
}

// NewClientFunc allows a user to define how to create a client.
type NewClientFunc func(config *rest.Config, options Options) (Client, error)

// New returns a new Client using the provided config and Options.
// The returned client reads *and* writes directly from the server
// (it doesn't use object caches). It understands how to work with
Expand All @@ -82,7 +100,7 @@ func newClient(config *rest.Config, options Options) (*client, error) {
return nil, fmt.Errorf("must provide non-nil rest.Config to client.New")
}

if !options.Opts.SuppressWarnings {
if !options.WarningHandler.SuppressWarnings {
// surface warnings
logger := log.Log.WithName("KubeAPIWarningLogger")
// Set a WarningHandler, the default WarningHandler
Expand All @@ -93,7 +111,7 @@ func newClient(config *rest.Config, options Options) (*client, error) {
config.WarningHandler = log.NewKubeAPIWarningLogger(
logger,
log.KubeAPIWarningLoggerOptions{
Deduplicate: !options.Opts.AllowDuplicateLogs,
Deduplicate: !options.WarningHandler.AllowDuplicateLogs,
},
)
}
Expand Down Expand Up @@ -143,7 +161,24 @@ func newClient(config *rest.Config, options Options) (*client, error) {
scheme: options.Scheme,
mapper: options.Mapper,
}
if options.Cache == nil || options.Cache.Reader == nil {
return c, nil
}

// We want a cache if we're here.
// Set the cache.
c.cache = options.Cache.Reader

// Load uncached GVKs.
c.cacheUnstructured = options.Cache.Unstructured
uncachedGVKs := map[schema.GroupVersionKind]struct{}{}
for _, obj := range options.Cache.DisableFor {
gvk, err := c.GroupVersionKindFor(obj)
if err != nil {
return nil, err
}
uncachedGVKs[gvk] = struct{}{}
}
return c, nil
}

Expand All @@ -157,6 +192,35 @@ type client struct {
metadataClient metadataClient
scheme *runtime.Scheme
mapper meta.RESTMapper

cache Reader
uncachedGVKs map[schema.GroupVersionKind]struct{}
cacheUnstructured bool
}

func (c *client) shouldBypassCache(obj runtime.Object) (bool, error) {
if c.cache == nil {
return true, nil
}

gvk, err := c.GroupVersionKindFor(obj)
if err != nil {
return false, err
}
// TODO: this is producing unsafe guesses that don't actually work,
// but it matches ~99% of the cases out there.
if meta.IsListType(obj) {
gvk.Kind = strings.TrimSuffix(gvk.Kind, "List")
}
if _, isUncached := c.uncachedGVKs[gvk]; isUncached {
return true, nil
}
if !c.cacheUnstructured {
_, isUnstructured := obj.(*unstructured.Unstructured)
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
return isUnstructured || isUnstructuredList, nil
}
return false, nil
}

// resetGroupVersionKind is a helper function to restore and preserve GroupVersionKind on an object.
Expand All @@ -169,12 +233,12 @@ func (c *client) resetGroupVersionKind(obj runtime.Object, gvk schema.GroupVersi
}

// GroupVersionKindFor returns the GroupVersionKind for the given object.
func (c *client) GroupVersionKindFor(obj Object) (schema.GroupVersionKind, error) {
func (c *client) GroupVersionKindFor(obj runtime.Object) (schema.GroupVersionKind, error) {
return apiutil.GVKForObject(obj, c.scheme)
}

// IsObjectNamespaced returns true if the GroupVersionKind of the object is namespaced.
func (c *client) IsObjectNamespaced(obj Object) (bool, error) {
func (c *client) IsObjectNamespaced(obj runtime.Object) (bool, error) {
return apiutil.IsObjectNamespaced(obj, c.scheme, c.mapper)
}

Expand Down Expand Up @@ -252,6 +316,12 @@ func (c *client) Patch(ctx context.Context, obj Object, patch Patch, opts ...Pat

// Get implements client.Client.
func (c *client) Get(ctx context.Context, key ObjectKey, obj Object, opts ...GetOption) error {
if isUncached, err := c.shouldBypassCache(obj); err != nil {
return err
} else if !isUncached {
return c.cache.Get(ctx, key, obj, opts...)
}

switch obj.(type) {
case *unstructured.Unstructured:
return c.unstructuredClient.Get(ctx, key, obj, opts...)
Expand All @@ -266,6 +336,12 @@ func (c *client) Get(ctx context.Context, key ObjectKey, obj Object, opts ...Get

// List implements client.Client.
func (c *client) List(ctx context.Context, obj ObjectList, opts ...ListOption) error {
if isUncached, err := c.shouldBypassCache(obj); err != nil {
return err
} else if !isUncached {
return c.cache.List(ctx, obj, opts...)
}

switch x := obj.(type) {
case *unstructured.UnstructuredList:
return c.unstructuredClient.List(ctx, obj, opts...)
Expand Down
72 changes: 33 additions & 39 deletions pkg/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3541,20 +3541,19 @@ U5wwSivyi7vmegHKmblOzNVKA5qPO8zWzqBC
})
})

var _ = Describe("DelegatingClient", func() {
var _ = Describe("ClientWithCache", func() {
Describe("Get", func() {
It("should call cache reader when structured object", func() {
cachedReader := &fakeReader{}
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
dReader, err := client.NewDelegatingClient(client.NewDelegatingClientInput{
CacheReader: cachedReader,
Client: cl,
cl, err := client.New(cfg, client.Options{
Cache: &client.CacheOptions{
Reader: cachedReader,
},
})
Expect(err).NotTo(HaveOccurred())
var actual appsv1.Deployment
key := client.ObjectKey{Namespace: "ns", Name: "name"}
Expect(dReader.Get(context.TODO(), key, &actual)).To(Succeed())
Expect(cl.Get(context.TODO(), key, &actual)).To(Succeed())
Expect(1).To(Equal(cachedReader.Called))
})

Expand Down Expand Up @@ -3590,11 +3589,10 @@ var _ = Describe("DelegatingClient", func() {
})
It("should call client reader when not cached", func() {
cachedReader := &fakeReader{}
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
dReader, err := client.NewDelegatingClient(client.NewDelegatingClientInput{
CacheReader: cachedReader,
Client: cl,
cl, err := client.New(cfg, client.Options{
Cache: &client.CacheOptions{
Reader: cachedReader,
},
})
Expect(err).NotTo(HaveOccurred())

Expand All @@ -3606,17 +3604,16 @@ var _ = Describe("DelegatingClient", func() {
})
actual.SetName(dep.Name)
key := client.ObjectKey{Namespace: dep.Namespace, Name: dep.Name}
Expect(dReader.Get(context.TODO(), key, actual)).To(Succeed())
Expect(cl.Get(context.TODO(), key, actual)).To(Succeed())
Expect(0).To(Equal(cachedReader.Called))
})
It("should call cache reader when cached", func() {
cachedReader := &fakeReader{}
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
dReader, err := client.NewDelegatingClient(client.NewDelegatingClientInput{
CacheReader: cachedReader,
Client: cl,
CacheUnstructured: true,
cl, err := client.New(cfg, client.Options{
Cache: &client.CacheOptions{
Reader: cachedReader,
Unstructured: true,
},
})
Expect(err).NotTo(HaveOccurred())

Expand All @@ -3628,34 +3625,32 @@ var _ = Describe("DelegatingClient", func() {
})
actual.SetName(dep.Name)
key := client.ObjectKey{Namespace: dep.Namespace, Name: dep.Name}
Expect(dReader.Get(context.TODO(), key, actual)).To(Succeed())
Expect(cl.Get(context.TODO(), key, actual)).To(Succeed())
Expect(1).To(Equal(cachedReader.Called))
})
})
})
Describe("List", func() {
It("should call cache reader when structured object", func() {
cachedReader := &fakeReader{}
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
dReader, err := client.NewDelegatingClient(client.NewDelegatingClientInput{
CacheReader: cachedReader,
Client: cl,
cl, err := client.New(cfg, client.Options{
Cache: &client.CacheOptions{
Reader: cachedReader,
},
})
Expect(err).NotTo(HaveOccurred())
var actual appsv1.DeploymentList
Expect(dReader.List(context.Background(), &actual)).To(Succeed())
Expect(cl.List(context.Background(), &actual)).To(Succeed())
Expect(1).To(Equal(cachedReader.Called))
})

When("listing unstructured objects", func() {
It("should call client reader when not cached", func() {
cachedReader := &fakeReader{}
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
dReader, err := client.NewDelegatingClient(client.NewDelegatingClientInput{
CacheReader: cachedReader,
Client: cl,
cl, err := client.New(cfg, client.Options{
Cache: &client.CacheOptions{
Reader: cachedReader,
},
})
Expect(err).NotTo(HaveOccurred())

Expand All @@ -3665,17 +3660,16 @@ var _ = Describe("DelegatingClient", func() {
Kind: "DeploymentList",
Version: "v1",
})
Expect(dReader.List(context.Background(), actual)).To(Succeed())
Expect(cl.List(context.Background(), actual)).To(Succeed())
Expect(0).To(Equal(cachedReader.Called))
})
It("should call cache reader when cached", func() {
cachedReader := &fakeReader{}
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
dReader, err := client.NewDelegatingClient(client.NewDelegatingClientInput{
CacheReader: cachedReader,
Client: cl,
CacheUnstructured: true,
cl, err := client.New(cfg, client.Options{
Cache: &client.CacheOptions{
Reader: cachedReader,
Unstructured: true,
},
})
Expect(err).NotTo(HaveOccurred())

Expand All @@ -3685,7 +3679,7 @@ var _ = Describe("DelegatingClient", func() {
Kind: "DeploymentList",
Version: "v1",
})
Expect(dReader.List(context.Background(), actual)).To(Succeed())
Expect(cl.List(context.Background(), actual)).To(Succeed())
Expect(1).To(Equal(cachedReader.Called))
})
})
Expand Down
3 changes: 1 addition & 2 deletions pkg/client/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ limitations under the License.
// to the API server.
//
// It is a common pattern in Kubernetes to read from a cache and write to the API
// server. This pattern is covered by the DelegatingClient type, which can
// be used to have a client whose Reader is different from the Writer.
// server. This pattern is covered by the creating the Client with a Cache.
//
// # Options
//
Expand Down
4 changes: 2 additions & 2 deletions pkg/client/dryrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ func (c *dryRunClient) RESTMapper() meta.RESTMapper {
}

// GroupVersionKindFor returns the GroupVersionKind for the given object.
func (c *dryRunClient) GroupVersionKindFor(obj Object) (schema.GroupVersionKind, error) {
func (c *dryRunClient) GroupVersionKindFor(obj runtime.Object) (schema.GroupVersionKind, error) {
return c.client.GroupVersionKindFor(obj)
}

// IsObjectNamespaced returns true if the GroupVersionKind of the object is namespaced.
func (c *dryRunClient) IsObjectNamespaced(obj Object) (bool, error) {
func (c *dryRunClient) IsObjectNamespaced(obj runtime.Object) (bool, error) {
return c.client.IsObjectNamespaced(obj)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/client/fake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,12 +564,12 @@ func (c *fakeClient) RESTMapper() meta.RESTMapper {
}

// GroupVersionKindFor returns the GroupVersionKind for the given object.
func (c *fakeClient) GroupVersionKindFor(obj client.Object) (schema.GroupVersionKind, error) {
func (c *fakeClient) GroupVersionKindFor(obj runtime.Object) (schema.GroupVersionKind, error) {
return apiutil.GVKForObject(obj, c.scheme)
}

// IsObjectNamespaced returns true if the GroupVersionKind of the object is namespaced.
func (c *fakeClient) IsObjectNamespaced(obj client.Object) (bool, error) {
func (c *fakeClient) IsObjectNamespaced(obj runtime.Object) (bool, error) {
return apiutil.IsObjectNamespaced(obj, c.scheme, c.restMapper)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/client/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,9 @@ type Client interface {
// RESTMapper returns the rest this client is using.
RESTMapper() meta.RESTMapper
// GroupVersionKindFor returns the GroupVersionKind for the given object.
GroupVersionKindFor(obj Object) (schema.GroupVersionKind, error)
GroupVersionKindFor(obj runtime.Object) (schema.GroupVersionKind, error)
// IsObjectNamespaced returns true if the GroupVersionKind of the object is namespaced.
IsObjectNamespaced(obj Object) (bool, error)
IsObjectNamespaced(obj runtime.Object) (bool, error)
}

// WithWatch supports Watch on top of the CRUD operations supported by
Expand Down
4 changes: 2 additions & 2 deletions pkg/client/namespaced_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ func (n *namespacedClient) RESTMapper() meta.RESTMapper {
}

// GroupVersionKindFor returns the GroupVersionKind for the given object.
func (n *namespacedClient) GroupVersionKindFor(obj Object) (schema.GroupVersionKind, error) {
func (n *namespacedClient) GroupVersionKindFor(obj runtime.Object) (schema.GroupVersionKind, error) {
return n.client.GroupVersionKindFor(obj)
}

// IsObjectNamespaced returns true if the GroupVersionKind of the object is namespaced.
func (n *namespacedClient) IsObjectNamespaced(obj Object) (bool, error) {
func (n *namespacedClient) IsObjectNamespaced(obj runtime.Object) (bool, error) {
return n.client.IsObjectNamespaced(obj)
}

Expand Down
Loading