Skip to content

Commit

Permalink
Merge pull request #2985 from weaveworks/report-namespaces
Browse files Browse the repository at this point in the history
Probe reports namespaces
  • Loading branch information
rbruggem authored Jan 3, 2018
2 parents 90cbd8d + 1fb23dd commit e2e0496
Show file tree
Hide file tree
Showing 10 changed files with 149 additions and 28 deletions.
25 changes: 6 additions & 19 deletions app/api_topologies.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,26 +95,13 @@ func updateSwarmFilters(rpt report.Report, topologies []APITopologyDesc) []APITo
}

func updateKubeFilters(rpt report.Report, topologies []APITopologyDesc) []APITopologyDesc {
namespaces := map[string]struct{}{}
// We exclude ReplicaSets since we don't show them anywhere.
for _, t := range []report.Topology{rpt.Pod, rpt.Service, rpt.Deployment, rpt.DaemonSet, rpt.StatefulSet, rpt.CronJob} {
for _, n := range t.Nodes {
if state, ok := n.Latest.Lookup(kubernetes.State); ok && state == kubernetes.StateDeleted {
continue
}
if namespace, ok := n.Latest.Lookup(kubernetes.Namespace); ok {
namespaces[namespace] = struct{}{}
}
}
}
if len(namespaces) == 0 {
// We only want to apply k8s filters when we have k8s-related nodes,
// so if we don't then return early
return topologies
}
ns := []string{}
for namespace := range namespaces {
ns = append(ns, namespace)
for _, n := range rpt.Namespace.Nodes {
name, ok := n.Latest.Lookup(kubernetes.Name)
if !ok {
continue
}
ns = append(ns, name)
}
sort.Strings(ns)
topologies = append([]APITopologyDesc{}, topologies...) // Make a copy so we can make changes safely
Expand Down
12 changes: 7 additions & 5 deletions probe/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Client interface {
WalkStatefulSets(f func(StatefulSet) error) error
WalkCronJobs(f func(CronJob) error) error
WalkReplicationControllers(f func(ReplicationController) error) error
WalkNodes(f func(*apiv1.Node) error) error
WalkNamespaces(f func(NamespaceResource) error) error

WatchPods(f func(Event, Pod))

Expand All @@ -59,6 +59,7 @@ type client struct {
cronJobStore cache.Store
replicationControllerStore cache.Store
nodeStore cache.Store
namespaceStore cache.Store

podWatchesMutex sync.Mutex
podWatches []func(Event, Pod)
Expand Down Expand Up @@ -158,6 +159,7 @@ func NewClient(config ClientConfig) (Client, error) {
result.serviceStore = result.setupStore(c.CoreV1Client.RESTClient(), "services", &apiv1.Service{}, nil)
result.replicationControllerStore = result.setupStore(c.CoreV1Client.RESTClient(), "replicationcontrollers", &apiv1.ReplicationController{}, nil)
result.nodeStore = result.setupStore(c.CoreV1Client.RESTClient(), "nodes", &apiv1.Node{}, nil)
result.namespaceStore = result.setupStore(c.CoreV1Client.RESTClient(), "namespaces", &apiv1.Namespace{}, nil)

// We list deployments here to check if this version of kubernetes is >= 1.2.
// We would use NegotiateVersion, but Kubernetes 1.1 "supports"
Expand Down Expand Up @@ -316,10 +318,10 @@ func (c *client) WalkCronJobs(f func(CronJob) error) error {
return nil
}

func (c *client) WalkNodes(f func(*apiv1.Node) error) error {
for _, m := range c.nodeStore.List() {
node := m.(*apiv1.Node)
if err := f(node); err != nil {
func (c *client) WalkNamespaces(f func(NamespaceResource) error) error {
for _, m := range c.namespaceStore.List() {
namespace := m.(*apiv1.Namespace)
if err := f(NewNamespace(namespace)); err != nil {
return err
}
}
Expand Down
33 changes: 33 additions & 0 deletions probe/kubernetes/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,36 @@ func (m meta) MetaNode(id string) report.Node {
Created: m.Created(),
}).AddPrefixPropertyList(LabelPrefix, m.Labels())
}

type namespaceMeta struct {
ObjectMeta metav1.ObjectMeta
}

func (m namespaceMeta) UID() string {
return string(m.ObjectMeta.UID)
}

func (m namespaceMeta) Name() string {
return m.ObjectMeta.Name
}

func (m namespaceMeta) Namespace() string {
return m.ObjectMeta.Namespace
}

func (m namespaceMeta) Created() string {
return m.ObjectMeta.CreationTimestamp.Format(time.RFC3339Nano)
}

func (m namespaceMeta) Labels() map[string]string {
return m.ObjectMeta.Labels
}

// MetaNode gets the node metadata
// For namespaces, ObjectMeta.Namespace is not set
func (m namespaceMeta) MetaNode(id string) report.Node {
return report.MakeNodeWith(id, map[string]string{
Name: m.Name(),
Created: m.Created(),
}).AddPrefixPropertyList(LabelPrefix, m.Labels())
}
28 changes: 28 additions & 0 deletions probe/kubernetes/namespace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package kubernetes

import (
"github.com/weaveworks/scope/report"

apiv1 "k8s.io/client-go/pkg/api/v1"
)

// NamespaceResource represents a Kubernetes namespace
// `Namespace` is already taken in meta.go
type NamespaceResource interface {
Meta
GetNode() report.Node
}

type namespace struct {
ns *apiv1.Namespace
Meta
}

// NewNamespace creates a new Namespace
func NewNamespace(ns *apiv1.Namespace) NamespaceResource {
return &namespace{ns: ns, Meta: namespaceMeta{ns.ObjectMeta}}
}

func (ns *namespace) GetNode() report.Node {
return ns.MetaNode(report.MakeNamespaceNodeID(ns.UID()))
}
14 changes: 14 additions & 0 deletions probe/kubernetes/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,18 @@ func (r *Reporter) Report() (report.Report, error) {
if err != nil {
return result, err
}
namespaceTopology, err := r.namespaceTopology()
if err != nil {
return result, err
}
result.Pod = result.Pod.Merge(podTopology)
result.Service = result.Service.Merge(serviceTopology)
result.Host = result.Host.Merge(hostTopology)
result.DaemonSet = result.DaemonSet.Merge(daemonSetTopology)
result.StatefulSet = result.StatefulSet.Merge(statefulSetTopology)
result.CronJob = result.CronJob.Merge(cronJobTopology)
result.Deployment = result.Deployment.Merge(deploymentTopology)
result.Namespace = result.Namespace.Merge(namespaceTopology)
return result, nil
}

Expand Down Expand Up @@ -500,3 +505,12 @@ func (r *Reporter) podTopology(services []Service, deployments []Deployment, dae
})
return pods, err
}

func (r *Reporter) namespaceTopology() (report.Topology, error) {
result := report.MakeTopology()
err := r.client.WalkNamespaces(func(ns NamespaceResource) error {
result = result.AddNode(ns.GetNode())
return nil
})
return result, err
}
2 changes: 1 addition & 1 deletion probe/kubernetes/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (c *mockClient) WalkReplicaSets(f func(kubernetes.ReplicaSet) error) error
func (c *mockClient) WalkReplicationControllers(f func(kubernetes.ReplicationController) error) error {
return nil
}
func (*mockClient) WalkNodes(f func(*apiv1.Node) error) error {
func (c *mockClient) WalkNamespaces(f func(kubernetes.NamespaceResource) error) error {
return nil
}
func (*mockClient) WatchPods(func(kubernetes.Event, kubernetes.Pod)) {}
Expand Down
6 changes: 6 additions & 0 deletions report/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ var (
// ParseCronJobNodeID parses a cronjob node ID
ParseCronJobNodeID = parseSingleComponentID("cronjob")

// MakeNamespaceNodeID produces a namespace node ID from its composite parts.
MakeNamespaceNodeID = makeSingleComponentID("namespace")

// ParseNamespaceNodeID parses a namespace set node ID
ParseNamespaceNodeID = parseSingleComponentID("namespace")

// MakeECSTaskNodeID produces a ECSTask node ID from its composite parts.
MakeECSTaskNodeID = makeSingleComponentID("ecs_task")

Expand Down
1 change: 1 addition & 0 deletions report/map_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ const (
KubernetesSuspended = "kubernetes_suspended"
KubernetesLastScheduled = "kubernetes_last_scheduled"
KubernetesActiveJobs = "kubernetes_active_jobs"
KubernetesStateDeleted = "deleted"
// probe/awsecs
ECSCluster = "ecs_cluster"
ECSCreatedAt = "ecs_created_at"
Expand Down
43 changes: 42 additions & 1 deletion report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
DaemonSet = "daemon_set"
StatefulSet = "stateful_set"
CronJob = "cron_job"
Namespace = "namespace"
ContainerImage = "container_image"
Host = "host"
Overlay = "overlay"
Expand Down Expand Up @@ -56,6 +57,7 @@ var topologyNames = []string{
DaemonSet,
StatefulSet,
CronJob,
Namespace,
Host,
Overlay,
ECSTask,
Expand Down Expand Up @@ -115,6 +117,11 @@ type Report struct {
// present.
CronJob Topology

// Namespace nodes represent all Kubernetes Namespaces running on hosts running probes.
// Metadata includes things like Namespace id, name, etc. Edges are not
// present.
Namespace Topology

// ContainerImages nodes represent all Docker containers images on
// hosts running probes. Metadata includes things like image id, name etc.
// Edges are not present.
Expand Down Expand Up @@ -217,6 +224,8 @@ func MakeReport() Report {
WithShape(Triangle).
WithLabel("cron job", "cron jobs"),

Namespace: MakeTopology(),

Overlay: MakeTopology().
WithShape(Circle).
WithLabel("peer", "peers"),
Expand Down Expand Up @@ -317,6 +326,8 @@ func (r *Report) topology(name string) *Topology {
return &r.StatefulSet
case CronJob:
return &r.CronJob
case Namespace:
return &r.Namespace
case Host:
return &r.Host
case Overlay:
Expand Down Expand Up @@ -360,7 +371,7 @@ func (r Report) Validate() error {
//
// This for now creates node's LatestControls from Controls.
func (r Report) Upgrade() Report {
return r.upgradeLatestControls().upgradePodNodes()
return r.upgradeLatestControls().upgradePodNodes().upgradeNamespaces()
}

func (r Report) upgradeLatestControls() Report {
Expand Down Expand Up @@ -428,6 +439,36 @@ func (r Report) upgradePodNodes() Report {
return r
}

func (r Report) upgradeNamespaces() Report {
if len(r.Namespace.Nodes) > 0 {
return r
}

namespaces := map[string]struct{}{}
for _, t := range []Topology{r.Pod, r.Service, r.Deployment, r.DaemonSet, r.StatefulSet, r.CronJob} {
for _, n := range t.Nodes {
if state, ok := n.Latest.Lookup(KubernetesState); ok && state == KubernetesStateDeleted {
continue
}
if namespace, ok := n.Latest.Lookup(KubernetesNamespace); ok {
namespaces[namespace] = struct{}{}
}
}
}

nodes := make(Nodes, len(namespaces))
for ns := range namespaces {
// Namespace ID:
// Probes did not use to report namespace ids, but since creating a report node requires an id,
// the namespace name, which is unique, is passed to `MakeNamespaceNodeID`
namespaceID := MakeNamespaceNodeID(ns)
nodes[namespaceID] = MakeNodeWith(namespaceID, map[string]string{KubernetesName: ns})
}
r.Namespace.Nodes = nodes

return r
}

// BackwardCompatible returns a new backward-compatible report.
//
// This for now creates node's Controls from LatestControls.
Expand Down
13 changes: 11 additions & 2 deletions report/report_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,14 @@ func TestReportUpgrade(t *testing.T) {
mtime.NowForce(time.Now())
defer mtime.NowReset()
parentsWithDeployment := report.MakeSets().Add(report.Deployment, report.MakeStringSet("id"))
rsNode := report.MakeNode("bar").WithParents(parentsWithDeployment)
podNode := report.MakeNode("foo").WithControls("alive").WithParents(report.MakeSets().Add(report.ReplicaSet, report.MakeStringSet("bar")))
rsNode := report.MakeNode("bar").
WithParents(parentsWithDeployment)
namespaceName := "ns"
namespaceID := report.MakeNamespaceNodeID(namespaceName)
podNode := report.MakeNode("foo").
WithLatests(map[string]string{report.KubernetesNamespace: namespaceName}).
WithControls("alive").
WithParents(report.MakeSets().Add(report.ReplicaSet, report.MakeStringSet("bar")))
controls := map[string]report.NodeControlData{
"alive": {
Dead: false,
Expand All @@ -110,9 +116,12 @@ func TestReportUpgrade(t *testing.T) {
rpt := report.MakeReport()
rpt.ReplicaSet.AddNode(rsNode)
rpt.Pod.AddNode(podNode)
namespaceNode := report.MakeNode(namespaceID).
WithLatests(map[string]string{report.KubernetesName: namespaceName})
expected := report.MakeReport()
expected.ReplicaSet.AddNode(rsNode)
expected.Pod.AddNode(expectedPodNode)
expected.Namespace.AddNode(namespaceNode)
got := rpt.Upgrade()
if !s_reflect.DeepEqual(expected, got) {
t.Error(test.Diff(expected, got))
Expand Down

0 comments on commit e2e0496

Please sign in to comment.