diff --git a/examples/README.md b/examples/README.md index 2110ae214e..a378d0d22f 100644 --- a/examples/README.md +++ b/examples/README.md @@ -31,6 +31,15 @@ This example implements a *new* Kubernetes resource, ChaosPod, and creates a cus 5. Adds ChaosPod webhooks to manager 6. Starts the manager +### multicluster + +This example implements a simplistic controller that watches pods in one cluster (`referencecluster`) and creates +an identical pod for each pod observed in a different cluster (`mirrorcluster`). + +* `main.go`: Initialization code +* `main_test.go`: Tests that verify the reconciliation +* `reconciler/reconciler.go`: The actual reconciliation logic + ## Deploying and Running -To install and run the provided examples, see the Kubebuilder [Quick Start](https://book.kubebuilder.io/quick-start.html). \ No newline at end of file +To install and run the provided examples, see the Kubebuilder [Quick Start](https://book.kubebuilder.io/quick-start.html). diff --git a/examples/multicluster/main.go b/examples/multicluster/main.go new file mode 100644 index 0000000000..5338a6eb97 --- /dev/null +++ b/examples/multicluster/main.go @@ -0,0 +1,74 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "fmt" + "os" + + "k8s.io/client-go/rest" + + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/examples/multicluster/reconciler" + "sigs.k8s.io/controller-runtime/pkg/client/config" + "sigs.k8s.io/controller-runtime/pkg/clusterconnector" +) + +func main() { + log := ctrl.Log.WithName("pod-mirror-controller") + + referenceClusterCfg, err := config.GetConfigWithContext("reference-cluster") + if err != nil { + log.Error(err, "failed to get reference cluster config") + os.Exit(1) + } + + mirrorClusterCfg, err := config.GetConfigWithContext("mirror-cluster") + if err != nil { + log.Error(err, "failed to get mirror cluster config") + os.Exit(1) + } + ctrl.SetupSignalHandler() + + if err := run(referenceClusterCfg, mirrorClusterCfg, ctrl.SetupSignalHandler()); err != nil { + log.Error(err, "failed to run controller") + os.Exit(1) + } + + log.Info("Finished gracefully") +} + +func run(referenceClusterConfig, mirrorClusterConfig *rest.Config, stop <-chan struct{}) error { + mgr, err := ctrl.NewManager(referenceClusterConfig, ctrl.Options{}) + if err != nil { + return fmt.Errorf("failed to construct manager: %w", err) + } + clusterConnector, err := clusterconnector.New(mirrorClusterConfig, mgr, "mirror_cluster") + if err != nil { + return fmt.Errorf("failed to construct clusterconnector: %w", err) + } + + if err := reconciler.Add(mgr, clusterConnector); err != nil { + return fmt.Errorf("failed to construct reconciler: %w", err) + } + + if err := mgr.Start(stop); err != nil { + return fmt.Errorf("failed to start manager: %w", err) + } + + return nil +} diff --git a/examples/multicluster/main_test.go b/examples/multicluster/main_test.go new file mode 100644 index 0000000000..63012567c5 --- /dev/null +++ b/examples/multicluster/main_test.go @@ -0,0 +1,129 @@ +package main + +import ( + "context" + "fmt" + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/rest" + + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + "sigs.k8s.io/controller-runtime/pkg/envtest/printer" +) + +func TestRun(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecsWithDefaultAndCustomReporters(t, "MirrorPod reconciler Integration Suite", []Reporter{printer.NewlineReporter{}}) +} + +var _ = Describe("clusterconnector.ClusterConnector", func() { + var stop chan struct{} + var referenceClusterCfg *rest.Config + var mirrorClusterCfg *rest.Config + var referenceClusterTestEnv *envtest.Environment + var mirrorClusterTestEnv *envtest.Environment + var referenceClusterClient client.Client + var mirrorClusterClient client.Client + + Describe("multi-cluster-controller", func() { + BeforeEach(func() { + stop = make(chan struct{}) + referenceClusterTestEnv = &envtest.Environment{} + mirrorClusterTestEnv = &envtest.Environment{} + + var err error + referenceClusterCfg, err = referenceClusterTestEnv.Start() + Expect(err).NotTo(HaveOccurred()) + + mirrorClusterCfg, err = mirrorClusterTestEnv.Start() + Expect(err).NotTo(HaveOccurred()) + + referenceClusterClient, err = client.New(referenceClusterCfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + + mirrorClusterClient, err = client.New(mirrorClusterCfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + + go func() { + run(referenceClusterCfg, mirrorClusterCfg, stop) + }() + }) + + AfterEach(func() { + close(stop) + Expect(referenceClusterTestEnv.Stop()).To(Succeed()) + Expect(mirrorClusterTestEnv.Stop()).To(Succeed()) + }) + + It("Should reconcile pods", func() { + ctx := context.Background() + referencePod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "satan", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "fancy-one", + Image: "nginx", + }}, + }, + } + Expect(referenceClusterClient.Create(ctx, referencePod)).NotTo(HaveOccurred()) + name := types.NamespacedName{Namespace: referencePod.Namespace, Name: referencePod.Name} + + By("Setting a finalizer", func() { + Eventually(func() error { + updatedPod := &corev1.Pod{} + if err := referenceClusterClient.Get(ctx, name, updatedPod); err != nil { + return err + } + if n := len(updatedPod.Finalizers); n != 1 { + return fmt.Errorf("expected exactly one finalizer, got %d", n) + } + return nil + }).Should(Succeed()) + + }) + + By("Creating a pod in the mirror cluster", func() { + Eventually(func() error { + return mirrorClusterClient.Get(ctx, name, &corev1.Pod{}) + }).Should(Succeed()) + + }) + + By("Recreating a manually deleted pod in the mirror cluster", func() { + Expect(mirrorClusterClient.Delete(ctx, + &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: name.Namespace, Name: name.Name}}), + ).NotTo(HaveOccurred()) + + Eventually(func() error { + return mirrorClusterClient.Get(ctx, name, &corev1.Pod{}) + }).Should(Succeed()) + + }) + + By("Cleaning up after the reference pod got deleted", func() { + Expect(referenceClusterClient.Delete(ctx, referencePod)).NotTo(HaveOccurred()) + + Eventually(func() bool { + return apierrors.IsNotFound(mirrorClusterClient.Get(ctx, name, &corev1.Pod{})) + }).Should(BeTrue()) + + Eventually(func() bool { + return apierrors.IsNotFound(referenceClusterClient.Get(ctx, name, &corev1.Pod{})) + }).Should(BeTrue()) + }) + }) + + }) + +}) diff --git a/examples/multicluster/reconciler/reconciler.go b/examples/multicluster/reconciler/reconciler.go new file mode 100644 index 0000000000..a8304c4552 --- /dev/null +++ b/examples/multicluster/reconciler/reconciler.go @@ -0,0 +1,125 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reconciler + +import ( + "context" + "fmt" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/clusterconnector" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +func Add(mgr ctrl.Manager, mirrorCluster clusterconnector.ClusterConnector) error { + return ctrl.NewControllerManagedBy(mgr). + // Watch Pods in the reference cluster + For(&corev1.Pod{}). + // Watch pods in the mirror cluster + Watches( + source.NewKindWithCache(&corev1.Pod{}, mirrorCluster.GetCache()), + &handler.EnqueueRequestForObject{}, + ). + Complete(&reconciler{ + referenceClusterClient: mgr.GetClient(), + mirrorClusterClient: mirrorCluster.GetClient(), + }) +} + +type reconciler struct { + referenceClusterClient client.Client + mirrorClusterClient client.Client +} + +func (r *reconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) { + return reconcile.Result{}, r.reconcile(req) +} + +const podFinalizerName = "pod-finalzer.mirror.org/v1" + +func (r *reconciler) reconcile(req reconcile.Request) error { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + referencePod := &corev1.Pod{} + if err := r.referenceClusterClient.Get(ctx, req.NamespacedName, referencePod); err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return fmt.Errorf("failed to get pod from reference clsuster: %w", err) + } + + if referencePod.DeletionTimestamp != nil && sets.NewString(referencePod.Finalizers...).Has(podFinalizerName) { + mirrorClusterPod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Namespace: referencePod.Namespace, + Name: referencePod.Name, + }} + if err := r.mirrorClusterClient.Delete(ctx, mirrorClusterPod); err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to delete pod in mirror cluster: %w", err) + } + + referencePod.Finalizers = sets.NewString(referencePod.Finalizers...).Delete(podFinalizerName).UnsortedList() + if err := r.referenceClusterClient.Update(ctx, referencePod); err != nil { + return fmt.Errorf("failed to update pod in refernce cluster after removing finalizer: %w", err) + } + + return nil + } + + if !sets.NewString(referencePod.Finalizers...).Has(podFinalizerName) { + referencePod.Finalizers = append(referencePod.Finalizers, podFinalizerName) + if err := r.referenceClusterClient.Update(ctx, referencePod); err != nil { + return fmt.Errorf("failed to update pod after adding finalizer: %w", err) + } + } + + // Check if pod already exists + podName := types.NamespacedName{Namespace: referencePod.Namespace, Name: referencePod.Name} + podExists := true + if err := r.mirrorClusterClient.Get(ctx, podName, &corev1.Pod{}); err != nil { + if !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to check in mirror cluster if pod exists: %w", err) + } + podExists = false + } + if podExists { + return nil + } + + mirrorPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: referencePod.Namespace, + Name: referencePod.Name, + }, + Spec: *referencePod.Spec.DeepCopy(), + } + if err := r.mirrorClusterClient.Create(ctx, mirrorPod); err != nil { + return fmt.Errorf("failed to create mirror pod: %w", err) + } + + return nil +} diff --git a/pkg/clusterconnector/clusterconnector.go b/pkg/clusterconnector/clusterconnector.go new file mode 100644 index 0000000000..45709a7c8b --- /dev/null +++ b/pkg/clusterconnector/clusterconnector.go @@ -0,0 +1,320 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clusterconnector + +import ( + "fmt" + "time" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + logf "sigs.k8s.io/controller-runtime/pkg/internal/log" + internalrecorder "sigs.k8s.io/controller-runtime/pkg/internal/recorder" + "sigs.k8s.io/controller-runtime/pkg/manager/runner" + "sigs.k8s.io/controller-runtime/pkg/recorder" +) + +// ClusterConnector contains everything thats needed to build a controller +// that watches and interacts with objects in the given cluster. +type ClusterConnector interface { + // SetFields will set any dependencies on an object for which the object has implemented the inject + // interface - e.g. inject.Client. + SetFields(interface{}) error + + // GetConfig returns an initialized Config + GetConfig() *rest.Config + + // GetScheme returns an initialized Scheme + GetScheme() *runtime.Scheme + + // GetClient returns a client configured with the Config. This client may + // not be a fully "direct" client -- it may read from a cache, for + // instance. See Options.NewClient for more information on how the default + // implementation works. + GetClient() client.Client + + // GetFieldIndexer returns a client.FieldIndexer configured with the client + GetFieldIndexer() client.FieldIndexer + + // GetCache returns a cache.Cache + GetCache() cache.Cache + + // GetEventRecorderFor returns a new EventRecorder for the provided name + GetEventRecorderFor(name string) record.EventRecorder + + // GetRESTMapper returns a RESTMapper + GetRESTMapper() meta.RESTMapper + + // GetAPIReader returns a reader that will be configured to use the API server. + // This should be used sparingly and only when the client does not fit your + // use case. + GetAPIReader() client.Reader + + // AddToManager adds the ClusterConnector to a manager + AddToManager(mgr Manager) error +} + +// NewClientFunc allows a user to define how to create a client +type NewClientFunc func(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) + +// DefaultNewClient creates the default caching client +func DefaultNewClient(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) { + // Create the Client for Write operations. + c, err := client.New(config, options) + if err != nil { + return nil, err + } + + return &client.DelegatingClient{ + Reader: &client.DelegatingReader{ + CacheReader: cache, + ClientReader: c, + }, + Writer: c, + StatusClient: c, + }, nil +} + +// Options holds all possible Options for a ClusterConnector +type Options struct { + // Scheme is the scheme used to resolve runtime.Objects to GroupVersionKinds / Resources + // Defaults to the kubernetes/client-go scheme.Scheme, but it's almost always better + // idea to pass your own scheme in. See the documentation in pkg/scheme for more information. + Scheme *runtime.Scheme + + // MapperProvider provides the rest mapper used to map go types to Kubernetes APIs + MapperProvider func(c *rest.Config) (meta.RESTMapper, error) + + // SyncPeriod determines the minimum frequency at which watched resources are + // reconciled. A lower period will correct entropy more quickly, but reduce + // responsiveness to change if there are many watched resources. Change this + // value only if you know what you are doing. Defaults to 10 hours if unset. + // there will a 10 percent jitter between the SyncPeriod of all controllers + // so that all controllers will not send list requests simultaneously. + SyncPeriod *time.Duration + + // NewCache is the function that will create the cache to be used + // by the manager. If not set this will use the default new cache function. + NewCache cache.NewCacheFunc + + // NewClient will create the client to be used by the manager. + // If not set this will create the default DelegatingClient that will + // use the cache for reads and the client for writes. + NewClient NewClientFunc + + // DryRunClient specifies whether the client should be configured to enforce + // dryRun mode. + DryRunClient *bool + + // Namespace if specified restricts the manager's cache to watch objects in + // the desired namespace Defaults to all namespaces + // + // Note: If a namespace is specified, controllers can still Watch for a + // cluster-scoped resource (e.g Node). For namespaced resources the cache + // will only hold objects from the desired namespace. + Namespace *string + + // EventBroadcaster records Events emitted by the manager and sends them to the Kubernetes API + // Use this to customize the event correlator and spam filter + EventBroadcaster record.EventBroadcaster + + // Dependency injection for testing + newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error) +} + +// Apply implements Option +func (o Options) Apply(target *Options) { + if o.Scheme != nil { + target.Scheme = o.Scheme + } + if o.MapperProvider != nil { + target.MapperProvider = o.MapperProvider + } + if o.SyncPeriod != nil { + target.SyncPeriod = o.SyncPeriod + } + if o.NewCache != nil { + target.NewCache = o.NewCache + } + if o.NewClient != nil { + target.NewClient = o.NewClient + } + + if o.DryRunClient != nil { + target.DryRunClient = o.DryRunClient + } + + if o.Namespace != nil { + target.Namespace = o.Namespace + } + + if o.EventBroadcaster != nil { + target.EventBroadcaster = o.EventBroadcaster + } + + if o.newRecorderProvider != nil { + target.newRecorderProvider = o.newRecorderProvider + } +} + +var _ Option = Options{} + +// Option can be used to customize a ClusterConnector +type Option interface { + Apply(o *Options) +} + +// setOptionsDefaults set default values for Options fields +func setOptionsDefaults(options Options) Options { + // Use the Kubernetes client-go scheme if none is specified + if options.Scheme == nil { + options.Scheme = scheme.Scheme + } + + if options.MapperProvider == nil { + options.MapperProvider = func(c *rest.Config) (meta.RESTMapper, error) { + return apiutil.NewDynamicRESTMapper(c) + } + } + + // Allow newClient to be mocked + if options.NewClient == nil { + options.NewClient = DefaultNewClient + } + + // Allow newCache to be mocked + if options.NewCache == nil { + options.NewCache = cache.New + } + + if options.DryRunClient == nil { + var b bool + options.DryRunClient = &b + } + + if options.Namespace == nil { + var s string + options.Namespace = &s + } + + // Allow newRecorderProvider to be mocked + if options.newRecorderProvider == nil { + options.newRecorderProvider = internalrecorder.NewProvider + } + + if options.EventBroadcaster == nil { + options.EventBroadcaster = record.NewBroadcaster() + } + + return options +} + +// Manager manages the lifecycle of Runnables +type Manager interface { + Add(r runner.Runnable) error +} + +// New creates a new ClusterConnector +func New(config *rest.Config, mgr Manager, name string, opts ...Option) (ClusterConnector, error) { + cc, err := NewUnmanaged(config, name, opts...) + if err != nil { + return nil, err + } + + if err := cc.AddToManager(mgr); err != nil { + return nil, err + } + + return cc, nil +} + +// NewUnmanaged creates a new unmanaged ClusterConnector. It must be manually added to a +// Manager by calling its AddToManager. +func NewUnmanaged(config *rest.Config, name string, opts ...Option) (ClusterConnector, error) { + log := logf.RuntimeLog.WithName("clusterconnector").WithValues("name", name) + if config == nil { + return nil, fmt.Errorf("must specify Config") + } + + options := Options{} + for _, opt := range opts { + opt.Apply(&options) + } + + // Set default values for options fields + options = setOptionsDefaults(options) + + // Create the mapper provider + mapper, err := options.MapperProvider(config) + if err != nil { + log.Error(err, "Failed to get API Group-Resources") + return nil, err + } + + // Create the cache for the cached read client and registering informers + cache, err := options.NewCache(config, cache.Options{ + Scheme: options.Scheme, + Mapper: mapper, + Resync: options.SyncPeriod, + Namespace: *options.Namespace}, + ) + if err != nil { + return nil, err + } + + apiReader, err := client.New(config, client.Options{Scheme: options.Scheme, Mapper: mapper}) + if err != nil { + return nil, err + } + + writeObj, err := options.NewClient(cache, config, client.Options{Scheme: options.Scheme, Mapper: mapper}) + if err != nil { + return nil, err + } + + if *options.DryRunClient { + writeObj = client.NewDryRunClient(writeObj) + } + + // Create the recorder provider to inject event recorders for the components. + // TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific + // to the particular controller that it's being injected into, rather than a generic one like is here. + recorderProvider, err := options.newRecorderProvider(config, options.Scheme, log.WithName("events"), options.EventBroadcaster) + if err != nil { + return nil, err + } + + return &clusterConnector{ + config: config, + scheme: options.Scheme, + cache: cache, + fieldIndexes: cache, + client: writeObj, + apiReader: apiReader, + recorderProvider: recorderProvider, + mapper: mapper, + }, nil + +} diff --git a/pkg/clusterconnector/clusterconnector_suite_test.go b/pkg/clusterconnector/clusterconnector_suite_test.go new file mode 100644 index 0000000000..1e3e1c3398 --- /dev/null +++ b/pkg/clusterconnector/clusterconnector_suite_test.go @@ -0,0 +1,59 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clusterconnector + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + + "sigs.k8s.io/controller-runtime/pkg/envtest" + "sigs.k8s.io/controller-runtime/pkg/envtest/printer" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" +) + +func TestClusterConnector(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecsWithDefaultAndCustomReporters(t, "ClusterConnector Integration Suite", []Reporter{printer.NewlineReporter{}}) +} + +var testenv *envtest.Environment +var cfg *rest.Config +var clientset *kubernetes.Clientset + +var _ = BeforeSuite(func(done Done) { + logf.SetLogger(zap.LoggerTo(GinkgoWriter, true)) + + testenv = &envtest.Environment{} + + var err error + cfg, err = testenv.Start() + Expect(err).NotTo(HaveOccurred()) + + clientset, err = kubernetes.NewForConfig(cfg) + Expect(err).NotTo(HaveOccurred()) + + close(done) +}, 60) + +var _ = AfterSuite(func() { + Expect(testenv.Stop()).To(Succeed()) +}) diff --git a/pkg/clusterconnector/clusterconnector_test.go b/pkg/clusterconnector/clusterconnector_test.go new file mode 100644 index 0000000000..f9298a3126 --- /dev/null +++ b/pkg/clusterconnector/clusterconnector_test.go @@ -0,0 +1,192 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clusterconnector + +import ( + "fmt" + + "github.com/go-logr/logr" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/cache/informertest" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/recorder" + "sigs.k8s.io/controller-runtime/pkg/runtime/inject" +) + +var _ inject.Injector = &injectable{} +var _ inject.Cache = &injectable{} +var _ inject.Client = &injectable{} +var _ inject.Scheme = &injectable{} +var _ inject.Config = &injectable{} + +type injectable struct { + scheme func(scheme *runtime.Scheme) error + client func(client.Client) error + config func(config *rest.Config) error + cache func(cache.Cache) error + f func(inject.Func) error +} + +func (i *injectable) InjectCache(c cache.Cache) error { + if i.cache == nil { + return nil + } + return i.cache(c) +} + +func (i *injectable) InjectConfig(config *rest.Config) error { + if i.config == nil { + return nil + } + return i.config(config) +} + +func (i *injectable) InjectClient(c client.Client) error { + if i.client == nil { + return nil + } + return i.client(c) +} + +func (i *injectable) InjectScheme(scheme *runtime.Scheme) error { + if i.scheme == nil { + return nil + } + return i.scheme(scheme) +} + +func (i *injectable) InjectFunc(f inject.Func) error { + if i.f == nil { + return nil + } + return i.f(f) +} + +var _ = Describe("clusterconnector.ClusterConnector", func() { + var stop chan struct{} + + BeforeEach(func() { + stop = make(chan struct{}) + }) + + AfterEach(func() { + close(stop) + }) + Describe("New", func() { + + It("should return an error it can't create a recorder.Provider", func(done Done) { + m, err := NewUnmanaged(cfg, "", Options{ + newRecorderProvider: func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error) { + return nil, fmt.Errorf("expected error") + }, + }) + Expect(m).To(BeNil()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("expected error")) + + close(done) + }) + + }) + + Describe("SetFields", func() { + It("should inject field values", func(done Done) { + c, err := NewUnmanaged(cfg, "", Options{}) + Expect(err).NotTo(HaveOccurred()) + cc, ok := c.(*clusterConnector) + Expect(ok).To(BeTrue()) + + cc.cache = &informertest.FakeInformers{} + + By("Injecting the dependencies") + err = c.SetFields(&injectable{ + scheme: func(scheme *runtime.Scheme) error { + defer GinkgoRecover() + Expect(scheme).To(Equal(c.GetScheme())) + return nil + }, + config: func(config *rest.Config) error { + defer GinkgoRecover() + Expect(config).To(Equal(c.GetConfig())) + return nil + }, + client: func(client client.Client) error { + defer GinkgoRecover() + Expect(client).To(Equal(c.GetClient())) + return nil + }, + cache: func(concreteCache cache.Cache) error { + defer GinkgoRecover() + Expect(concreteCache).To(Equal(c.GetCache())) + return nil + }, + f: func(f inject.Func) error { + defer GinkgoRecover() + Expect(f).NotTo(BeNil()) + return nil + }, + }) + Expect(err).NotTo(HaveOccurred()) + + By("Returning an error if dependency injection fails") + + expected := fmt.Errorf("expected error") + err = c.SetFields(&injectable{ + client: func(client client.Client) error { + return expected + }, + }) + Expect(err).To(Equal(expected)) + + err = c.SetFields(&injectable{ + scheme: func(scheme *runtime.Scheme) error { + return expected + }, + }) + Expect(err).To(Equal(expected)) + + err = c.SetFields(&injectable{ + config: func(config *rest.Config) error { + return expected + }, + }) + Expect(err).To(Equal(expected)) + + err = c.SetFields(&injectable{ + cache: func(c cache.Cache) error { + return expected + }, + }) + Expect(err).To(Equal(expected)) + + err = c.SetFields(&injectable{ + f: func(c inject.Func) error { + return expected + }, + }) + Expect(err).To(Equal(expected)) + close(done) + }) + }) + +}) diff --git a/pkg/clusterconnector/internal.go b/pkg/clusterconnector/internal.go new file mode 100644 index 0000000000..853b18e1b5 --- /dev/null +++ b/pkg/clusterconnector/internal.go @@ -0,0 +1,121 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clusterconnector + +import ( + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + + "k8s.io/apimachinery/pkg/api/meta" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/recorder" + "sigs.k8s.io/controller-runtime/pkg/runtime/inject" +) + +var _ ClusterConnector = &clusterConnector{} + +type clusterConnector struct { + // config is the rest.config used to talk to the apiserver. Required. + config *rest.Config + + // scheme is the scheme injected into Controllers, EventHandlers, Sources and Predicates. Defaults + // to scheme.scheme. + scheme *runtime.Scheme + + cache cache.Cache + + // TODO(directxman12): Provide an escape hatch to get individual indexers + // client is the client injected into Controllers (and EventHandlers, Sources and Predicates). + client client.Client + + // apiReader is the reader that will make requests to the api server and not the cache. + apiReader client.Reader + + // fieldIndexes knows how to add field indexes over the Cache used by this controller, + // which can later be consumed via field selectors from the injected client. + fieldIndexes client.FieldIndexer + + // recorderProvider is used to generate event recorders that will be injected into Controllers + // (and EventHandlers, Sources and Predicates). + recorderProvider recorder.Provider + + // mapper is used to map resources to kind, and map kind and version. + mapper meta.RESTMapper +} + +func (cc *clusterConnector) SetFields(i interface{}) error { + if _, err := inject.ConfigInto(cc.config, i); err != nil { + return err + } + if _, err := inject.ClientInto(cc.client, i); err != nil { + return err + } + if _, err := inject.APIReaderInto(cc.apiReader, i); err != nil { + return err + } + if _, err := inject.SchemeInto(cc.scheme, i); err != nil { + return err + } + if _, err := inject.CacheInto(cc.cache, i); err != nil { + return err + } + if _, err := inject.InjectorInto(cc.SetFields, i); err != nil { + return err + } + if _, err := inject.MapperInto(cc.mapper, i); err != nil { + return err + } + return nil +} + +func (cc *clusterConnector) GetConfig() *rest.Config { + return cc.config +} + +func (cc *clusterConnector) GetClient() client.Client { + return cc.client +} + +func (cc *clusterConnector) GetScheme() *runtime.Scheme { + return cc.scheme +} + +func (cc *clusterConnector) GetFieldIndexer() client.FieldIndexer { + return cc.fieldIndexes +} + +func (cc *clusterConnector) GetCache() cache.Cache { + return cc.cache +} + +func (cc *clusterConnector) GetEventRecorderFor(name string) record.EventRecorder { + return cc.recorderProvider.GetEventRecorderFor(name) +} + +func (cc *clusterConnector) GetRESTMapper() meta.RESTMapper { + return cc.mapper +} + +func (cc *clusterConnector) GetAPIReader() client.Reader { + return cc.apiReader +} + +func (cc *clusterConnector) AddToManager(mgr Manager) error { + return mgr.Add(cc.GetCache()) +} diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 3e1467df50..0b43622535 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -25,19 +25,15 @@ import ( "time" "github.com/prometheus/client_golang/prometheus/promhttp" - "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" - "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/clusterconnector" "sigs.k8s.io/controller-runtime/pkg/healthz" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" + "sigs.k8s.io/controller-runtime/pkg/manager/runner" "sigs.k8s.io/controller-runtime/pkg/metrics" - "sigs.k8s.io/controller-runtime/pkg/recorder" "sigs.k8s.io/controller-runtime/pkg/runtime/inject" "sigs.k8s.io/controller-runtime/pkg/webhook" ) @@ -55,44 +51,25 @@ const ( var log = logf.RuntimeLog.WithName("manager") -type controllerManager struct { - // config is the rest.config used to talk to the apiserver. Required. - config *rest.Config +type runnableCache interface { + runner.Runnable + WaitForCacheSync(stop <-chan struct{}) bool +} - // scheme is the scheme injected into Controllers, EventHandlers, Sources and Predicates. Defaults - // to scheme.scheme. - scheme *runtime.Scheme +type controllerManager struct { + // caches holds all caches this controllerManager handles. They must be started before everything else. + caches []runnableCache // leaderElectionRunnables is the set of Controllers that the controllerManager injects deps into and Starts. // These Runnables are managed by lead election. - leaderElectionRunnables []Runnable + leaderElectionRunnables []runner.Runnable // nonLeaderElectionRunnables is the set of webhook servers that the controllerManager injects deps into and Starts. // These Runnables will not be blocked by lead election. - nonLeaderElectionRunnables []Runnable - - cache cache.Cache - - // TODO(directxman12): Provide an escape hatch to get individual indexers - // client is the client injected into Controllers (and EventHandlers, Sources and Predicates). - client client.Client - - // apiReader is the reader that will make requests to the api server and not the cache. - apiReader client.Reader - - // fieldIndexes knows how to add field indexes over the Cache used by this controller, - // which can later be consumed via field selectors from the injected client. - fieldIndexes client.FieldIndexer - - // recorderProvider is used to generate event recorders that will be injected into Controllers - // (and EventHandlers, Sources and Predicates). - recorderProvider recorder.Provider + nonLeaderElectionRunnables []runner.Runnable // resourceLock forms the basis for leader election resourceLock resourcelock.Interface - // mapper is used to map resources to kind, and map kind and version. - mapper meta.RESTMapper - // metricsListener is used to serve prometheus metrics metricsListener net.Listener @@ -139,8 +116,6 @@ type controllerManager struct { // election was configured. elected chan struct{} - startCache func(stop <-chan struct{}) error - // port is the port that the webhook server serves at. port int // host is the hostname that the webhook server binds to. @@ -161,6 +136,8 @@ type controllerManager struct { // retryPeriod is the duration the LeaderElector clients should wait // between tries of actions. retryPeriod time.Duration + + clusterconnector.ClusterConnector } type errSignaler struct { @@ -209,7 +186,7 @@ func (r *errSignaler) GotError() chan struct{} { } // Add sets dependencies on i, and adds it to the list of Runnables to start. -func (cm *controllerManager) Add(r Runnable) error { +func (cm *controllerManager) Add(r runner.Runnable) error { cm.mu.Lock() defer cm.mu.Unlock() @@ -221,7 +198,10 @@ func (cm *controllerManager) Add(r Runnable) error { var shouldStart bool // Add the runnable to the leader election or the non-leaderelection list - if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() { + if cache, ok := r.(cache.Cache); ok { + shouldStart = cm.started + cm.caches = append(cm.caches, cache) + } else if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() { shouldStart = cm.started cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r) } else { @@ -242,31 +222,15 @@ func (cm *controllerManager) Add(r Runnable) error { } func (cm *controllerManager) SetFields(i interface{}) error { - if _, err := inject.ConfigInto(cm.config, i); err != nil { - return err - } - if _, err := inject.ClientInto(cm.client, i); err != nil { - return err - } - if _, err := inject.APIReaderInto(cm.apiReader, i); err != nil { - return err - } - if _, err := inject.SchemeInto(cm.scheme, i); err != nil { - return err - } - if _, err := inject.CacheInto(cm.cache, i); err != nil { - return err - } if _, err := inject.InjectorInto(cm.SetFields, i); err != nil { return err } + if _, err := inject.StopChannelInto(cm.internalStop, i); err != nil { return err } - if _, err := inject.MapperInto(cm.mapper, i); err != nil { - return err - } - return nil + + return cm.ClusterConnector.SetFields(i) } // AddMetricsExtraHandler adds extra handler served on path to the http server that serves metrics. @@ -322,38 +286,6 @@ func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker) return nil } -func (cm *controllerManager) GetConfig() *rest.Config { - return cm.config -} - -func (cm *controllerManager) GetClient() client.Client { - return cm.client -} - -func (cm *controllerManager) GetScheme() *runtime.Scheme { - return cm.scheme -} - -func (cm *controllerManager) GetFieldIndexer() client.FieldIndexer { - return cm.fieldIndexes -} - -func (cm *controllerManager) GetCache() cache.Cache { - return cm.cache -} - -func (cm *controllerManager) GetEventRecorderFor(name string) record.EventRecorder { - return cm.recorderProvider.GetEventRecorderFor(name) -} - -func (cm *controllerManager) GetRESTMapper() meta.RESTMapper { - return cm.mapper -} - -func (cm *controllerManager) GetAPIReader() client.Reader { - return cm.apiReader -} - func (cm *controllerManager) GetWebhookServer() *webhook.Server { if cm.webhookServer == nil { cm.webhookServer = &webhook.Server{ @@ -528,19 +460,19 @@ func (cm *controllerManager) waitForCache() { return } - // Start the Cache. Allow the function to start the cache to be mocked out for testing - if cm.startCache == nil { - cm.startCache = cm.cache.Start + for idx := range cm.caches { + go func(idx int) { + if err := cm.caches[idx].Start(cm.internalStop); err != nil { + cm.errSignal.SignalError(err) + } + }(idx) } - go func() { - if err := cm.startCache(cm.internalStop); err != nil { - cm.errSignal.SignalError(err) - } - }() // Wait for the caches to sync. // TODO(community): Check the return value and write a test - cm.cache.WaitForCacheSync(cm.internalStop) + for _, cache := range cm.caches { + cache.WaitForCacheSync(cm.internalStop) + } cm.started = true } diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 1526cef476..6d0cdd41b9 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -22,20 +22,18 @@ import ( "net/http" "time" - "github.com/go-logr/logr" - "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + "sigs.k8s.io/controller-runtime/pkg/clusterconnector" "sigs.k8s.io/controller-runtime/pkg/healthz" - internalrecorder "sigs.k8s.io/controller-runtime/pkg/internal/recorder" "sigs.k8s.io/controller-runtime/pkg/leaderelection" + "sigs.k8s.io/controller-runtime/pkg/manager/runner" "sigs.k8s.io/controller-runtime/pkg/metrics" "sigs.k8s.io/controller-runtime/pkg/recorder" "sigs.k8s.io/controller-runtime/pkg/webhook" @@ -49,17 +47,13 @@ type Manager interface { // implements the inject interface - e.g. inject.Client. // Depending on if a Runnable implements LeaderElectionRunnable interface, a Runnable can be run in either // non-leaderelection mode (always running) or leader election mode (managed by leader election if enabled). - Add(Runnable) error + Add(runner.Runnable) error // Elected is closed when this manager is elected leader of a group of // managers, either because it won a leader election or because no leader // election was configured. Elected() <-chan struct{} - // SetFields will set any dependencies on an object for which the object has implemented the inject - // interface - e.g. inject.Client. - SetFields(interface{}) error - // AddMetricsExtraHandler adds an extra handler served on path to the http server that serves metrics. // Might be useful to register some diagnostic endpoints e.g. pprof. Note that these endpoints meant to be // sensitive and shouldn't be exposed publicly. @@ -77,37 +71,10 @@ type Manager interface { // Returns an error if there is an error starting any controller. Start(<-chan struct{}) error - // GetConfig returns an initialized Config - GetConfig() *rest.Config - - // GetScheme returns an initialized Scheme - GetScheme() *runtime.Scheme - - // GetClient returns a client configured with the Config. This client may - // not be a fully "direct" client -- it may read from a cache, for - // instance. See Options.NewClient for more information on how the default - // implementation works. - GetClient() client.Client - - // GetFieldIndexer returns a client.FieldIndexer configured with the client - GetFieldIndexer() client.FieldIndexer - - // GetCache returns a cache.Cache - GetCache() cache.Cache - - // GetEventRecorderFor returns a new EventRecorder for the provided name - GetEventRecorderFor(name string) record.EventRecorder - - // GetRESTMapper returns a RESTMapper - GetRESTMapper() meta.RESTMapper - - // GetAPIReader returns a reader that will be configured to use the API server. - // This should be used sparingly and only when the client does not fit your - // use case. - GetAPIReader() client.Reader - // GetWebhookServer returns a webhook.Server GetWebhookServer() *webhook.Server + + clusterconnector.ClusterConnector } // Options are the arguments for creating a new Manager @@ -195,7 +162,7 @@ type Options struct { // NewClient will create the client to be used by the manager. // If not set this will create the default DelegatingClient that will // use the cache for reads and the client for writes. - NewClient NewClientFunc + NewClient clusterconnector.NewClientFunc // DryRunClient specifies whether the client should be configured to enforce // dryRun mode. @@ -206,25 +173,11 @@ type Options struct { EventBroadcaster record.EventBroadcaster // Dependency injection for testing - newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error) newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) newMetricsListener func(addr string) (net.Listener, error) newHealthProbeListener func(addr string) (net.Listener, error) } -// NewClientFunc allows a user to define how to create a client -type NewClientFunc func(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) - -// Runnable allows a component to be started. -// It's very important that Start blocks until -// it's done running. -type Runnable interface { - // Start starts running the component. The component will stop running - // when the channel is closed. Start blocks until the channel is closed or - // an error occurs. - Start(<-chan struct{}) error -} - // RunnableFunc implements Runnable using a function. // It's very important that the given function block // until it's done running. @@ -244,51 +197,31 @@ type LeaderElectionRunnable interface { // New returns a new Manager for creating Controllers. func New(config *rest.Config, options Options) (Manager, error) { - // Initialize a rest.config if none was specified - if config == nil { - return nil, fmt.Errorf("must specify Config") - } - - // Set default values for options fields - options = setOptionsDefaults(options) - - // Create the mapper provider - mapper, err := options.MapperProvider(config) - if err != nil { - log.Error(err, "Failed to get API Group-Resources") - return nil, err - } - - // Create the cache for the cached read client and registering informers - cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace}) - if err != nil { - return nil, err - } - - apiReader, err := client.New(config, client.Options{Scheme: options.Scheme, Mapper: mapper}) - if err != nil { - return nil, err - } - - writeObj, err := options.NewClient(cache, config, client.Options{Scheme: options.Scheme, Mapper: mapper}) + // Having to duplicate everything here is bad, but any other approach would break + // the api. We should eventually switch the manager to the functional opts pattern, + // then we can just embedd clusterconnector.Options into its Options. + // + // We have to use NewUnmanaged here because the managers Add depends on the clusterconnector + // and will panic if the latter is nil. We add it ourselves after we constructed the manager. + clusterConnector, err := clusterconnector.NewUnmanaged(config, "", clusterconnector.Options{ + Scheme: options.Scheme, + MapperProvider: options.MapperProvider, + SyncPeriod: options.SyncPeriod, + Namespace: &options.Namespace, + NewCache: options.NewCache, + NewClient: options.NewClient, + DryRunClient: &options.DryRunClient, + EventBroadcaster: options.EventBroadcaster, + }) if err != nil { return nil, err } - if options.DryRunClient { - writeObj = client.NewDryRunClient(writeObj) - } - - // Create the recorder provider to inject event recorders for the components. - // TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific - // to the particular controller that it's being injected into, rather than a generic one like is here. - recorderProvider, err := options.newRecorderProvider(config, options.Scheme, log.WithName("events"), options.EventBroadcaster) - if err != nil { - return nil, err - } + // Set default values for options fields + options = setOptionsDefaults(options) // Create the resource lock to enable leader election) - resourceLock, err := options.newResourceLock(config, recorderProvider, leaderelection.Options{ + resourceLock, err := options.newResourceLock(config, clusterConnector, leaderelection.Options{ LeaderElection: options.LeaderElection, LeaderElectionID: options.LeaderElectionID, LeaderElectionNamespace: options.LeaderElectionNamespace, @@ -316,16 +249,8 @@ func New(config *rest.Config, options Options) (Manager, error) { stop := make(chan struct{}) - return &controllerManager{ - config: config, - scheme: options.Scheme, - cache: cache, - fieldIndexes: cache, - client: writeObj, - apiReader: apiReader, - recorderProvider: recorderProvider, + cm := &controllerManager{ resourceLock: resourceLock, - mapper: mapper, metricsListener: metricsListener, metricsExtraHandlers: metricsExtraHandlers, internalStop: stop, @@ -340,7 +265,14 @@ func New(config *rest.Config, options Options) (Manager, error) { healthProbeListener: healthProbeListener, readinessEndpointName: options.ReadinessEndpointName, livenessEndpointName: options.LivenessEndpointName, - }, nil + ClusterConnector: clusterConnector, + } + + if err := cm.ClusterConnector.AddToManager(cm); err != nil { + return nil, err + } + + return cm, nil } // DefaultNewClient creates the default caching client @@ -376,32 +308,6 @@ func defaultHealthProbeListener(addr string) (net.Listener, error) { // setOptionsDefaults set default values for Options fields func setOptionsDefaults(options Options) Options { - // Use the Kubernetes client-go scheme if none is specified - if options.Scheme == nil { - options.Scheme = scheme.Scheme - } - - if options.MapperProvider == nil { - options.MapperProvider = func(c *rest.Config) (meta.RESTMapper, error) { - return apiutil.NewDynamicRESTMapper(c) - } - } - - // Allow newClient to be mocked - if options.NewClient == nil { - options.NewClient = DefaultNewClient - } - - // Allow newCache to be mocked - if options.NewCache == nil { - options.NewCache = cache.New - } - - // Allow newRecorderProvider to be mocked - if options.newRecorderProvider == nil { - options.newRecorderProvider = internalrecorder.NewProvider - } - // Allow newResourceLock to be mocked if options.newResourceLock == nil { options.newResourceLock = leaderelection.NewResourceLock @@ -423,10 +329,6 @@ func setOptionsDefaults(options Options) Options { options.RetryPeriod = &retryPeriod } - if options.EventBroadcaster == nil { - options.EventBroadcaster = record.NewBroadcaster() - } - if options.ReadinessEndpointName == "" { options.ReadinessEndpointName = defaultReadinessEndpoint } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 0788c5a972..07a5512a1f 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -23,7 +23,6 @@ import ( "net/http" rt "runtime" - "github.com/go-logr/logr" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/prometheus/client_golang/prometheus" @@ -32,10 +31,9 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection/resourcelock" - "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/cache/informertest" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/clusterconnector" "sigs.k8s.io/controller-runtime/pkg/leaderelection" fakeleaderelection "sigs.k8s.io/controller-runtime/pkg/leaderelection/fake" "sigs.k8s.io/controller-runtime/pkg/metrics" @@ -44,6 +42,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/runtime/inject" ) +// ControllerManager is a clusterconnector.Manager +var _ clusterconnector.Manager = &controllerManager{} + var _ = Describe("manger.Manager", func() { var stop chan struct{} @@ -112,19 +113,6 @@ var _ = Describe("manger.Manager", func() { close(done) }) - It("should return an error it can't create a recorder.Provider", func(done Done) { - m, err := New(cfg, Options{ - newRecorderProvider: func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error) { - return nil, fmt.Errorf("expected error") - }, - }) - Expect(m).To(BeNil()) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("expected error")) - - close(done) - }) - It("should lazily initialize a webhook server if needed", func(done Done) { By("creating a manager with options") m, err := New(cfg, Options{Port: 9443, Host: "foo.com"}) @@ -336,9 +324,9 @@ var _ = Describe("manger.Manager", func() { Expect(err).NotTo(HaveOccurred()) mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) - mgr.startCache = func(stop <-chan struct{}) error { + mgr.caches = []runnableCache{&fakeRunnableCache{start: func(stop <-chan struct{}) error { return fmt.Errorf("expected error") - } + }}} Expect(m.Start(stop)).To(MatchError(ContainSubstring("expected error"))) close(done) @@ -750,10 +738,6 @@ var _ = Describe("manger.Manager", func() { It("should inject field values", func(done Done) { m, err := New(cfg, Options{}) Expect(err).NotTo(HaveOccurred()) - mgr, ok := m.(*controllerManager) - Expect(ok).To(BeTrue()) - - mgr.cache = &informertest.FakeInformers{} By("Injecting the dependencies") err = m.SetFields(&injectable{ @@ -859,7 +843,7 @@ var _ = Describe("manger.Manager", func() { Expect(err).NotTo(HaveOccurred()) mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) - Expect(m.GetConfig()).To(Equal(mgr.config)) + Expect(m.GetConfig()).To(Equal(mgr.ClusterConnector.GetConfig())) }) It("should provide a function to get the Client", func() { @@ -867,7 +851,7 @@ var _ = Describe("manger.Manager", func() { Expect(err).NotTo(HaveOccurred()) mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) - Expect(m.GetClient()).To(Equal(mgr.client)) + Expect(m.GetClient()).To(Equal(mgr.ClusterConnector.GetClient())) }) It("should provide a function to get the Scheme", func() { @@ -875,7 +859,7 @@ var _ = Describe("manger.Manager", func() { Expect(err).NotTo(HaveOccurred()) mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) - Expect(m.GetScheme()).To(Equal(mgr.scheme)) + Expect(m.GetScheme()).To(Equal(mgr.ClusterConnector.GetScheme())) }) It("should provide a function to get the FieldIndexer", func() { @@ -883,7 +867,7 @@ var _ = Describe("manger.Manager", func() { Expect(err).NotTo(HaveOccurred()) mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) - Expect(m.GetFieldIndexer()).To(Equal(mgr.fieldIndexes)) + Expect(m.GetFieldIndexer()).To(Equal(mgr.ClusterConnector.GetFieldIndexer())) }) It("should provide a function to get the EventRecorder", func() { @@ -976,3 +960,17 @@ func (i *injectable) InjectStopChannel(stop <-chan struct{}) error { func (i *injectable) Start(<-chan struct{}) error { return nil } + +var _ runnableCache = &fakeRunnableCache{} + +type fakeRunnableCache struct { + start func(<-chan struct{}) error +} + +func (frc *fakeRunnableCache) Start(c <-chan struct{}) error { + return frc.start(c) +} + +func (frc *fakeRunnableCache) WaitForCacheSync(_ <-chan struct{}) bool { + return true +} diff --git a/pkg/manager/runner/runner.go b/pkg/manager/runner/runner.go new file mode 100644 index 0000000000..78d113c34f --- /dev/null +++ b/pkg/manager/runner/runner.go @@ -0,0 +1,27 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package runner + +// Runnable allows a component to be started. +// It's very important that Start blocks until +// it's done running. +type Runnable interface { + // Start starts running the component. The component will stop running + // when the channel is closed. Start blocks until the channel is closed or + // an error occurs. + Start(<-chan struct{}) error +}