diff --git a/Gopkg.lock b/Gopkg.lock index 5907ab15d1..0996047409 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -970,6 +970,7 @@ "k8s.io/client-go/testing", "k8s.io/client-go/tools/cache", "k8s.io/client-go/tools/clientcmd", + "k8s.io/client-go/tools/clientcmd/api", "k8s.io/client-go/tools/leaderelection", "k8s.io/client-go/tools/leaderelection/resourcelock", "k8s.io/client-go/tools/metrics", diff --git a/pkg/client/config/config.go b/pkg/client/config/config.go index 8d022c5cfa..25cd32ca18 100644 --- a/pkg/client/config/config.go +++ b/pkg/client/config/config.go @@ -25,6 +25,7 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + clientcmdapi "k8s.io/client-go/tools/clientcmd/api" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" ) @@ -61,7 +62,27 @@ func init() { // // * $HOME/.kube/config if exists func GetConfig() (*rest.Config, error) { - cfg, err := loadConfig() + return GetConfigWithContext("") +} + +// GetConfigWithContext creates a *rest.Config for talking to a Kubernetes API server with a specific context. +// If --kubeconfig is set, will use the kubeconfig file at that location. Otherwise will assume running +// in cluster and use the cluster provided kubeconfig. +// +// It also applies saner defaults for QPS and burst based on the Kubernetes +// controller manager defaults (20 QPS, 30 burst) +// +// Config precedence +// +// * --kubeconfig flag pointing at a file +// +// * KUBECONFIG environment variable pointing at a file +// +// * In-cluster config if running in cluster +// +// * $HOME/.kube/config if exists +func GetConfigWithContext(context string) (*rest.Config, error) { + cfg, err := loadConfig(context) if err != nil { return nil, err } @@ -75,14 +96,15 @@ func GetConfig() (*rest.Config, error) { } // loadConfig loads a REST Config as per the rules specified in GetConfig -func loadConfig() (*rest.Config, error) { +func loadConfig(context string) (*rest.Config, error) { + // If a flag is specified with the config location, use that if len(kubeconfig) > 0 { - return clientcmd.BuildConfigFromFlags(apiServerURL, kubeconfig) + return loadConfigWithContext(apiServerURL, kubeconfig, context) } - // If an env variable is specified with the config locaiton, use that + // If an env variable is specified with the config location, use that if len(os.Getenv("KUBECONFIG")) > 0 { - return clientcmd.BuildConfigFromFlags(apiServerURL, os.Getenv("KUBECONFIG")) + return loadConfigWithContext(apiServerURL, os.Getenv("KUBECONFIG"), context) } // If no explicit location, try the in-cluster config if c, err := rest.InClusterConfig(); err == nil { @@ -90,8 +112,8 @@ func loadConfig() (*rest.Config, error) { } // If no in-cluster config, try the default location in the user's home directory if usr, err := user.Current(); err == nil { - if c, err := clientcmd.BuildConfigFromFlags( - "", filepath.Join(usr.HomeDir, ".kube", "config")); err == nil { + if c, err := loadConfigWithContext(apiServerURL, filepath.Join(usr.HomeDir, ".kube", "config"), + context); err == nil { return c, nil } } @@ -99,6 +121,17 @@ func loadConfig() (*rest.Config, error) { return nil, fmt.Errorf("could not locate a kubeconfig") } +func loadConfigWithContext(apiServerURL, kubeconfig, context string) (*rest.Config, error) { + return clientcmd.NewNonInteractiveDeferredLoadingClientConfig( + &clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfig}, + &clientcmd.ConfigOverrides{ + ClusterInfo: clientcmdapi.Cluster{ + Server: apiServerURL, + }, + CurrentContext: context, + }).ClientConfig() +} + // GetConfigOrDie creates a *rest.Config for talking to a Kubernetes apiserver. // If --kubeconfig is set, will use the kubeconfig file at that location. Otherwise will assume running // in cluster and use the cluster provided kubeconfig. diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 19857bdd03..ed4ae806e3 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -90,9 +90,10 @@ type controllerManager struct { // metricsListener is used to serve prometheus metrics metricsListener net.Listener - mu sync.Mutex - started bool - errChan chan error + mu sync.Mutex + started bool + startedLeader bool + errChan chan error // internalStop is the stop channel *actually* used by everything involved // with the manager as a stop channel, so that we can pass a stop channel @@ -134,14 +135,18 @@ func (cm *controllerManager) Add(r Runnable) error { return err } + var shouldStart bool + // Add the runnable to the leader election or the non-leaderelection list if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() { + shouldStart = cm.started cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r) } else { + shouldStart = cm.startedLeader cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r) } - if cm.started { + if shouldStart { // If already started, start the controller go func() { cm.errChan <- r.Start(cm.internalStop) @@ -225,17 +230,19 @@ func (cm *controllerManager) GetWebhookServer() *webhook.Server { } func (cm *controllerManager) serveMetrics(stop <-chan struct{}) { + var metricsPath = "/metrics" handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{ ErrorHandling: promhttp.HTTPErrorOnError, }) // TODO(JoelSpeed): Use existing Kubernetes machinery for serving metrics mux := http.NewServeMux() - mux.Handle("/metrics", handler) + mux.Handle(metricsPath, handler) server := http.Server{ Handler: mux, } // Run the server go func() { + log.Info("starting metrics server", "path", metricsPath) if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed { cm.errChan <- err } @@ -314,6 +321,8 @@ func (cm *controllerManager) startLeaderElectionRunnables() { cm.errChan <- ctrl.Start(cm.internalStop) }() } + + cm.startedLeader = true } func (cm *controllerManager) waitForCache() { diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 670df79d08..5469455f1e 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -135,7 +135,8 @@ type Options struct { Namespace string // MetricsBindAddress is the TCP address that the controller should bind to - // for serving prometheus metrics + // for serving prometheus metrics. + // It can be set to "0" to disable the metrics serving. MetricsBindAddress string // Port is the port that the webhook server serves at. diff --git a/pkg/metrics/listener.go b/pkg/metrics/listener.go index e9db3a110c..693bff8413 100644 --- a/pkg/metrics/listener.go +++ b/pkg/metrics/listener.go @@ -21,11 +21,9 @@ import ( "net" ) -// DefaultBindAddress sets the default bind address for the metrics -// listener -// The metrics is off by default. -// TODO: Flip the default by changing DefaultBindAddress back to ":8080" in the v0.2.0. -var DefaultBindAddress = "0" +// DefaultBindAddress sets the default bind address for the metrics listener +// The metrics is on by default. +var DefaultBindAddress = ":8080" // NewListener creates a new TCP listener bound to the given address. func NewListener(addr string) (net.Listener, error) { @@ -39,9 +37,12 @@ func NewListener(addr string) (net.Listener, error) { return nil, nil } + log.Info("metrics server is starting to listen", "addr", addr) ln, err := net.Listen("tcp", addr) if err != nil { - return nil, fmt.Errorf("error listening on %s: %v", addr, err) + er := fmt.Errorf("error listening on %s: %v", addr, err) + log.Error(er, "metrics server failed to listen. You may want to disable the metrics server or use another port if it is due to conflicts") + return nil, er } return ln, nil } diff --git a/pkg/webhook/conversion/conversion.go b/pkg/webhook/conversion/conversion.go index f9bd5cb2db..05760cbbc4 100644 --- a/pkg/webhook/conversion/conversion.go +++ b/pkg/webhook/conversion/conversion.go @@ -174,7 +174,10 @@ func (wh *Webhook) convertViaHub(src, dst conversion.Convertible) error { // getHub returns an instance of the Hub for passed-in object's group/kind. func (wh *Webhook) getHub(obj runtime.Object) (conversion.Hub, error) { - gvks := objectGVKs(wh.scheme, obj) + gvks, err := objectGVKs(wh.scheme, obj) + if err != nil { + return nil, err + } if len(gvks) == 0 { return nil, fmt.Errorf("error retrieving gvks for object : %v", obj) } @@ -223,7 +226,10 @@ func (wh *Webhook) allocateDstObject(apiVersion, kind string) (runtime.Object, e func IsConvertible(scheme *runtime.Scheme, obj runtime.Object) (bool, error) { var hubs, spokes, nonSpokes []runtime.Object - gvks := objectGVKs(scheme, obj) + gvks, err := objectGVKs(scheme, obj) + if err != nil { + return false, err + } if len(gvks) == 0 { return false, fmt.Errorf("error retrieving gvks for object : %v", obj) } @@ -273,18 +279,27 @@ func IsConvertible(scheme *runtime.Scheme, obj runtime.Object) (bool, error) { } // objectGVKs returns all (Group,Version,Kind) for the Group/Kind of given object. -func objectGVKs(scheme *runtime.Scheme, obj runtime.Object) []schema.GroupVersionKind { - var gvks []schema.GroupVersionKind - - objGVK := obj.GetObjectKind().GroupVersionKind() +func objectGVKs(scheme *runtime.Scheme, obj runtime.Object) ([]schema.GroupVersionKind, error) { + // NB: we should not use `obj.GetObjectKind().GroupVersionKind()` to get the + // GVK here, since it is parsed from apiVersion and kind fields and it may + // return empty GVK if obj is an uninitialized object. + objGVKs, _, err := scheme.ObjectKinds(obj) + if err != nil { + return nil, err + } + if len(objGVKs) != 1 { + return nil, fmt.Errorf("expect to get only one GVK for %v", obj) + } + objGVK := objGVKs[0] knownTypes := scheme.AllKnownTypes() + var gvks []schema.GroupVersionKind for gvk := range knownTypes { if objGVK.GroupKind() == gvk.GroupKind() { gvks = append(gvks, gvk) } } - return gvks + return gvks, nil } // PartialImplementationError represents an error due to partial conversion diff --git a/pkg/webhook/conversion/conversion_test.go b/pkg/webhook/conversion/conversion_test.go index 1f2e1dc0e3..64ca7b575d 100644 --- a/pkg/webhook/conversion/conversion_test.go +++ b/pkg/webhook/conversion/conversion_test.go @@ -29,6 +29,7 @@ import ( appsv1beta1 "k8s.io/api/apps/v1beta1" apix "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" kscheme "k8s.io/client-go/kubernetes/scheme" @@ -312,6 +313,27 @@ var _ = Describe("IsConvertible", func() { Expect(jobsv3.AddToScheme(scheme)).To(Succeed()) }) + It("should not error for uninitialized types", func() { + obj := &jobsv2.ExternalJob{} + + ok, err := IsConvertible(scheme, obj) + Expect(err).NotTo(HaveOccurred()) + Expect(ok).To(BeTrue()) + }) + + It("should not error for unstructured types", func() { + obj := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": "ExternalJob", + "apiVersion": "jobs.testprojects.kb.io/v2", + }, + } + + ok, err := IsConvertible(scheme, obj) + Expect(err).NotTo(HaveOccurred()) + Expect(ok).To(BeTrue()) + }) + It("should return true for convertible types", func() { obj := &jobsv2.ExternalJob{ TypeMeta: metav1.TypeMeta{