diff --git a/samples/controller/controller.go b/samples/controller/controller.go index 8d745c727e..7550a3d672 100644 --- a/samples/controller/controller.go +++ b/samples/controller/controller.go @@ -18,7 +18,6 @@ package main import ( "fmt" - "time" "github.com/golang/glog" appsv1 "k8s.io/api/apps/v1" @@ -27,21 +26,16 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - kubeinformers "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" - typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" - appslisters "k8s.io/client-go/listers/apps/v1" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/workqueue" + "github.com/kubernetes-sigs/kubebuilder/pkg/controller" + "github.com/kubernetes-sigs/kubebuilder/pkg/controller/eventhandlers" + "github.com/kubernetes-sigs/kubebuilder/pkg/controller/predicates" + "github.com/kubernetes-sigs/kubebuilder/pkg/controller/types" samplev1alpha1 "github.com/kubernetes-sigs/kubebuilder/samples/controller/pkg/apis/samplecontroller/v1alpha1" - clientset "github.com/kubernetes-sigs/kubebuilder/samples/controller/pkg/client/clientset/versioned" samplescheme "github.com/kubernetes-sigs/kubebuilder/samples/controller/pkg/client/clientset/versioned/scheme" - informers "github.com/kubernetes-sigs/kubebuilder/samples/controller/pkg/client/informers/externalversions" - listers "github.com/kubernetes-sigs/kubebuilder/samples/controller/pkg/client/listers/samplecontroller/v1alpha1" + "github.com/kubernetes-sigs/kubebuilder/samples/controller/pkg/inject/args" ) const controllerAgentName = "sample-controller" @@ -63,196 +57,53 @@ const ( // Controller is the controller implementation for Foo resources type Controller struct { - // kubeclientset is a standard kubernetes clientset - kubeclientset kubernetes.Interface - // sampleclientset is a clientset for our own API group - sampleclientset clientset.Interface + args.InjectArgs - deploymentsLister appslisters.DeploymentLister - deploymentsSynced cache.InformerSynced - foosLister listers.FooLister - foosSynced cache.InformerSynced - - // workqueue is a rate limited work queue. This is used to queue work to be - // processed instead of performing it as soon as a change happens. This - // means we can ensure we only process a fixed amount of resources at a - // time, and makes it easy to ensure we are never processing the same item - // simultaneously in two different workers. - workqueue workqueue.RateLimitingInterface // recorder is an event recorder for recording Event resources to the // Kubernetes API. recorder record.EventRecorder } // NewController returns a new sample controller -func NewController( - kubeclientset kubernetes.Interface, - sampleclientset clientset.Interface, - kubeInformerFactory kubeinformers.SharedInformerFactory, - sampleInformerFactory informers.SharedInformerFactory) *Controller { - - // obtain references to shared index informers for the Deployment and Foo - // types. - deploymentInformer := kubeInformerFactory.Apps().V1().Deployments() - fooInformer := sampleInformerFactory.Samplecontroller().V1alpha1().Foos() - - // Create event broadcaster - // Add sample-controller types to the default Kubernetes Scheme so Events can be - // logged for sample-controller types. +func NewController(iargs args.InjectArgs) *controller.GenericController { samplescheme.AddToScheme(scheme.Scheme) - glog.V(4).Info("Creating event broadcaster") - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) - controller := &Controller{ - kubeclientset: kubeclientset, - sampleclientset: sampleclientset, - deploymentsLister: deploymentInformer.Lister(), - deploymentsSynced: deploymentInformer.Informer().HasSynced, - foosLister: fooInformer.Lister(), - foosSynced: fooInformer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"), - recorder: recorder, + c := &Controller{ + recorder: iargs.CreateRecorder(controllerAgentName), + } + + genericController := &controller.GenericController{ + Name: controllerAgentName, + InformerRegistry: iargs.ControllerManager, + Reconcile: c.syncHandler, } glog.Info("Setting up event handlers") - // Set up an event handler for when Foo resources change - fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: controller.enqueueFoo, - UpdateFunc: func(old, new interface{}) { - controller.enqueueFoo(new) - }, - }) + genericController.Watch(&samplev1alpha1.Foo{}) + // Set up an event handler for when Deployment resources change. This // handler will lookup the owner of the given Deployment, and if it is // owned by a Foo resource will enqueue that Foo resource for // processing. This way, we don't need to implement custom logic for // handling Deployment resources. More info on this pattern: // https://github.com/kubernetes/community/blob/8cafef897a22026d42f5e5bb3f104febe7e29830/contributors/devel/controllers.md - deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: controller.handleObject, - UpdateFunc: func(old, new interface{}) { - newDepl := new.(*appsv1.Deployment) - oldDepl := old.(*appsv1.Deployment) - if newDepl.ResourceVersion == oldDepl.ResourceVersion { - // Periodic resync will send update events for all known Deployments. - // Two different versions of the same Deployment will always have different RVs. - return - } - controller.handleObject(new) - }, - DeleteFunc: controller.handleObject, - }) + genericController.WatchControllerOf(&appsv1.Deployment{}, eventhandlers.Path{c.LookupFoo}, + predicates.ResourceVersionChanged) - return controller + return genericController } -// Run will set up the event handlers for types we are interested in, as well -// as syncing informer caches and starting workers. It will block until stopCh -// is closed, at which point it will shutdown the workqueue and wait for -// workers to finish processing their current work items. -func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { - defer runtime.HandleCrash() - defer c.workqueue.ShutDown() - - // Start the informer factories to begin populating the informer caches - glog.Info("Starting Foo controller") - - // Wait for the caches to be synced before starting workers - glog.Info("Waiting for informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok { - return fmt.Errorf("failed to wait for caches to sync") - } - - glog.Info("Starting workers") - // Launch two workers to process Foo resources - for i := 0; i < threadiness; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) - } - - glog.Info("Started workers") - <-stopCh - glog.Info("Shutting down workers") - - return nil -} - -// runWorker is a long-running function that will continually call the -// processNextWorkItem function in order to read and process a message on the -// workqueue. -func (c *Controller) runWorker() { - for c.processNextWorkItem() { - } -} - -// processNextWorkItem will read a single work item off the workqueue and -// attempt to process it, by calling the syncHandler. -func (c *Controller) processNextWorkItem() bool { - obj, shutdown := c.workqueue.Get() - - if shutdown { - return false - } - - // We wrap this block in a func so we can defer c.workqueue.Done. - err := func(obj interface{}) error { - // We call Done here so the workqueue knows we have finished - // processing this item. We also must remember to call Forget if we - // do not want this work item being re-queued. For example, we do - // not call Forget if a transient error occurs, instead the item is - // put back on the workqueue and attempted again after a back-off - // period. - defer c.workqueue.Done(obj) - var key string - var ok bool - // We expect strings to come off the workqueue. These are of the - // form namespace/name. We do this as the delayed nature of the - // workqueue means the items in the informer cache may actually be - // more up to date that when the item was initially put onto the - // workqueue. - if key, ok = obj.(string); !ok { - // As the item in the workqueue is actually invalid, we call - // Forget here else we'd go into a loop of attempting to - // process a work item that is invalid. - c.workqueue.Forget(obj) - runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) - return nil - } - // Run the syncHandler, passing it the namespace/name string of the - // Foo resource to be synced. - if err := c.syncHandler(key); err != nil { - return fmt.Errorf("error syncing '%s': %s", key, err.Error()) - } - // Finally, if no error occurs we Forget this item so it does not - // get queued again until another change happens. - c.workqueue.Forget(obj) - glog.Infof("Successfully synced '%s'", key) - return nil - }(obj) - - if err != nil { - runtime.HandleError(err) - return true - } - - return true +// LookupFoo looksup a Foo from the lister +func (c Controller) LookupFoo(r types.ReconcileKey) (interface{}, error) { + return c.Informers.Samplecontroller().V1alpha1().Foos().Lister().Foos(r.Namespace).Get(r.Name) } // syncHandler compares the actual state with the desired, and attempts to // converge the two. It then updates the Status block of the Foo resource // with the current status of the resource. -func (c *Controller) syncHandler(key string) error { - // Convert the namespace/name string into a distinct namespace and name - namespace, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) - return nil - } - - // Get the Foo resource with this namespace/name - foo, err := c.foosLister.Foos(namespace).Get(name) +func (c *Controller) syncHandler(key types.ReconcileKey) error { + namespace, name := key.Namespace, key.Name + foo, err := c.Informers.Samplecontroller().V1alpha1().Foos().Lister().Foos(namespace).Get(name) if err != nil { // The Foo resource may no longer exist, in which case we stop // processing. @@ -274,10 +125,10 @@ func (c *Controller) syncHandler(key string) error { } // Get the deployment with the name specified in Foo.spec - deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName) + deployment, err := c.KubernetesInformers.Apps().V1().Deployments().Lister().Deployments(foo.Namespace).Get(deploymentName) // If the resource doesn't exist, we'll create it if errors.IsNotFound(err) { - deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(newDeployment(foo)) + deployment, err = c.KubernetesClientSet.AppsV1().Deployments(foo.Namespace).Create(newDeployment(foo)) } // If an error occurs during Get/Create, we'll requeue the item so we can @@ -300,7 +151,7 @@ func (c *Controller) syncHandler(key string) error { // should update the Deployment resource. if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas { glog.V(4).Infof("Foo %s replicas: %d, deployment replicas: %d", name, *foo.Spec.Replicas, *deployment.Spec.Replicas) - deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(newDeployment(foo)) + deployment, err = c.KubernetesClientSet.AppsV1().Deployments(foo.Namespace).Update(newDeployment(foo)) } // If an error occurs during Update, we'll requeue the item so we can @@ -331,63 +182,10 @@ func (c *Controller) updateFooStatus(foo *samplev1alpha1.Foo, deployment *appsv1 // update the Status block of the Foo resource. UpdateStatus will not // allow changes to the Spec of the resource, which is ideal for ensuring // nothing other than resource status has been updated. - _, err := c.sampleclientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).Update(fooCopy) + _, err := c.Clientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).Update(fooCopy) return err } -// enqueueFoo takes a Foo resource and converts it into a namespace/name -// string which is then put onto the work queue. This method should *not* be -// passed resources of any type other than Foo. -func (c *Controller) enqueueFoo(obj interface{}) { - var key string - var err error - if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { - runtime.HandleError(err) - return - } - c.workqueue.AddRateLimited(key) -} - -// handleObject will take any resource implementing metav1.Object and attempt -// to find the Foo resource that 'owns' it. It does this by looking at the -// objects metadata.ownerReferences field for an appropriate OwnerReference. -// It then enqueues that Foo resource to be processed. If the object does not -// have an appropriate OwnerReference, it will simply be skipped. -func (c *Controller) handleObject(obj interface{}) { - var object metav1.Object - var ok bool - if object, ok = obj.(metav1.Object); !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - runtime.HandleError(fmt.Errorf("error decoding object, invalid type")) - return - } - object, ok = tombstone.Obj.(metav1.Object) - if !ok { - runtime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type")) - return - } - glog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName()) - } - glog.V(4).Infof("Processing object: %s", object.GetName()) - if ownerRef := metav1.GetControllerOf(object); ownerRef != nil { - // If this object is not owned by a Foo, we should not do anything more - // with it. - if ownerRef.Kind != "Foo" { - return - } - - foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name) - if err != nil { - glog.V(4).Infof("ignoring orphaned object '%s' of foo '%s'", object.GetSelfLink(), ownerRef.Name) - return - } - - c.enqueueFoo(foo) - return - } -} - // newDeployment creates a new Deployment for a Foo resource. It also sets // the appropriate OwnerReferences on the resource so handleObject can discover // the Foo resource that 'owns' it. diff --git a/samples/controller/main.go b/samples/controller/main.go index 1ba31cc6d7..1aed08885b 100644 --- a/samples/controller/main.go +++ b/samples/controller/main.go @@ -18,60 +18,33 @@ package main import ( "flag" - "time" - "github.com/golang/glog" - kubeinformers "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/clientcmd" // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). - // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" - - clientset "github.com/kubernetes-sigs/kubebuilder/samples/controller/pkg/client/clientset/versioned" - informers "github.com/kubernetes-sigs/kubebuilder/samples/controller/pkg/client/informers/externalversions" - "github.com/kubernetes-sigs/kubebuilder/samples/controller/pkg/signals" -) - -var ( - masterURL string - kubeconfig string + _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + + "github.com/kubernetes-sigs/kubebuilder/pkg/config" + "github.com/kubernetes-sigs/kubebuilder/pkg/inject/run" + "github.com/kubernetes-sigs/kubebuilder/pkg/signals" + "github.com/kubernetes-sigs/kubebuilder/samples/controller/pkg/apis/samplecontroller/v1alpha1" + "github.com/kubernetes-sigs/kubebuilder/samples/controller/pkg/inject/args" + "k8s.io/api/apps/v1" ) func main() { + // Setup clients, informers, channels for injection flag.Parse() + config := config.GetConfigOrDie() + rargs := run.RunArguments{Stop: signals.SetupSignalHandler()} + iargs := args.CreateInjectArgs(config) - // set up signals so we handle the first shutdown signal gracefully - stopCh := signals.SetupSignalHandler() - - cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig) - if err != nil { - glog.Fatalf("Error building kubeconfig: %s", err.Error()) - } - - kubeClient, err := kubernetes.NewForConfig(cfg) - if err != nil { - glog.Fatalf("Error building kubernetes clientset: %s", err.Error()) - } + // Start informers + iargs.ControllerManager.AddInformerProvider(&v1.Deployment{}, iargs.KubernetesInformers.Apps().V1().Deployments()) + iargs.ControllerManager.AddInformerProvider(&v1alpha1.Foo{}, iargs.Informers.Samplecontroller().V1alpha1().Foos()) - exampleClient, err := clientset.NewForConfig(cfg) - if err != nil { - glog.Fatalf("Error building example clientset: %s", err.Error()) - } - - kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30) - exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30) - - controller := NewController(kubeClient, exampleClient, kubeInformerFactory, exampleInformerFactory) - - go kubeInformerFactory.Start(stopCh) - go exampleInformerFactory.Start(stopCh) - - if err = controller.Run(2, stopCh); err != nil { - glog.Fatalf("Error running controller: %s", err.Error()) - } -} + // Add the Foo controller + iargs.ControllerManager.AddController(NewController(iargs)) -func init() { - flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") - flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") + // Run the Informers and Controllers + iargs.ControllerManager.RunInformersAndControllers(rargs) + <-rargs.Stop } diff --git a/samples/controller/pkg/inject/args/args.go b/samples/controller/pkg/inject/args/args.go new file mode 100644 index 0000000000..6d261df3cd --- /dev/null +++ b/samples/controller/pkg/inject/args/args.go @@ -0,0 +1,29 @@ +package args + +import ( + "time" + + "github.com/kubernetes-sigs/kubebuilder/pkg/inject/args" + "k8s.io/client-go/rest" + + "github.com/kubernetes-sigs/kubebuilder/samples/controller/pkg/client/clientset/versioned" + "github.com/kubernetes-sigs/kubebuilder/samples/controller/pkg/client/informers/externalversions" +) + +// InjectArgs are the arguments need to initialize controllers +type InjectArgs struct { + args.InjectArgs + + Clientset *versioned.Clientset + Informers externalversions.SharedInformerFactory +} + +// CreateInjectArgs returns new controller args +func CreateInjectArgs(config *rest.Config) InjectArgs { + cs := versioned.NewForConfigOrDie(config) + return InjectArgs{ + InjectArgs: args.CreateInjectArgs(config), + Clientset: cs, + Informers: externalversions.NewSharedInformerFactory(cs, 2*time.Minute), + } +} diff --git a/samples/controller/pkg/signals/signal.go b/samples/controller/pkg/signals/signal.go deleted file mode 100644 index 6bddfddb4f..0000000000 --- a/samples/controller/pkg/signals/signal.go +++ /dev/null @@ -1,43 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package signals - -import ( - "os" - "os/signal" -) - -var onlyOneSignalHandler = make(chan struct{}) - -// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned -// which is closed on one of these signals. If a second signal is caught, the program -// is terminated with exit code 1. -func SetupSignalHandler() (stopCh <-chan struct{}) { - close(onlyOneSignalHandler) // panics when called twice - - stop := make(chan struct{}) - c := make(chan os.Signal, 2) - signal.Notify(c, shutdownSignals...) - go func() { - <-c - close(stop) - <-c - os.Exit(1) // second signal. Exit directly. - }() - - return stop -} diff --git a/samples/controller/pkg/signals/signal_posix.go b/samples/controller/pkg/signals/signal_posix.go deleted file mode 100644 index 9bdb4e7418..0000000000 --- a/samples/controller/pkg/signals/signal_posix.go +++ /dev/null @@ -1,26 +0,0 @@ -// +build !windows - -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package signals - -import ( - "os" - "syscall" -) - -var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} diff --git a/samples/controller/pkg/signals/signal_windows.go b/samples/controller/pkg/signals/signal_windows.go deleted file mode 100644 index 4907d573fe..0000000000 --- a/samples/controller/pkg/signals/signal_windows.go +++ /dev/null @@ -1,23 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package signals - -import ( - "os" -) - -var shutdownSignals = []os.Signal{os.Interrupt}