Skip to content

Commit

Permalink
Merge pull request #2177 from vincepri/rework-cluster-options
Browse files Browse the repository at this point in the history
✨ Expose Cache/Client options on Cluster
  • Loading branch information
k8s-ci-robot committed Feb 7, 2023
2 parents f127c11 + ed9c5ef commit f9d8abc
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 43 deletions.
11 changes: 9 additions & 2 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type Options struct {
// WarningHandler is used to configure the warning handler responsible for
// surfacing and handling warnings messages sent by the API server.
WarningHandler WarningHandlerOptions

// DryRun instructs the client to only perform dry run requests.
DryRun *bool
}

// WarningHandlerOptions are options for configuring a
Expand Down Expand Up @@ -94,8 +97,12 @@ type NewClientFunc func(config *rest.Config, options Options) (Client, error)
// corresponding group, version, and kind for the given type. In the
// case of unstructured types, the group, version, and kind will be extracted
// from the corresponding fields on the object.
func New(config *rest.Config, options Options) (Client, error) {
return newClient(config, options)
func New(config *rest.Config, options Options) (c Client, err error) {
c, err = newClient(config, options)
if err == nil && options.DryRun != nil && *options.DryRun {
c = NewDryRunClient(c)
}
return c, err
}

func newClient(config *rest.Config, options Options) (*client, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/client_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

func TestSource(t *testing.T) {
func TestClient(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Client Suite")
}
Expand Down
48 changes: 40 additions & 8 deletions pkg/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ import (
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
kscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/utils/pointer"

"sigs.k8s.io/controller-runtime/examples/crd/pkg"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -221,8 +223,6 @@ U5wwSivyi7vmegHKmblOzNVKA5qPO8zWzqBC
Expect(client.IgnoreNotFound(err)).NotTo(HaveOccurred())
})

// TODO(seans): Cast "cl" as "client" struct from "Client" interface. Then validate the
// instance values for the "client" struct.
Describe("New", func() {
It("should return a new Client", func() {
cl, err := client.New(cfg, client.Options{})
Expand All @@ -236,29 +236,46 @@ U5wwSivyi7vmegHKmblOzNVKA5qPO8zWzqBC
Expect(cl).To(BeNil())
})

// TODO(seans): cast as client struct and inspect Scheme
It("should use the provided Scheme if provided", func() {
cl, err := client.New(cfg, client.Options{Scheme: scheme})
Expect(err).NotTo(HaveOccurred())
Expect(cl).NotTo(BeNil())
Expect(cl.Scheme()).ToNot(BeNil())
Expect(cl.Scheme()).To(Equal(scheme))
})

// TODO(seans): cast as client struct and inspect Scheme
It("should default the Scheme if not provided", func() {
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
Expect(cl).NotTo(BeNil())
Expect(cl.Scheme()).ToNot(BeNil())
Expect(cl.Scheme()).To(Equal(kscheme.Scheme))
})

PIt("should use the provided Mapper if provided", func() {

It("should use the provided Mapper if provided", func() {
mapper := meta.NewDefaultRESTMapper([]schema.GroupVersion{})
cl, err := client.New(cfg, client.Options{Mapper: mapper})
Expect(err).NotTo(HaveOccurred())
Expect(cl).NotTo(BeNil())
Expect(cl.RESTMapper()).ToNot(BeNil())
Expect(cl.RESTMapper()).To(Equal(mapper))
})

// TODO(seans): cast as client struct and inspect Mapper
It("should create a Mapper if not provided", func() {
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
Expect(cl).NotTo(BeNil())
Expect(cl.RESTMapper()).ToNot(BeNil())
})

It("should use the provided reader cache if provided, on get and list", func() {
cache := &fakeReader{}
cl, err := client.New(cfg, client.Options{Cache: &client.CacheOptions{Reader: cache}})
Expect(err).NotTo(HaveOccurred())
Expect(cl).NotTo(BeNil())
Expect(cl.Get(ctx, client.ObjectKey{Name: "test"}, &appsv1.Deployment{})).To(Succeed())
Expect(cl.List(ctx, &appsv1.DeploymentList{})).To(Succeed())
Expect(cache.Called).To(Equal(2))
})
})

Expand Down Expand Up @@ -350,7 +367,22 @@ U5wwSivyi7vmegHKmblOzNVKA5qPO8zWzqBC
})

Context("with the DryRun option", func() {
It("should not create a new object", func() {
It("should not create a new object, global option", func() {
cl, err := client.New(cfg, client.Options{DryRun: pointer.Bool(true)})
Expect(err).NotTo(HaveOccurred())
Expect(cl).NotTo(BeNil())

By("creating the object (with DryRun)")
err = cl.Create(context.TODO(), dep)
Expect(err).NotTo(HaveOccurred())

actual, err := clientset.AppsV1().Deployments(ns).Get(ctx, dep.Name, metav1.GetOptions{})
Expect(err).To(HaveOccurred())
Expect(apierrors.IsNotFound(err)).To(BeTrue())
Expect(actual).To(Equal(&appsv1.Deployment{}))
})

It("should not create a new object, inline option", func() {
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
Expect(cl).NotTo(BeNil())
Expand Down
7 changes: 4 additions & 3 deletions pkg/client/dryrun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"

"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand All @@ -40,10 +41,10 @@ var _ = Describe("DryRunClient", func() {
ctx := context.Background()

getClient := func() client.Client {
nonDryRunClient, err := client.New(cfg, client.Options{})
cl, err := client.New(cfg, client.Options{DryRun: pointer.Bool(true)})
Expect(err).NotTo(HaveOccurred())
Expect(nonDryRunClient).NotTo(BeNil())
return client.NewDryRunClient(nonDryRunClient)
Expect(cl).NotTo(BeNil())
return cl
}

BeforeEach(func() {
Expand Down
111 changes: 87 additions & 24 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"

Expand Down Expand Up @@ -82,6 +83,8 @@ type Options struct {
Scheme *runtime.Scheme

// MapperProvider provides the rest mapper used to map go types to Kubernetes APIs
//
// Deprecated: Set Cache.Mapper and Client.Mapper directly instead.
MapperProvider func(c *rest.Config, httpClient *http.Client) (meta.RESTMapper, error)

// Logger is the logger that should be used by this Cluster.
Expand All @@ -102,29 +105,54 @@ type Options struct {
// Note: If a namespace is specified, controllers can still Watch for a
// cluster-scoped resource (e.g Node). For namespaced resources the cache
// will only hold objects from the desired namespace.
//
// Deprecated: Use Cache.Namespaces instead.
Namespace string

// HTTPClient is the http client that will be used to create the default
// Cache and Client. If not set the rest.HTTPClientFor function will be used
// to create the http client.
HTTPClient *http.Client

// Cache is the cache.Options that will be used to create the default Cache.
// By default, the cache will watch and list requested objects in all namespaces.
Cache cache.Options

// NewCache is the function that will create the cache to be used
// by the manager. If not set this will use the default new cache function.
//
// When using a custom NewCache, the Cache options will be passed to the
// NewCache function.
//
// NOTE: LOW LEVEL PRIMITIVE!
// Only use a custom NewCache if you know what you are doing.
NewCache cache.NewCacheFunc

// Client is the client.Options that will be used to create the default Client.
// By default, the client will use the cache for reads and direct calls for writes.
Client client.Options

// NewClient is the func that creates the client to be used by the manager.
// If not set this will create a Client backed by a Cache for read operations
// and a direct Client for write operations.
// NOTE: The default client will not cache Unstructured.
//
// When using a custom NewClient, the Client options will be passed to the
// NewClient function.
//
// NOTE: LOW LEVEL PRIMITIVE!
// Only use a custom NewClient if you know what you are doing.
NewClient client.NewClientFunc

// ClientDisableCacheFor tells the client that, if any cache is used, to bypass it
// for the given objects.
//
// Deprecated: Use Client.Cache.DisableFor instead.
ClientDisableCacheFor []client.Object

// DryRunClient specifies whether the client should be configured to enforce
// dryRun mode.
//
// Deprecated: Use Client.DryRun instead.
DryRunClient bool

// EventBroadcaster records Events emitted by the manager and sends them to the Kubernetes API
Expand Down Expand Up @@ -171,36 +199,71 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) {
}

// Create the cache for the cached read client and registering informers
cache, err := options.NewCache(config, cache.Options{
HTTPClient: options.HTTPClient,
Scheme: options.Scheme,
Mapper: mapper,
ResyncEvery: options.SyncPeriod,
Namespaces: []string{options.Namespace},
})
cacheOpts := options.Cache
{
if cacheOpts.Scheme == nil {
cacheOpts.Scheme = options.Scheme
}
if cacheOpts.Mapper == nil {
cacheOpts.Mapper = mapper
}
if cacheOpts.HTTPClient == nil {
cacheOpts.HTTPClient = options.HTTPClient
}
if cacheOpts.ResyncEvery == nil {
cacheOpts.ResyncEvery = options.SyncPeriod
}
if len(cacheOpts.Namespaces) == 0 && options.Namespace != "" {
cacheOpts.Namespaces = []string{options.Namespace}
}
}
cache, err := options.NewCache(config, cacheOpts)
if err != nil {
return nil, err
}

writeObj, err := options.NewClient(config, client.Options{
HTTPClient: options.HTTPClient,
Scheme: options.Scheme,
Mapper: mapper,
Cache: &client.CacheOptions{
Reader: cache,
DisableFor: options.ClientDisableCacheFor,
},
})
// Create the client, and default its options.
clientOpts := options.Client
{
if clientOpts.Scheme == nil {
clientOpts.Scheme = options.Scheme
}
if clientOpts.Mapper == nil {
clientOpts.Mapper = mapper
}
if clientOpts.HTTPClient == nil {
clientOpts.HTTPClient = options.HTTPClient
}
if clientOpts.Cache == nil {
clientOpts.Cache = &client.CacheOptions{
Unstructured: false,
}
}
if clientOpts.Cache.Reader == nil {
clientOpts.Cache.Reader = cache
}

// For backward compatibility, the ClientDisableCacheFor option should
// be appended to the DisableFor option in the client.
clientOpts.Cache.DisableFor = append(clientOpts.Cache.DisableFor, options.ClientDisableCacheFor...)

if clientOpts.DryRun == nil && options.DryRunClient {
// For backward compatibility, the DryRunClient (if set) option should override
// the DryRun option in the client (if unset).
clientOpts.DryRun = pointer.Bool(true)
}
}
clientWriter, err := options.NewClient(config, clientOpts)
if err != nil {
return nil, err
}

if options.DryRunClient {
writeObj = client.NewDryRunClient(writeObj)
}

// Create the API Reader, a client with no cache.
apiReader, err := client.New(config, client.Options{HTTPClient: options.HTTPClient, Scheme: options.Scheme, Mapper: mapper})
clientReader, err := client.New(config, client.Options{
HTTPClient: options.HTTPClient,
Scheme: options.Scheme,
Mapper: mapper,
})
if err != nil {
return nil, err
}
Expand All @@ -219,8 +282,8 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) {
scheme: options.Scheme,
cache: cache,
fieldIndexes: cache,
client: writeObj,
apiReader: apiReader,
client: clientWriter,
apiReader: clientReader,
recorderProvider: recorderProvider,
mapper: mapper,
logger: options.Logger,
Expand Down
10 changes: 5 additions & 5 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,15 +348,15 @@ func New(config *rest.Config, options Options) (Manager, error) {

cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
clusterOptions.Scheme = options.Scheme
clusterOptions.MapperProvider = options.MapperProvider
clusterOptions.MapperProvider = options.MapperProvider //nolint:staticcheck
clusterOptions.Logger = options.Logger
clusterOptions.SyncPeriod = options.SyncPeriod
clusterOptions.Namespace = options.Namespace
clusterOptions.Namespace = options.Namespace //nolint:staticcheck
clusterOptions.NewCache = options.NewCache
clusterOptions.NewClient = options.NewClient
clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor
clusterOptions.DryRunClient = options.DryRunClient
clusterOptions.EventBroadcaster = options.EventBroadcaster //nolint:staticcheck
clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor //nolint:staticcheck
clusterOptions.DryRunClient = options.DryRunClient //nolint:staticcheck
clusterOptions.EventBroadcaster = options.EventBroadcaster //nolint:staticcheck
})
if err != nil {
return nil, err
Expand Down

0 comments on commit f9d8abc

Please sign in to comment.