Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🏃 ff release-0.2 branch to master HEAD #512

Merged
merged 8 commits into from
Jul 2, 2019
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 40 additions & 7 deletions pkg/client/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
)

Expand Down Expand Up @@ -61,7 +62,27 @@ func init() {
//
// * $HOME/.kube/config if exists
func GetConfig() (*rest.Config, error) {
cfg, err := loadConfig()
return GetConfigWithContext("")
}

// GetConfigWithContext creates a *rest.Config for talking to a Kubernetes API server with a specific context.
// If --kubeconfig is set, will use the kubeconfig file at that location. Otherwise will assume running
// in cluster and use the cluster provided kubeconfig.
//
// It also applies saner defaults for QPS and burst based on the Kubernetes
// controller manager defaults (20 QPS, 30 burst)
//
// Config precedence
//
// * --kubeconfig flag pointing at a file
//
// * KUBECONFIG environment variable pointing at a file
//
// * In-cluster config if running in cluster
//
// * $HOME/.kube/config if exists
func GetConfigWithContext(context string) (*rest.Config, error) {
cfg, err := loadConfig(context)
if err != nil {
return nil, err
}
Expand All @@ -75,30 +96,42 @@ func GetConfig() (*rest.Config, error) {
}

// loadConfig loads a REST Config as per the rules specified in GetConfig
func loadConfig() (*rest.Config, error) {
func loadConfig(context string) (*rest.Config, error) {

// If a flag is specified with the config location, use that
if len(kubeconfig) > 0 {
return clientcmd.BuildConfigFromFlags(apiServerURL, kubeconfig)
return loadConfigWithContext(apiServerURL, kubeconfig, context)
}
// If an env variable is specified with the config locaiton, use that
// If an env variable is specified with the config location, use that
if len(os.Getenv("KUBECONFIG")) > 0 {
return clientcmd.BuildConfigFromFlags(apiServerURL, os.Getenv("KUBECONFIG"))
return loadConfigWithContext(apiServerURL, os.Getenv("KUBECONFIG"), context)
}
// If no explicit location, try the in-cluster config
if c, err := rest.InClusterConfig(); err == nil {
return c, nil
}
// If no in-cluster config, try the default location in the user's home directory
if usr, err := user.Current(); err == nil {
if c, err := clientcmd.BuildConfigFromFlags(
"", filepath.Join(usr.HomeDir, ".kube", "config")); err == nil {
if c, err := loadConfigWithContext(apiServerURL, filepath.Join(usr.HomeDir, ".kube", "config"),
context); err == nil {
return c, nil
}
}

return nil, fmt.Errorf("could not locate a kubeconfig")
}

func loadConfigWithContext(apiServerURL, kubeconfig, context string) (*rest.Config, error) {
return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfig},
&clientcmd.ConfigOverrides{
ClusterInfo: clientcmdapi.Cluster{
Server: apiServerURL,
},
CurrentContext: context,
}).ClientConfig()
}

// GetConfigOrDie creates a *rest.Config for talking to a Kubernetes apiserver.
// If --kubeconfig is set, will use the kubeconfig file at that location. Otherwise will assume running
// in cluster and use the cluster provided kubeconfig.
Expand Down
19 changes: 14 additions & 5 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ type controllerManager struct {
// metricsListener is used to serve prometheus metrics
metricsListener net.Listener

mu sync.Mutex
started bool
errChan chan error
mu sync.Mutex
started bool
startedLeader bool
errChan chan error

// internalStop is the stop channel *actually* used by everything involved
// with the manager as a stop channel, so that we can pass a stop channel
Expand Down Expand Up @@ -134,14 +135,18 @@ func (cm *controllerManager) Add(r Runnable) error {
return err
}

var shouldStart bool

// Add the runnable to the leader election or the non-leaderelection list
if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
shouldStart = cm.started
cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
} else {
shouldStart = cm.startedLeader
cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)
}

if cm.started {
if shouldStart {
// If already started, start the controller
go func() {
cm.errChan <- r.Start(cm.internalStop)
Expand Down Expand Up @@ -225,17 +230,19 @@ func (cm *controllerManager) GetWebhookServer() *webhook.Server {
}

func (cm *controllerManager) serveMetrics(stop <-chan struct{}) {
var metricsPath = "/metrics"
handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{
ErrorHandling: promhttp.HTTPErrorOnError,
})
// TODO(JoelSpeed): Use existing Kubernetes machinery for serving metrics
mux := http.NewServeMux()
mux.Handle("/metrics", handler)
mux.Handle(metricsPath, handler)
server := http.Server{
Handler: mux,
}
// Run the server
go func() {
log.Info("starting metrics server", "path", metricsPath)
if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed {
cm.errChan <- err
}
Expand Down Expand Up @@ -314,6 +321,8 @@ func (cm *controllerManager) startLeaderElectionRunnables() {
cm.errChan <- ctrl.Start(cm.internalStop)
}()
}

cm.startedLeader = true
}

func (cm *controllerManager) waitForCache() {
Expand Down
3 changes: 2 additions & 1 deletion pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ type Options struct {
Namespace string

// MetricsBindAddress is the TCP address that the controller should bind to
// for serving prometheus metrics
// for serving prometheus metrics.
// It can be set to "0" to disable the metrics serving.
MetricsBindAddress string

// Port is the port that the webhook server serves at.
Expand Down
13 changes: 7 additions & 6 deletions pkg/metrics/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@ import (
"net"
)

// DefaultBindAddress sets the default bind address for the metrics
// listener
// The metrics is off by default.
// TODO: Flip the default by changing DefaultBindAddress back to ":8080" in the v0.2.0.
var DefaultBindAddress = "0"
// DefaultBindAddress sets the default bind address for the metrics listener
// The metrics is on by default.
var DefaultBindAddress = ":8080"

// NewListener creates a new TCP listener bound to the given address.
func NewListener(addr string) (net.Listener, error) {
Expand All @@ -39,9 +37,12 @@ func NewListener(addr string) (net.Listener, error) {
return nil, nil
}

log.Info("metrics server is starting to listen", "addr", addr)
ln, err := net.Listen("tcp", addr)
if err != nil {
return nil, fmt.Errorf("error listening on %s: %v", addr, err)
er := fmt.Errorf("error listening on %s: %v", addr, err)
log.Error(er, "metrics server failed to listen. You may want to disable the metrics server or use another port if it is due to conflicts")
return nil, er
}
return ln, nil
}
29 changes: 22 additions & 7 deletions pkg/webhook/conversion/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,10 @@ func (wh *Webhook) convertViaHub(src, dst conversion.Convertible) error {

// getHub returns an instance of the Hub for passed-in object's group/kind.
func (wh *Webhook) getHub(obj runtime.Object) (conversion.Hub, error) {
gvks := objectGVKs(wh.scheme, obj)
gvks, err := objectGVKs(wh.scheme, obj)
if err != nil {
return nil, err
}
if len(gvks) == 0 {
return nil, fmt.Errorf("error retrieving gvks for object : %v", obj)
}
Expand Down Expand Up @@ -223,7 +226,10 @@ func (wh *Webhook) allocateDstObject(apiVersion, kind string) (runtime.Object, e
func IsConvertible(scheme *runtime.Scheme, obj runtime.Object) (bool, error) {
var hubs, spokes, nonSpokes []runtime.Object

gvks := objectGVKs(scheme, obj)
gvks, err := objectGVKs(scheme, obj)
if err != nil {
return false, err
}
if len(gvks) == 0 {
return false, fmt.Errorf("error retrieving gvks for object : %v", obj)
}
Expand Down Expand Up @@ -273,18 +279,27 @@ func IsConvertible(scheme *runtime.Scheme, obj runtime.Object) (bool, error) {
}

// objectGVKs returns all (Group,Version,Kind) for the Group/Kind of given object.
func objectGVKs(scheme *runtime.Scheme, obj runtime.Object) []schema.GroupVersionKind {
var gvks []schema.GroupVersionKind

objGVK := obj.GetObjectKind().GroupVersionKind()
func objectGVKs(scheme *runtime.Scheme, obj runtime.Object) ([]schema.GroupVersionKind, error) {
// NB: we should not use `obj.GetObjectKind().GroupVersionKind()` to get the
// GVK here, since it is parsed from apiVersion and kind fields and it may
// return empty GVK if obj is an uninitialized object.
objGVKs, _, err := scheme.ObjectKinds(obj)
if err != nil {
return nil, err
}
if len(objGVKs) != 1 {
return nil, fmt.Errorf("expect to get only one GVK for %v", obj)
}
objGVK := objGVKs[0]
knownTypes := scheme.AllKnownTypes()

var gvks []schema.GroupVersionKind
for gvk := range knownTypes {
if objGVK.GroupKind() == gvk.GroupKind() {
gvks = append(gvks, gvk)
}
}
return gvks
return gvks, nil
}

// PartialImplementationError represents an error due to partial conversion
Expand Down
22 changes: 22 additions & 0 deletions pkg/webhook/conversion/conversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
appsv1beta1 "k8s.io/api/apps/v1beta1"
apix "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
kscheme "k8s.io/client-go/kubernetes/scheme"

Expand Down Expand Up @@ -312,6 +313,27 @@ var _ = Describe("IsConvertible", func() {
Expect(jobsv3.AddToScheme(scheme)).To(Succeed())
})

It("should not error for uninitialized types", func() {
obj := &jobsv2.ExternalJob{}

ok, err := IsConvertible(scheme, obj)
Expect(err).NotTo(HaveOccurred())
Expect(ok).To(BeTrue())
})

It("should not error for unstructured types", func() {
obj := &unstructured.Unstructured{
Object: map[string]interface{}{
"kind": "ExternalJob",
"apiVersion": "jobs.testprojects.kb.io/v2",
},
}

ok, err := IsConvertible(scheme, obj)
Expect(err).NotTo(HaveOccurred())
Expect(ok).To(BeTrue())
})

It("should return true for convertible types", func() {
obj := &jobsv2.ExternalJob{
TypeMeta: metav1.TypeMeta{
Expand Down