From ccfcc610428591ab51c53c8c62f998abba704094 Mon Sep 17 00:00:00 2001 From: Roberto Bruggemann Date: Thu, 14 Dec 2017 10:45:09 +0000 Subject: [PATCH 1/4] The probe reports namespaces --- probe/kubernetes/client.go | 13 ++++++++++++ probe/kubernetes/meta.go | 33 +++++++++++++++++++++++++++++++ probe/kubernetes/namespace.go | 28 ++++++++++++++++++++++++++ probe/kubernetes/reporter.go | 14 +++++++++++++ probe/kubernetes/reporter_test.go | 3 +++ report/id.go | 6 ++++++ report/report.go | 11 +++++++++++ 7 files changed, 108 insertions(+) create mode 100644 probe/kubernetes/namespace.go diff --git a/probe/kubernetes/client.go b/probe/kubernetes/client.go index c629ae0e03..7ca3abee64 100644 --- a/probe/kubernetes/client.go +++ b/probe/kubernetes/client.go @@ -36,6 +36,7 @@ type Client interface { WalkCronJobs(f func(CronJob) error) error WalkReplicationControllers(f func(ReplicationController) error) error WalkNodes(f func(*apiv1.Node) error) error + WalkNamespaces(f func(NamespaceResource) error) error WatchPods(f func(Event, Pod)) @@ -59,6 +60,7 @@ type client struct { cronJobStore cache.Store replicationControllerStore cache.Store nodeStore cache.Store + namespaceStore cache.Store podWatchesMutex sync.Mutex podWatches []func(Event, Pod) @@ -158,6 +160,7 @@ func NewClient(config ClientConfig) (Client, error) { result.serviceStore = result.setupStore(c.CoreV1Client.RESTClient(), "services", &apiv1.Service{}, nil) result.replicationControllerStore = result.setupStore(c.CoreV1Client.RESTClient(), "replicationcontrollers", &apiv1.ReplicationController{}, nil) result.nodeStore = result.setupStore(c.CoreV1Client.RESTClient(), "nodes", &apiv1.Node{}, nil) + result.namespaceStore = result.setupStore(c.CoreV1Client.RESTClient(), "namespaces", &apiv1.Namespace{}, nil) // We list deployments here to check if this version of kubernetes is >= 1.2. // We would use NegotiateVersion, but Kubernetes 1.1 "supports" @@ -326,6 +329,16 @@ func (c *client) WalkNodes(f func(*apiv1.Node) error) error { return nil } +func (c *client) WalkNamespaces(f func(NamespaceResource) error) error { + for _, m := range c.namespaceStore.List() { + namespace := m.(*apiv1.Namespace) + if err := f(NewNamespace(namespace)); err != nil { + return err + } + } + return nil +} + func (c *client) GetLogs(namespaceID, podID string) (io.ReadCloser, error) { req := c.client.CoreV1().Pods(namespaceID).GetLogs( podID, diff --git a/probe/kubernetes/meta.go b/probe/kubernetes/meta.go index b364d9bdd8..2b6a15bdd9 100644 --- a/probe/kubernetes/meta.go +++ b/probe/kubernetes/meta.go @@ -58,3 +58,36 @@ func (m meta) MetaNode(id string) report.Node { Created: m.Created(), }).AddPrefixPropertyList(LabelPrefix, m.Labels()) } + +type namespaceMeta struct { + ObjectMeta metav1.ObjectMeta +} + +func (m namespaceMeta) UID() string { + return string(m.ObjectMeta.UID) +} + +func (m namespaceMeta) Name() string { + return m.ObjectMeta.Name +} + +func (m namespaceMeta) Namespace() string { + return m.ObjectMeta.Namespace +} + +func (m namespaceMeta) Created() string { + return m.ObjectMeta.CreationTimestamp.Format(time.RFC3339Nano) +} + +func (m namespaceMeta) Labels() map[string]string { + return m.ObjectMeta.Labels +} + +// MetaNode gets the node metadata +// For namespaces, ObjectMeta.Namespace is not set +func (m namespaceMeta) MetaNode(id string) report.Node { + return report.MakeNodeWith(id, map[string]string{ + Name: m.Name(), + Created: m.Created(), + }).AddPrefixPropertyList(LabelPrefix, m.Labels()) +} diff --git a/probe/kubernetes/namespace.go b/probe/kubernetes/namespace.go new file mode 100644 index 0000000000..e1fa12782a --- /dev/null +++ b/probe/kubernetes/namespace.go @@ -0,0 +1,28 @@ +package kubernetes + +import ( + "github.com/weaveworks/scope/report" + + apiv1 "k8s.io/client-go/pkg/api/v1" +) + +// NamespaceResource represents a Kubernetes namespace +// `Namespace` is already taken in meta.go +type NamespaceResource interface { + Meta + GetNode() report.Node +} + +type namespace struct { + ns *apiv1.Namespace + Meta +} + +// NewNamespace creates a new Namespace +func NewNamespace(ns *apiv1.Namespace) NamespaceResource { + return &namespace{ns: ns, Meta: namespaceMeta{ns.ObjectMeta}} +} + +func (ns *namespace) GetNode() report.Node { + return ns.MetaNode(report.MakeNamespaceNodeID(ns.UID())) +} diff --git a/probe/kubernetes/reporter.go b/probe/kubernetes/reporter.go index e4000cae81..7fd6e2ed09 100644 --- a/probe/kubernetes/reporter.go +++ b/probe/kubernetes/reporter.go @@ -264,6 +264,10 @@ func (r *Reporter) Report() (report.Report, error) { if err != nil { return result, err } + namespaceTopology, err := r.namespaceTopology() + if err != nil { + return result, err + } result.Pod = result.Pod.Merge(podTopology) result.Service = result.Service.Merge(serviceTopology) result.Host = result.Host.Merge(hostTopology) @@ -271,6 +275,7 @@ func (r *Reporter) Report() (report.Report, error) { result.StatefulSet = result.StatefulSet.Merge(statefulSetTopology) result.CronJob = result.CronJob.Merge(cronJobTopology) result.Deployment = result.Deployment.Merge(deploymentTopology) + result.Namespace = result.Namespace.Merge(namespaceTopology) return result, nil } @@ -500,3 +505,12 @@ func (r *Reporter) podTopology(services []Service, deployments []Deployment, dae }) return pods, err } + +func (r *Reporter) namespaceTopology() (report.Topology, error) { + result := report.MakeTopology() + err := r.client.WalkNamespaces(func(ns NamespaceResource) error { + result = result.AddNode(ns.GetNode()) + return nil + }) + return result, err +} diff --git a/probe/kubernetes/reporter_test.go b/probe/kubernetes/reporter_test.go index f1bd1d6e5c..4c404c3a23 100644 --- a/probe/kubernetes/reporter_test.go +++ b/probe/kubernetes/reporter_test.go @@ -154,6 +154,9 @@ func (c *mockClient) WalkReplicationControllers(f func(kubernetes.ReplicationCon func (*mockClient) WalkNodes(f func(*apiv1.Node) error) error { return nil } +func (c *mockClient) WalkNamespaces(f func(kubernetes.NamespaceResource) error) error { + return nil +} func (*mockClient) WatchPods(func(kubernetes.Event, kubernetes.Pod)) {} func (c *mockClient) GetLogs(namespaceID, podName string) (io.ReadCloser, error) { r, ok := c.logs[namespaceID+";"+podName] diff --git a/report/id.go b/report/id.go index b935bb77ce..3b23df2f55 100644 --- a/report/id.go +++ b/report/id.go @@ -140,6 +140,12 @@ var ( // ParseCronJobNodeID parses a cronjob node ID ParseCronJobNodeID = parseSingleComponentID("cronjob") + // MakeNamespaceNodeID produces a namespace node ID from its composite parts. + MakeNamespaceNodeID = makeSingleComponentID("namespace") + + // ParseNamespaceNodeID parses a namespace set node ID + ParseNamespaceNodeID = parseSingleComponentID("namespace") + // MakeECSTaskNodeID produces a ECSTask node ID from its composite parts. MakeECSTaskNodeID = makeSingleComponentID("ecs_task") diff --git a/report/report.go b/report/report.go index aeb8adef05..2ea3dd596e 100644 --- a/report/report.go +++ b/report/report.go @@ -22,6 +22,7 @@ const ( DaemonSet = "daemon_set" StatefulSet = "stateful_set" CronJob = "cron_job" + Namespace = "namespace" ContainerImage = "container_image" Host = "host" Overlay = "overlay" @@ -56,6 +57,7 @@ var topologyNames = []string{ DaemonSet, StatefulSet, CronJob, + Namespace, Host, Overlay, ECSTask, @@ -115,6 +117,11 @@ type Report struct { // present. CronJob Topology + // Namespace nodes represent all Kubernetes Namespaces running on hosts running probes. + // Metadata includes things like Namespace id, name, etc. Edges are not + // present. + Namespace Topology + // ContainerImages nodes represent all Docker containers images on // hosts running probes. Metadata includes things like image id, name etc. // Edges are not present. @@ -217,6 +224,8 @@ func MakeReport() Report { WithShape(Triangle). WithLabel("cron job", "cron jobs"), + Namespace: MakeTopology(), + Overlay: MakeTopology(). WithShape(Circle). WithLabel("peer", "peers"), @@ -317,6 +326,8 @@ func (r *Report) topology(name string) *Topology { return &r.StatefulSet case CronJob: return &r.CronJob + case Namespace: + return &r.Namespace case Host: return &r.Host case Overlay: From 2c60d50f100e485ea1c606117480a01ba068a3e4 Mon Sep 17 00:00:00 2001 From: Roberto Bruggemann Date: Thu, 14 Dec 2017 10:49:30 +0000 Subject: [PATCH 2/4] Remove unused `WalkNodes` function --- probe/kubernetes/client.go | 11 ----------- probe/kubernetes/reporter_test.go | 3 --- 2 files changed, 14 deletions(-) diff --git a/probe/kubernetes/client.go b/probe/kubernetes/client.go index 7ca3abee64..6f2d31f94d 100644 --- a/probe/kubernetes/client.go +++ b/probe/kubernetes/client.go @@ -35,7 +35,6 @@ type Client interface { WalkStatefulSets(f func(StatefulSet) error) error WalkCronJobs(f func(CronJob) error) error WalkReplicationControllers(f func(ReplicationController) error) error - WalkNodes(f func(*apiv1.Node) error) error WalkNamespaces(f func(NamespaceResource) error) error WatchPods(f func(Event, Pod)) @@ -319,16 +318,6 @@ func (c *client) WalkCronJobs(f func(CronJob) error) error { return nil } -func (c *client) WalkNodes(f func(*apiv1.Node) error) error { - for _, m := range c.nodeStore.List() { - node := m.(*apiv1.Node) - if err := f(node); err != nil { - return err - } - } - return nil -} - func (c *client) WalkNamespaces(f func(NamespaceResource) error) error { for _, m := range c.namespaceStore.List() { namespace := m.(*apiv1.Namespace) diff --git a/probe/kubernetes/reporter_test.go b/probe/kubernetes/reporter_test.go index 4c404c3a23..b6bb7778c1 100644 --- a/probe/kubernetes/reporter_test.go +++ b/probe/kubernetes/reporter_test.go @@ -151,9 +151,6 @@ func (c *mockClient) WalkReplicaSets(f func(kubernetes.ReplicaSet) error) error func (c *mockClient) WalkReplicationControllers(f func(kubernetes.ReplicationController) error) error { return nil } -func (*mockClient) WalkNodes(f func(*apiv1.Node) error) error { - return nil -} func (c *mockClient) WalkNamespaces(f func(kubernetes.NamespaceResource) error) error { return nil } From 4233b75528368fca5a486879d0343d1e730b6570 Mon Sep 17 00:00:00 2001 From: Roberto Bruggemann Date: Thu, 14 Dec 2017 11:40:17 +0000 Subject: [PATCH 3/4] report.Upgrade: reconstruct namespaces Old probes do not report namespace topologies. `report.upgradeNamespaces()` recontructs namespace topologies using the data available from other kubernetes resources. Also, add a test. --- report/map_keys.go | 1 + report/report.go | 32 +++++++++++++++++++++++++++++++- report/report_test.go | 13 +++++++++++-- 3 files changed, 43 insertions(+), 3 deletions(-) diff --git a/report/map_keys.go b/report/map_keys.go index 865a3143b8..aec4a26cfb 100644 --- a/report/map_keys.go +++ b/report/map_keys.go @@ -69,6 +69,7 @@ const ( KubernetesSuspended = "kubernetes_suspended" KubernetesLastScheduled = "kubernetes_last_scheduled" KubernetesActiveJobs = "kubernetes_active_jobs" + KubernetesStateDeleted = "deleted" // probe/awsecs ECSCluster = "ecs_cluster" ECSCreatedAt = "ecs_created_at" diff --git a/report/report.go b/report/report.go index 2ea3dd596e..2cac43de1d 100644 --- a/report/report.go +++ b/report/report.go @@ -371,7 +371,7 @@ func (r Report) Validate() error { // // This for now creates node's LatestControls from Controls. func (r Report) Upgrade() Report { - return r.upgradeLatestControls().upgradePodNodes() + return r.upgradeLatestControls().upgradePodNodes().upgradeNamespaces() } func (r Report) upgradeLatestControls() Report { @@ -439,6 +439,36 @@ func (r Report) upgradePodNodes() Report { return r } +func (r Report) upgradeNamespaces() Report { + if len(r.Namespace.Nodes) > 0 { + return r + } + + namespaces := map[string]struct{}{} + for _, t := range []Topology{r.Pod, r.Service, r.Deployment, r.DaemonSet, r.StatefulSet, r.CronJob} { + for _, n := range t.Nodes { + if state, ok := n.Latest.Lookup(KubernetesState); ok && state == KubernetesStateDeleted { + continue + } + if namespace, ok := n.Latest.Lookup(KubernetesNamespace); ok { + namespaces[namespace] = struct{}{} + } + } + } + + nodes := make(Nodes, len(namespaces)) + for ns := range namespaces { + // Namespace ID: + // Probes did not use to report namespace ids, but since creating a report node requires an id, + // the namespace name, which is unique, is passed to `MakeNamespaceNodeID` + namespaceID := MakeNamespaceNodeID(ns) + nodes[namespaceID] = MakeNodeWith(namespaceID, map[string]string{KubernetesName: ns}) + } + r.Namespace.Nodes = nodes + + return r +} + // BackwardCompatible returns a new backward-compatible report. // // This for now creates node's Controls from LatestControls. diff --git a/report/report_test.go b/report/report_test.go index f65576e79d..a87ca74d8e 100644 --- a/report/report_test.go +++ b/report/report_test.go @@ -99,8 +99,14 @@ func TestReportUpgrade(t *testing.T) { mtime.NowForce(time.Now()) defer mtime.NowReset() parentsWithDeployment := report.MakeSets().Add(report.Deployment, report.MakeStringSet("id")) - rsNode := report.MakeNode("bar").WithParents(parentsWithDeployment) - podNode := report.MakeNode("foo").WithControls("alive").WithParents(report.MakeSets().Add(report.ReplicaSet, report.MakeStringSet("bar"))) + rsNode := report.MakeNode("bar"). + WithParents(parentsWithDeployment) + namespaceName := "ns" + namespaceID := report.MakeNamespaceNodeID(namespaceName) + podNode := report.MakeNode("foo"). + WithLatests(map[string]string{report.KubernetesNamespace: namespaceName}). + WithControls("alive"). + WithParents(report.MakeSets().Add(report.ReplicaSet, report.MakeStringSet("bar"))) controls := map[string]report.NodeControlData{ "alive": { Dead: false, @@ -110,9 +116,12 @@ func TestReportUpgrade(t *testing.T) { rpt := report.MakeReport() rpt.ReplicaSet.AddNode(rsNode) rpt.Pod.AddNode(podNode) + namespaceNode := report.MakeNode(namespaceID). + WithLatests(map[string]string{report.KubernetesName: namespaceName}) expected := report.MakeReport() expected.ReplicaSet.AddNode(rsNode) expected.Pod.AddNode(expectedPodNode) + expected.Namespace.AddNode(namespaceNode) got := rpt.Upgrade() if !s_reflect.DeepEqual(expected, got) { t.Error(test.Diff(expected, got)) From 1fb23dd8f9a8e5402f1e16e0a29e9f1229c5f313 Mon Sep 17 00:00:00 2001 From: Roberto Bruggemann Date: Thu, 14 Dec 2017 11:43:37 +0000 Subject: [PATCH 4/4] Extract namespaces from report.Namespace This avoids unnecessary loops. --- app/api_topologies.go | 25 ++++++------------------- 1 file changed, 6 insertions(+), 19 deletions(-) diff --git a/app/api_topologies.go b/app/api_topologies.go index 2a28a40ae7..8a1a598b0c 100644 --- a/app/api_topologies.go +++ b/app/api_topologies.go @@ -95,26 +95,13 @@ func updateSwarmFilters(rpt report.Report, topologies []APITopologyDesc) []APITo } func updateKubeFilters(rpt report.Report, topologies []APITopologyDesc) []APITopologyDesc { - namespaces := map[string]struct{}{} - // We exclude ReplicaSets since we don't show them anywhere. - for _, t := range []report.Topology{rpt.Pod, rpt.Service, rpt.Deployment, rpt.DaemonSet, rpt.StatefulSet, rpt.CronJob} { - for _, n := range t.Nodes { - if state, ok := n.Latest.Lookup(kubernetes.State); ok && state == kubernetes.StateDeleted { - continue - } - if namespace, ok := n.Latest.Lookup(kubernetes.Namespace); ok { - namespaces[namespace] = struct{}{} - } - } - } - if len(namespaces) == 0 { - // We only want to apply k8s filters when we have k8s-related nodes, - // so if we don't then return early - return topologies - } ns := []string{} - for namespace := range namespaces { - ns = append(ns, namespace) + for _, n := range rpt.Namespace.Nodes { + name, ok := n.Latest.Lookup(kubernetes.Name) + if !ok { + continue + } + ns = append(ns, name) } sort.Strings(ns) topologies = append([]APITopologyDesc{}, topologies...) // Make a copy so we can make changes safely