Skip to content

Commit

Permalink
Merge pull request #17 from brancz/k8s-client-go
Browse files Browse the repository at this point in the history
use k8s.io/client-go
  • Loading branch information
fabxc authored Sep 14, 2016
2 parents d138645 + 6af709b commit bfceb00
Show file tree
Hide file tree
Showing 1,189 changed files with 365,390 additions and 352,626 deletions.
804 changes: 231 additions & 573 deletions Godeps/Godeps.json

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package main
import (
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/client-go/1.4/pkg/apis/extensions/v1beta1"
)

var (
Expand Down Expand Up @@ -64,7 +64,7 @@ var (
)

type deploymentStore interface {
List() (deployments []extensions.Deployment, err error)
List() (deployments []v1beta1.Deployment, err error)
}

// deploymentCollector collects metrics about all deployments in the cluster.
Expand Down Expand Up @@ -95,7 +95,7 @@ func (dc *deploymentCollector) Collect(ch chan<- prometheus.Metric) {
}
}

func (dc *deploymentCollector) collectDeployment(ch chan<- prometheus.Metric, d extensions.Deployment) {
func (dc *deploymentCollector) collectDeployment(ch chan<- prometheus.Metric, d v1beta1.Deployment) {
addGauge := func(desc *prometheus.Desc, v float64, lv ...string) {
lv = append([]string{d.Namespace, d.Name}, lv...)
ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, v, lv...)
Expand All @@ -106,5 +106,5 @@ func (dc *deploymentCollector) collectDeployment(ch chan<- prometheus.Metric, d
addGauge(descDeploymentStatusReplicasUpdated, float64(d.Status.UpdatedReplicas))
addGauge(descDeploymentStatusObservedGeneration, float64(d.Status.ObservedGeneration))
addGauge(descDeploymentSpecPaused, boolFloat64(d.Spec.Paused))
addGauge(descDeploymentSpecReplicas, float64(d.Spec.Replicas))
addGauge(descDeploymentSpecReplicas, float64(*d.Spec.Replicas))
}
35 changes: 20 additions & 15 deletions deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,20 @@ import (
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/client-go/1.4/pkg/api/v1"
"k8s.io/client-go/1.4/pkg/apis/extensions/v1beta1"
)

var (
depl1Replicas int32 = 200
depl2Replicas int32 = 5
)

type mockDeploymentStore struct {
f func() ([]extensions.Deployment, error)
f func() ([]v1beta1.Deployment, error)
}

func (ds mockDeploymentStore) List() (deployments []extensions.Deployment, err error) {
func (ds mockDeploymentStore) List() (deployments []v1beta1.Deployment, err error) {
return ds.f()
}

Expand All @@ -60,41 +65,41 @@ func TestDeploymentCollector(t *testing.T) {
# TYPE kube_deployment_status_observed_generation gauge
`
cases := []struct {
depls []extensions.Deployment
depls []v1beta1.Deployment
want string
}{
{
depls: []extensions.Deployment{
depls: []v1beta1.Deployment{
{
ObjectMeta: api.ObjectMeta{
ObjectMeta: v1.ObjectMeta{
Name: "depl1",
Namespace: "ns1",
},
Status: extensions.DeploymentStatus{
Status: v1beta1.DeploymentStatus{
Replicas: 15,
AvailableReplicas: 10,
UnavailableReplicas: 5,
UpdatedReplicas: 2,
ObservedGeneration: 111,
},
Spec: extensions.DeploymentSpec{
Replicas: 200,
Spec: v1beta1.DeploymentSpec{
Replicas: &depl1Replicas,
},
}, {
ObjectMeta: api.ObjectMeta{
ObjectMeta: v1.ObjectMeta{
Name: "depl2",
Namespace: "ns2",
},
Status: extensions.DeploymentStatus{
Status: v1beta1.DeploymentStatus{
Replicas: 10,
AvailableReplicas: 5,
UnavailableReplicas: 0,
UpdatedReplicas: 1,
ObservedGeneration: 1111,
},
Spec: extensions.DeploymentSpec{
Spec: v1beta1.DeploymentSpec{
Paused: true,
Replicas: 5,
Replicas: &depl2Replicas,
},
},
},
Expand All @@ -119,7 +124,7 @@ func TestDeploymentCollector(t *testing.T) {
for _, c := range cases {
dc := &deploymentCollector{
store: mockDeploymentStore{
f: func() ([]extensions.Deployment, error) { return c.depls, nil },
f: func() ([]v1beta1.Deployment, error) { return c.depls, nil },
},
}
if err := gatherAndCompare(dc, c.want, nil); err != nil {
Expand Down
164 changes: 82 additions & 82 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,15 @@ import (
"github.com/golang/glog"
"github.com/openshift/origin/pkg/util/proc"
flag "github.com/spf13/pflag"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/controller/framework"
kubectl_util "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
clientset "k8s.io/client-go/1.4/kubernetes"
"k8s.io/client-go/1.4/pkg/api"
"k8s.io/client-go/1.4/pkg/api/v1"
"k8s.io/client-go/1.4/pkg/apis/extensions/v1beta1"
"k8s.io/client-go/1.4/pkg/runtime"
"k8s.io/client-go/1.4/pkg/watch"
restclient "k8s.io/client-go/1.4/rest"
"k8s.io/client-go/1.4/tools/cache"
"k8s.io/client-go/1.4/tools/clientcmd"

"github.com/prometheus/client_golang/prometheus"
)
Expand All @@ -55,12 +53,12 @@ var (

apiserver = flags.String("apiserver", "", `The URL of the apiserver to use as a master`)

kubeconfig = flags.String("kubeconfig", "./config", "absolute path to the kubeconfig file")

port = flags.Int("port", 80, `Port to expose metrics on.`)
)

func main() {
// Create kubernetes client.
clientConfig := kubectl_util.DefaultClientConfig(flags)
flags.Parse(os.Args)

if *apiserver == "" && !(*inCluster) {
Expand All @@ -70,16 +68,16 @@ func main() {

proc.StartReaper()

kubeClient, err := createKubeClient(clientConfig)
kubeClient, err := createKubeClient()
if err != nil {
glog.Fatalf("Failed to create client: %v", err)
}

runMetricsController(kubeClient)
initializeMetrics(kubeClient)
metricsServer()
}

func createKubeClient(clientConfig clientcmd.ClientConfig) (kubeClient clientset.Interface, err error) {
func createKubeClient() (kubeClient clientset.Interface, err error) {
glog.Infof("Creating client")
if *inCluster {
config, err := restclient.InClusterConfig()
Expand All @@ -101,7 +99,14 @@ func createKubeClient(clientConfig clientcmd.ClientConfig) (kubeClient clientset
return nil, err
}
} else {
config, err := clientConfig.ClientConfig()
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
// if you want to change the loading rules (which files in which order), you can do so here
configOverrides := &clientcmd.ConfigOverrides{}
// if you want to change override values or bind them to flags, there are methods to help you
kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides)
config, err := kubeConfig.ClientConfig()
//config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
//config, err := clientcmd.DefaultClientConfig.ClientConfig()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -151,83 +156,78 @@ func metricsServer() {
log.Fatal(http.ListenAndServe(listenAddress, nil))
}

// metricsController watches the kubernetes api and adds/removes services
// from the loadbalancer, via loadBalancerConfig.
type metricsController struct {
client clientset.Interface
dplController *framework.Controller
dplStore cache.StoreToDeploymentLister
podController *framework.Controller
podStore cache.StoreToPodLister
nodeController *framework.Controller
nodeStore cache.StoreToNodeLister
type DeploymentLister func() ([]v1beta1.Deployment, error)

func (l DeploymentLister) List() ([]v1beta1.Deployment, error) {
return l()
}

// runMetricsController creates a new controller from the given config.
func runMetricsController(kubeClient clientset.Interface) *metricsController {
mc := &metricsController{
client: kubeClient,
}
type PodLister func() ([]v1.Pod, error)

mc.dplStore.Store, mc.dplController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return mc.client.Extensions().Deployments(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return mc.client.Extensions().Deployments(api.NamespaceAll).Watch(options)
},
}, &extensions.Deployment{}, resyncPeriod, framework.ResourceEventHandlerFuncs{})
func (l PodLister) List() ([]v1.Pod, error) {
return l()
}

mc.podStore.Store, mc.podController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return mc.client.Core().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return mc.client.Core().Pods(api.NamespaceAll).Watch(options)
},
}, &api.Pod{}, resyncPeriod, framework.ResourceEventHandlerFuncs{})
type NodeLister func() (v1.NodeList, error)

func (l NodeLister) List() (v1.NodeList, error) {
return l()
}

mc.nodeStore.Store, mc.nodeController = framework.NewInformer(
// initializeMetrics creates a new controller from the given config.
func initializeMetrics(kubeClient clientset.Interface) {
dplStore, dplController := cache.NewNamespaceKeyedIndexerAndReflector(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return mc.client.Core().Nodes().List(options)
return kubeClient.Extensions().Deployments(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return mc.client.Core().Nodes().Watch(options)
return kubeClient.Extensions().Deployments(api.NamespaceAll).Watch(options)
},
}, &api.Node{}, resyncPeriod, framework.ResourceEventHandlerFuncs{})

go mc.dplController.Run(wait.NeverStop)
go mc.podController.Run(wait.NeverStop)
go mc.nodeController.Run(wait.NeverStop)

go func() {
for !mc.dplController.HasSynced() {
time.Sleep(100 * time.Millisecond)
}, &v1beta1.Deployment{}, resyncPeriod)

podStore, podController := cache.NewNamespaceKeyedIndexerAndReflector(
cache.NewListWatchFromClient(
kubeClient.Core().GetRESTClient(),
"pods",
api.NamespaceAll,
nil,
), &v1.Pod{}, resyncPeriod)

nodeStore, nodeController := cache.NewNamespaceKeyedIndexerAndReflector(
cache.NewListWatchFromClient(
kubeClient.Core().GetRESTClient(),
"nodes",
api.NamespaceAll,
nil,
), &v1.Node{}, resyncPeriod)

go dplController.Run()
go podController.Run()
go nodeController.Run()

dplLister := DeploymentLister(func() (deployments []v1beta1.Deployment, err error) {
for _, c := range dplStore.List() {
deployments = append(deployments, *(c.(*v1beta1.Deployment)))
}
prometheus.MustRegister(&deploymentCollector{
store: &mc.dplStore,
})
}()

go func() {
for !mc.podController.HasSynced() {
time.Sleep(100 * time.Millisecond)
return deployments, nil
})

podLister := PodLister(func() (pods []v1.Pod, err error) {
for _, m := range podStore.List() {
pods = append(pods, *m.(*v1.Pod))
}
prometheus.MustRegister(&podCollector{
store: &mc.podStore,
})
}()
go func() {
for !mc.nodeController.HasSynced() {
time.Sleep(100 * time.Millisecond)
return pods, nil
})

nodeLister := NodeLister(func() (machines v1.NodeList, err error) {
for _, m := range nodeStore.List() {
machines.Items = append(machines.Items, *(m.(*v1.Node)))
}
prometheus.MustRegister(&nodeCollector{
store: &mc.nodeStore,
})
}()
return machines, nil
})

return mc
prometheus.MustRegister(&deploymentCollector{store: dplLister})
prometheus.MustRegister(&podCollector{store: podLister})
prometheus.MustRegister(&nodeCollector{store: nodeLister})
}
Loading

0 comments on commit bfceb00

Please sign in to comment.