Skip to content

Commit

Permalink
use kubernetes informer framework
Browse files Browse the repository at this point in the history
  • Loading branch information
brancz committed Oct 17, 2016
1 parent f689aa5 commit 9f53ecc
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 156 deletions.
14 changes: 4 additions & 10 deletions deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package main

import (
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/client-go/1.5/pkg/apis/extensions/v1beta1"
)
Expand Down Expand Up @@ -70,7 +69,7 @@ var (
)

type deploymentStore interface {
List() (deployments []v1beta1.Deployment, err error)
List() []interface{}
}

// deploymentCollector collects metrics about all deployments in the cluster.
Expand All @@ -92,17 +91,12 @@ func (dc *deploymentCollector) Describe(ch chan<- *prometheus.Desc) {

// Collect implements the prometheus.Collector interface.
func (dc *deploymentCollector) Collect(ch chan<- prometheus.Metric) {
dpls, err := dc.store.List()
if err != nil {
glog.Errorf("listing deployments failed: %s", err)
return
}
for _, d := range dpls {
dc.collectDeployment(ch, d)
for _, d := range dc.store.List() {
dc.collectDeployment(ch, d.(*v1beta1.Deployment))
}
}

func (dc *deploymentCollector) collectDeployment(ch chan<- prometheus.Metric, d v1beta1.Deployment) {
func (dc *deploymentCollector) collectDeployment(ch chan<- prometheus.Metric, d *v1beta1.Deployment) {
addGauge := func(desc *prometheus.Desc, v float64, lv ...string) {
lv = append([]string{d.Namespace, d.Name}, lv...)
ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, v, lv...)
Expand Down
15 changes: 8 additions & 7 deletions deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ var (
)

type mockDeploymentStore struct {
f func() ([]v1beta1.Deployment, error)
f func() []interface{}
}

func (ds mockDeploymentStore) List() (deployments []v1beta1.Deployment, err error) {
func (ds mockDeploymentStore) List() []interface{} {
return ds.f()
}

Expand All @@ -67,12 +67,12 @@ func TestDeploymentCollector(t *testing.T) {
# TYPE kube_deployment_status_observed_generation gauge
`
cases := []struct {
depls []v1beta1.Deployment
depls []interface{}
want string
}{
{
depls: []v1beta1.Deployment{
{
depls: []interface{}{
&v1beta1.Deployment{
ObjectMeta: v1.ObjectMeta{
Name: "depl1",
Namespace: "ns1",
Expand All @@ -88,7 +88,8 @@ func TestDeploymentCollector(t *testing.T) {
Spec: v1beta1.DeploymentSpec{
Replicas: &depl1Replicas,
},
}, {
},
&v1beta1.Deployment{
ObjectMeta: v1.ObjectMeta{
Name: "depl2",
Namespace: "ns2",
Expand Down Expand Up @@ -130,7 +131,7 @@ func TestDeploymentCollector(t *testing.T) {
for _, c := range cases {
dc := &deploymentCollector{
store: mockDeploymentStore{
f: func() ([]v1beta1.Deployment, error) { return c.depls, nil },
f: func() []interface{} { return c.depls },
},
}
if err := gatherAndCompare(dc, c.want, nil); err != nil {
Expand Down
87 changes: 16 additions & 71 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@ import (
"github.com/golang/glog"
"github.com/openshift/origin/pkg/util/proc"
flag "github.com/spf13/pflag"
"golang.org/x/net/context"
clientset "k8s.io/client-go/1.5/kubernetes"
"k8s.io/client-go/1.5/pkg/api"
"k8s.io/client-go/1.5/pkg/api/v1"
"k8s.io/client-go/1.5/pkg/apis/extensions/v1beta1"
"k8s.io/client-go/1.5/pkg/runtime"
"k8s.io/client-go/1.5/pkg/watch"
restclient "k8s.io/client-go/1.5/rest"
"k8s.io/client-go/1.5/tools/cache"
"k8s.io/client-go/1.5/tools/clientcmd"
Expand Down Expand Up @@ -156,78 +155,24 @@ func metricsServer() {
log.Fatal(http.ListenAndServe(listenAddress, nil))
}

type DeploymentLister func() ([]v1beta1.Deployment, error)

func (l DeploymentLister) List() ([]v1beta1.Deployment, error) {
return l()
}

type PodLister func() ([]v1.Pod, error)

func (l PodLister) List() ([]v1.Pod, error) {
return l()
}

type NodeLister func() (v1.NodeList, error)

func (l NodeLister) List() (v1.NodeList, error) {
return l()
}

// initializeMetrics creates a new controller from the given config.
func initializeMetrics(kubeClient clientset.Interface) {
dplStore, dplController := cache.NewNamespaceKeyedIndexerAndReflector(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return kubeClient.Extensions().Deployments(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return kubeClient.Extensions().Deployments(api.NamespaceAll).Watch(options)
},
}, &v1beta1.Deployment{}, resyncPeriod)

podStore, podController := cache.NewNamespaceKeyedIndexerAndReflector(
cache.NewListWatchFromClient(
kubeClient.Core().GetRESTClient(),
"pods",
api.NamespaceAll,
nil,
), &v1.Pod{}, resyncPeriod)

nodeStore, nodeController := cache.NewNamespaceKeyedIndexerAndReflector(
cache.NewListWatchFromClient(
kubeClient.Core().GetRESTClient(),
"nodes",
api.NamespaceAll,
nil,
), &v1.Node{}, resyncPeriod)

go dplController.Run()
go podController.Run()
go nodeController.Run()

dplLister := DeploymentLister(func() (deployments []v1beta1.Deployment, err error) {
for _, c := range dplStore.List() {
deployments = append(deployments, *(c.(*v1beta1.Deployment)))
}
return deployments, nil
})
cclient := kubeClient.Core().GetRESTClient()
eclient := kubeClient.Extensions().GetRESTClient()

podLister := PodLister(func() (pods []v1.Pod, err error) {
for _, m := range podStore.List() {
pods = append(pods, *m.(*v1.Pod))
}
return pods, nil
})
dlw := cache.NewListWatchFromClient(eclient, "deployments", api.NamespaceAll, nil)
plw := cache.NewListWatchFromClient(cclient, "pods", api.NamespaceAll, nil)
nlw := cache.NewListWatchFromClient(cclient, "nodes", api.NamespaceAll, nil)

nodeLister := NodeLister(func() (machines v1.NodeList, err error) {
for _, m := range nodeStore.List() {
machines.Items = append(machines.Items, *(m.(*v1.Node)))
}
return machines, nil
})
dinf := cache.NewSharedInformer(dlw, &v1beta1.Deployment{}, resyncPeriod)
pinf := cache.NewSharedInformer(plw, &v1.Pod{}, resyncPeriod)
ninf := cache.NewSharedInformer(nlw, &v1.Node{}, resyncPeriod)

prometheus.MustRegister(&deploymentCollector{store: dinf.GetStore()})
prometheus.MustRegister(&podCollector{store: pinf.GetStore()})
prometheus.MustRegister(&nodeCollector{store: ninf.GetStore()})

prometheus.MustRegister(&deploymentCollector{store: dplLister})
prometheus.MustRegister(&podCollector{store: podLister})
prometheus.MustRegister(&nodeCollector{store: nodeLister})
go dinf.Run(context.TODO().Done())
go pinf.Run(context.TODO().Done())
go ninf.Run(context.TODO().Done())
}
14 changes: 4 additions & 10 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package main

import (
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/client-go/1.5/pkg/api/v1"
)
Expand Down Expand Up @@ -92,7 +91,7 @@ var (
)

type nodeStore interface {
List() (v1.NodeList, error)
List() []interface{}
}

// nodeCollector collects metrics about all nodes in the cluster.
Expand All @@ -117,17 +116,12 @@ func (nc *nodeCollector) Describe(ch chan<- *prometheus.Desc) {

// Collect implements the prometheus.Collector interface.
func (nc *nodeCollector) Collect(ch chan<- prometheus.Metric) {
nodes, err := nc.store.List()
if err != nil {
glog.Errorf("listing nodes failed: %s", err)
return
}
for _, n := range nodes.Items {
nc.collectNode(ch, n)
for _, n := range nc.store.List() {
nc.collectNode(ch, n.(*v1.Node))
}
}

func (nc *nodeCollector) collectNode(ch chan<- prometheus.Metric, n v1.Node) {
func (nc *nodeCollector) collectNode(ch chan<- prometheus.Metric, n *v1.Node) {
addGauge := func(desc *prometheus.Desc, v float64, lv ...string) {
lv = append([]string{n.Name}, lv...)
ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, v, lv...)
Expand Down
34 changes: 17 additions & 17 deletions node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (
)

type mockNodeStore struct {
list func() (v1.NodeList, error)
list func() []interface{}
}

func (ns mockNodeStore) List() (v1.NodeList, error) {
func (ns mockNodeStore) List() []interface{} {
return ns.list()
}

Expand Down Expand Up @@ -57,14 +57,14 @@ func TestNodeCollector(t *testing.T) {
# HELP kube_node_status_allocateable_memory_bytes The memory resources of a node that are available for scheduling.
`
cases := []struct {
nodes []v1.Node
nodes []interface{}
metrics []string // which metrics should be checked
want string
}{
// Verify populating base metrics and that metrics for unset fields are skipped.
{
nodes: []v1.Node{
{
nodes: []interface{}{
&v1.Node{
ObjectMeta: v1.ObjectMeta{
Name: "127.0.0.1",
},
Expand All @@ -86,8 +86,8 @@ func TestNodeCollector(t *testing.T) {
},
// Verify resource metrics.
{
nodes: []v1.Node{
{
nodes: []interface{}{
&v1.Node{
ObjectMeta: v1.ObjectMeta{
Name: "127.0.0.1",
},
Expand Down Expand Up @@ -128,8 +128,8 @@ func TestNodeCollector(t *testing.T) {
},
// Verify condition enumerations.
{
nodes: []v1.Node{
{
nodes: []interface{}{
&v1.Node{
ObjectMeta: v1.ObjectMeta{
Name: "127.0.0.1",
},
Expand All @@ -139,7 +139,7 @@ func TestNodeCollector(t *testing.T) {
},
},
},
{
&v1.Node{
ObjectMeta: v1.ObjectMeta{
Name: "127.0.0.2",
},
Expand All @@ -149,7 +149,7 @@ func TestNodeCollector(t *testing.T) {
},
},
},
{
&v1.Node{
ObjectMeta: v1.ObjectMeta{
Name: "127.0.0.3",
},
Expand All @@ -175,24 +175,24 @@ func TestNodeCollector(t *testing.T) {
},
// Verify phase enumerations.
{
nodes: []v1.Node{
{
nodes: []interface{}{
&v1.Node{
ObjectMeta: v1.ObjectMeta{
Name: "127.0.0.1",
},
Status: v1.NodeStatus{
Phase: v1.NodeRunning,
},
},
{
&v1.Node{
ObjectMeta: v1.ObjectMeta{
Name: "127.0.0.2",
},
Status: v1.NodeStatus{
Phase: v1.NodePending,
},
},
{
&v1.Node{
ObjectMeta: v1.ObjectMeta{
Name: "127.0.0.3",
},
Expand All @@ -218,8 +218,8 @@ func TestNodeCollector(t *testing.T) {
for _, c := range cases {
dc := &nodeCollector{
store: &mockNodeStore{
list: func() (v1.NodeList, error) {
return v1.NodeList{Items: c.nodes}, nil
list: func() []interface{} {
return c.nodes
},
},
}
Expand Down
14 changes: 4 additions & 10 deletions pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package main

import (
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/client-go/1.5/pkg/api/v1"
)
Expand Down Expand Up @@ -88,7 +87,7 @@ var (
)

type podStore interface {
List() (pods []v1.Pod, err error)
List() []interface{}
}

// podCollector collects metrics about all pods in the cluster.
Expand All @@ -114,17 +113,12 @@ func (pc *podCollector) Describe(ch chan<- *prometheus.Desc) {

// Collect implements the prometheus.Collector interface.
func (pc *podCollector) Collect(ch chan<- prometheus.Metric) {
pods, err := pc.store.List()
if err != nil {
glog.Errorf("listing pods failed: %s", err)
return
}
for _, p := range pods {
pc.collectPod(ch, p)
for _, p := range pc.store.List() {
pc.collectPod(ch, p.(*v1.Pod))
}
}

func (pc *podCollector) collectPod(ch chan<- prometheus.Metric, p v1.Pod) {
func (pc *podCollector) collectPod(ch chan<- prometheus.Metric, p *v1.Pod) {
addConstMetric := func(desc *prometheus.Desc, t prometheus.ValueType, v float64, lv ...string) {
lv = append([]string{p.Namespace, p.Name}, lv...)
ch <- prometheus.MustNewConstMetric(desc, t, v, lv...)
Expand Down
Loading

0 comments on commit 9f53ecc

Please sign in to comment.