Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Expose Cache/Client options on Cluster #2177

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type Options struct {
// WarningHandler is used to configure the warning handler responsible for
// surfacing and handling warnings messages sent by the API server.
WarningHandler WarningHandlerOptions

// DryRun instructs the client to only perform dry run requests.
DryRun *bool
}

// WarningHandlerOptions are options for configuring a
Expand Down Expand Up @@ -94,8 +97,12 @@ type NewClientFunc func(config *rest.Config, options Options) (Client, error)
// corresponding group, version, and kind for the given type. In the
// case of unstructured types, the group, version, and kind will be extracted
// from the corresponding fields on the object.
func New(config *rest.Config, options Options) (Client, error) {
return newClient(config, options)
func New(config *rest.Config, options Options) (c Client, err error) {
c, err = newClient(config, options)
if err == nil && options.DryRun != nil && *options.DryRun {
c = NewDryRunClient(c)
}
return c, err
}

func newClient(config *rest.Config, options Options) (*client, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/client_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

func TestSource(t *testing.T) {
func TestClient(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Client Suite")
}
Expand Down
48 changes: 40 additions & 8 deletions pkg/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ import (
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
kscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/utils/pointer"

"sigs.k8s.io/controller-runtime/examples/crd/pkg"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -221,8 +223,6 @@ U5wwSivyi7vmegHKmblOzNVKA5qPO8zWzqBC
Expect(client.IgnoreNotFound(err)).NotTo(HaveOccurred())
})

// TODO(seans): Cast "cl" as "client" struct from "Client" interface. Then validate the
// instance values for the "client" struct.
Describe("New", func() {
It("should return a new Client", func() {
cl, err := client.New(cfg, client.Options{})
Expand All @@ -236,29 +236,46 @@ U5wwSivyi7vmegHKmblOzNVKA5qPO8zWzqBC
Expect(cl).To(BeNil())
})

// TODO(seans): cast as client struct and inspect Scheme
It("should use the provided Scheme if provided", func() {
cl, err := client.New(cfg, client.Options{Scheme: scheme})
Expect(err).NotTo(HaveOccurred())
Expect(cl).NotTo(BeNil())
Expect(cl.Scheme()).ToNot(BeNil())
Expect(cl.Scheme()).To(Equal(scheme))
})

// TODO(seans): cast as client struct and inspect Scheme
It("should default the Scheme if not provided", func() {
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
Expect(cl).NotTo(BeNil())
Expect(cl.Scheme()).ToNot(BeNil())
Expect(cl.Scheme()).To(Equal(kscheme.Scheme))
})

PIt("should use the provided Mapper if provided", func() {

It("should use the provided Mapper if provided", func() {
mapper := meta.NewDefaultRESTMapper([]schema.GroupVersion{})
cl, err := client.New(cfg, client.Options{Mapper: mapper})
Expect(err).NotTo(HaveOccurred())
Expect(cl).NotTo(BeNil())
Expect(cl.RESTMapper()).ToNot(BeNil())
Expect(cl.RESTMapper()).To(Equal(mapper))
})

// TODO(seans): cast as client struct and inspect Mapper
It("should create a Mapper if not provided", func() {
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
Expect(cl).NotTo(BeNil())
Expect(cl.RESTMapper()).ToNot(BeNil())
})

It("should use the provided reader cache if provided, on get and list", func() {
cache := &fakeReader{}
cl, err := client.New(cfg, client.Options{Cache: &client.CacheOptions{Reader: cache}})
Expect(err).NotTo(HaveOccurred())
Expect(cl).NotTo(BeNil())
Expect(cl.Get(ctx, client.ObjectKey{Name: "test"}, &appsv1.Deployment{})).To(Succeed())
Expect(cl.List(ctx, &appsv1.DeploymentList{})).To(Succeed())
Expect(cache.Called).To(Equal(2))
})
})

Expand Down Expand Up @@ -350,7 +367,22 @@ U5wwSivyi7vmegHKmblOzNVKA5qPO8zWzqBC
})

Context("with the DryRun option", func() {
It("should not create a new object", func() {
It("should not create a new object, global option", func() {
cl, err := client.New(cfg, client.Options{DryRun: pointer.Bool(true)})
Expect(err).NotTo(HaveOccurred())
Expect(cl).NotTo(BeNil())

By("creating the object (with DryRun)")
err = cl.Create(context.TODO(), dep)
Expect(err).NotTo(HaveOccurred())

actual, err := clientset.AppsV1().Deployments(ns).Get(ctx, dep.Name, metav1.GetOptions{})
Expect(err).To(HaveOccurred())
Expect(apierrors.IsNotFound(err)).To(BeTrue())
Expect(actual).To(Equal(&appsv1.Deployment{}))
})

It("should not create a new object, inline option", func() {
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
Expect(cl).NotTo(BeNil())
Expand Down
7 changes: 4 additions & 3 deletions pkg/client/dryrun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"

"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand All @@ -40,10 +41,10 @@ var _ = Describe("DryRunClient", func() {
ctx := context.Background()

getClient := func() client.Client {
nonDryRunClient, err := client.New(cfg, client.Options{})
cl, err := client.New(cfg, client.Options{DryRun: pointer.Bool(true)})
Expect(err).NotTo(HaveOccurred())
Expect(nonDryRunClient).NotTo(BeNil())
return client.NewDryRunClient(nonDryRunClient)
Expect(cl).NotTo(BeNil())
return cl
}

BeforeEach(func() {
Expand Down
111 changes: 87 additions & 24 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"

Expand Down Expand Up @@ -82,6 +83,8 @@ type Options struct {
Scheme *runtime.Scheme

// MapperProvider provides the rest mapper used to map go types to Kubernetes APIs
//
// Deprecated: Set Cache.Mapper and Client.Mapper directly instead.
MapperProvider func(c *rest.Config, httpClient *http.Client) (meta.RESTMapper, error)

// Logger is the logger that should be used by this Cluster.
Expand All @@ -102,29 +105,54 @@ type Options struct {
// Note: If a namespace is specified, controllers can still Watch for a
// cluster-scoped resource (e.g Node). For namespaced resources the cache
// will only hold objects from the desired namespace.
//
// Deprecated: Use Cache.Namespaces instead.
Namespace string

// HTTPClient is the http client that will be used to create the default
// Cache and Client. If not set the rest.HTTPClientFor function will be used
// to create the http client.
HTTPClient *http.Client

// Cache is the cache.Options that will be used to create the default Cache.
// By default, the cache will watch and list requested objects in all namespaces.
Cache cache.Options

// NewCache is the function that will create the cache to be used
// by the manager. If not set this will use the default new cache function.
//
// When using a custom NewCache, the Cache options will be passed to the
// NewCache function.
//
// NOTE: LOW LEVEL PRIMITIVE!
// Only use a custom NewCache if you know what you are doing.
NewCache cache.NewCacheFunc

// Client is the client.Options that will be used to create the default Client.
// By default, the client will use the cache for reads and direct calls for writes.
Client client.Options

// NewClient is the func that creates the client to be used by the manager.
// If not set this will create a Client backed by a Cache for read operations
// and a direct Client for write operations.
// NOTE: The default client will not cache Unstructured.
//
// When using a custom NewClient, the Client options will be passed to the
// NewClient function.
//
// NOTE: LOW LEVEL PRIMITIVE!
// Only use a custom NewClient if you know what you are doing.
NewClient client.NewClientFunc

// ClientDisableCacheFor tells the client that, if any cache is used, to bypass it
// for the given objects.
//
// Deprecated: Use Client.Cache.DisableFor instead.
ClientDisableCacheFor []client.Object

// DryRunClient specifies whether the client should be configured to enforce
// dryRun mode.
//
// Deprecated: Use Client.DryRun instead.
DryRunClient bool

// EventBroadcaster records Events emitted by the manager and sends them to the Kubernetes API
Expand Down Expand Up @@ -171,36 +199,71 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) {
}

// Create the cache for the cached read client and registering informers
cache, err := options.NewCache(config, cache.Options{
HTTPClient: options.HTTPClient,
Scheme: options.Scheme,
Mapper: mapper,
ResyncEvery: options.SyncPeriod,
Namespaces: []string{options.Namespace},
})
cacheOpts := options.Cache
{
if cacheOpts.Scheme == nil {
cacheOpts.Scheme = options.Scheme
}
if cacheOpts.Mapper == nil {
cacheOpts.Mapper = mapper
}
if cacheOpts.HTTPClient == nil {
cacheOpts.HTTPClient = options.HTTPClient
}
if cacheOpts.ResyncEvery == nil {
cacheOpts.ResyncEvery = options.SyncPeriod
}
if len(cacheOpts.Namespaces) == 0 && options.Namespace != "" {
cacheOpts.Namespaces = []string{options.Namespace}
}
}
cache, err := options.NewCache(config, cacheOpts)
if err != nil {
return nil, err
}

writeObj, err := options.NewClient(config, client.Options{
HTTPClient: options.HTTPClient,
Scheme: options.Scheme,
Mapper: mapper,
Cache: &client.CacheOptions{
Reader: cache,
DisableFor: options.ClientDisableCacheFor,
},
})
// Create the client, and default its options.
clientOpts := options.Client
{
if clientOpts.Scheme == nil {
clientOpts.Scheme = options.Scheme
}
if clientOpts.Mapper == nil {
clientOpts.Mapper = mapper
vincepri marked this conversation as resolved.
Show resolved Hide resolved
}
if clientOpts.HTTPClient == nil {
clientOpts.HTTPClient = options.HTTPClient
}
if clientOpts.Cache == nil {
clientOpts.Cache = &client.CacheOptions{
Unstructured: false,
}
}
if clientOpts.Cache.Reader == nil {
clientOpts.Cache.Reader = cache
}

// For backward compatibility, the ClientDisableCacheFor option should
// be appended to the DisableFor option in the client.
clientOpts.Cache.DisableFor = append(clientOpts.Cache.DisableFor, options.ClientDisableCacheFor...)

if clientOpts.DryRun == nil && options.DryRunClient {
// For backward compatibility, the DryRunClient (if set) option should override
// the DryRun option in the client (if unset).
clientOpts.DryRun = pointer.Bool(true)
}
}
clientWriter, err := options.NewClient(config, clientOpts)
if err != nil {
return nil, err
}

if options.DryRunClient {
writeObj = client.NewDryRunClient(writeObj)
}

// Create the API Reader, a client with no cache.
apiReader, err := client.New(config, client.Options{HTTPClient: options.HTTPClient, Scheme: options.Scheme, Mapper: mapper})
clientReader, err := client.New(config, client.Options{
HTTPClient: options.HTTPClient,
Scheme: options.Scheme,
Mapper: mapper,
})
if err != nil {
return nil, err
}
Expand All @@ -219,8 +282,8 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) {
scheme: options.Scheme,
cache: cache,
fieldIndexes: cache,
client: writeObj,
apiReader: apiReader,
client: clientWriter,
apiReader: clientReader,
recorderProvider: recorderProvider,
mapper: mapper,
logger: options.Logger,
Expand Down
10 changes: 5 additions & 5 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,15 +348,15 @@ func New(config *rest.Config, options Options) (Manager, error) {

cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
clusterOptions.Scheme = options.Scheme
clusterOptions.MapperProvider = options.MapperProvider
clusterOptions.MapperProvider = options.MapperProvider //nolint:staticcheck
clusterOptions.Logger = options.Logger
clusterOptions.SyncPeriod = options.SyncPeriod
clusterOptions.Namespace = options.Namespace
clusterOptions.Namespace = options.Namespace //nolint:staticcheck
clusterOptions.NewCache = options.NewCache
clusterOptions.NewClient = options.NewClient
clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor
clusterOptions.DryRunClient = options.DryRunClient
clusterOptions.EventBroadcaster = options.EventBroadcaster //nolint:staticcheck
clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor //nolint:staticcheck
clusterOptions.DryRunClient = options.DryRunClient //nolint:staticcheck
clusterOptions.EventBroadcaster = options.EventBroadcaster //nolint:staticcheck
})
if err != nil {
return nil, err
Expand Down