From 2cd2d8034dfc87b8cf1f3273a05eaf27f4b6f1ec Mon Sep 17 00:00:00 2001 From: greg linton Date: Wed, 10 Jul 2019 13:29:07 -0600 Subject: [PATCH 01/11] Collect ingress information --- plugins/inputs/kube_inventory/client.go | 22 ++++++++ plugins/inputs/kube_inventory/ingress.go | 59 +++++++++++++++++++++ plugins/inputs/kube_inventory/kube_state.go | 2 + 3 files changed, 83 insertions(+) create mode 100644 plugins/inputs/kube_inventory/ingress.go diff --git a/plugins/inputs/kube_inventory/client.go b/plugins/inputs/kube_inventory/client.go index bf207b0ad46d6..5bb2baf5ce412 100644 --- a/plugins/inputs/kube_inventory/client.go +++ b/plugins/inputs/kube_inventory/client.go @@ -8,6 +8,7 @@ import ( "github.com/ericchiang/k8s/apis/apps/v1beta1" "github.com/ericchiang/k8s/apis/apps/v1beta2" "github.com/ericchiang/k8s/apis/core/v1" + v1beta1EXT "github.com/ericchiang/k8s/apis/extensions/v1beta1" "github.com/influxdata/telegraf/internal/tls" ) @@ -61,6 +62,20 @@ func (c *client) getDeployments(ctx context.Context) (*v1beta1.DeploymentList, e return list, c.List(ctx, c.namespace, list) } +func (c *client) getEndpoints(ctx context.Context) (*v1.EndpointsList, error) { + list := new(v1.EndpointsList) + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + return list, c.List(ctx, c.namespace, list) +} + +func (c *client) getIngress(ctx context.Context) (*v1beta1EXT.IngressList, error) { + list := new(v1beta1EXT.IngressList) + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + return list, c.List(ctx, c.namespace, list) +} + func (c *client) getNodes(ctx context.Context) (*v1.NodeList, error) { list := new(v1.NodeList) ctx, cancel := context.WithTimeout(ctx, c.timeout) @@ -89,6 +104,13 @@ func (c *client) getPods(ctx context.Context) (*v1.PodList, error) { return list, c.List(ctx, c.namespace, list) } +func (c *client) getServices(ctx context.Context) (*v1.ServiceList, error) { + list := new(v1.ServiceList) + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + return list, c.List(ctx, c.namespace, list) +} + func (c *client) getStatefulSets(ctx context.Context) (*v1beta1.StatefulSetList, error) { list := new(v1beta1.StatefulSetList) ctx, cancel := context.WithTimeout(ctx, c.timeout) diff --git a/plugins/inputs/kube_inventory/ingress.go b/plugins/inputs/kube_inventory/ingress.go new file mode 100644 index 0000000000000..6fe7cfb0b53ec --- /dev/null +++ b/plugins/inputs/kube_inventory/ingress.go @@ -0,0 +1,59 @@ +package kube_inventory + +import ( + "context" + "errors" + "time" + + v1beta1EXT "github.com/ericchiang/k8s/apis/extensions/v1beta1" + + "github.com/influxdata/telegraf" +) + +func collectIngress(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) { + list, err := ki.client.getIngress(ctx) + if err != nil { + acc.AddError(err) + return + } + for _, i := range list.Items { + if err = ki.gatherIngress(*i, acc); err != nil { + acc.AddError(err) + return + } + } +} + +// todo: do we want to add cardinality and collect values from `i.GetStatus().GetLoadBalancer().GetIngress()` +func (ki *KubernetesInventory) gatherIngress(i v1beta1EXT.Ingress, acc telegraf.Accumulator) error { + if i.Metadata.CreationTimestamp.GetSeconds() == 0 && i.Metadata.CreationTimestamp.GetNanos() == 0 { + return nil + } + + if i.Status.LoadBalancer == nil { + return errors.New("invalid nil loadbalancer") + } + + fields := map[string]interface{}{ + "created": time.Unix(i.Metadata.CreationTimestamp.GetSeconds(), int64(i.Metadata.CreationTimestamp.GetNanos())).UnixNano(), + "generation": i.Metadata.GetGeneration(), + } + + tags := map[string]string{ + "name": i.Metadata.GetName(), + "namespace": i.Metadata.GetNamespace(), + } + + for _, rule := range i.GetSpec().GetRules() { + for _, path := range rule.GetIngressRuleValue().GetHttp().GetPaths() { + fields["backend_service_port"] = path.GetBackend().GetServicePort().GetIntVal() + + tags["backend_service_name"] = path.GetBackend().GetServiceName() + tags["path"] = path.GetPath() + + acc.AddFields(ingressMeasurement, fields, tags) + } + } + + return nil +} diff --git a/plugins/inputs/kube_inventory/kube_state.go b/plugins/inputs/kube_inventory/kube_state.go index 57d31908d2bf1..6c332c7813852 100644 --- a/plugins/inputs/kube_inventory/kube_state.go +++ b/plugins/inputs/kube_inventory/kube_state.go @@ -111,6 +111,7 @@ func (ki *KubernetesInventory) Gather(acc telegraf.Accumulator) (err error) { var availableCollectors = map[string]func(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory){ "daemonsets": collectDaemonSets, "deployments": collectDeployments, + "ingress": collectIngress, "nodes": collectNodes, "persistentvolumes": collectPersistentVolumes, "persistentvolumeclaims": collectPersistentVolumeClaims, @@ -158,6 +159,7 @@ func convertQuantity(s string, m float64) int64 { var ( daemonSetMeasurement = "kubernetes_daemonset" deploymentMeasurement = "kubernetes_deployment" + ingressMeasurement = "kubernetes_ingress" nodeMeasurement = "kubernetes_node" persistentVolumeMeasurement = "kubernetes_persistentvolume" persistentVolumeClaimMeasurement = "kubernetes_persistentvolumeclaim" From 54229bb19c5e9b94cdd66b745f2ff1790e48faae Mon Sep 17 00:00:00 2001 From: greg linton Date: Wed, 10 Jul 2019 13:54:52 -0600 Subject: [PATCH 02/11] Collect endpoint information --- plugins/inputs/kube_inventory/endpoint.go | 104 ++++++++++++++++++++ plugins/inputs/kube_inventory/ingress.go | 49 +++++++-- plugins/inputs/kube_inventory/kube_state.go | 6 +- 3 files changed, 151 insertions(+), 8 deletions(-) create mode 100644 plugins/inputs/kube_inventory/endpoint.go diff --git a/plugins/inputs/kube_inventory/endpoint.go b/plugins/inputs/kube_inventory/endpoint.go new file mode 100644 index 0000000000000..540f5612b1ec5 --- /dev/null +++ b/plugins/inputs/kube_inventory/endpoint.go @@ -0,0 +1,104 @@ +package kube_inventory + +import ( + "context" + "time" + + "github.com/ericchiang/k8s/apis/core/v1" + + "github.com/influxdata/telegraf" +) + +func collectEndpoints(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) { + list, err := ki.client.getEndpoints(ctx) + if err != nil { + acc.AddError(err) + return + } + for _, i := range list.Items { + if err = ki.gatherEndpoint(*i, acc); err != nil { + acc.AddError(err) + return + } + } +} + +func (ki *KubernetesInventory) gatherEndpoint(e v1.Endpoints, acc telegraf.Accumulator) error { + if e.Metadata.CreationTimestamp.GetSeconds() == 0 && e.Metadata.CreationTimestamp.GetNanos() == 0 { + return nil + } + + fields := map[string]interface{}{ + "created": time.Unix(e.Metadata.CreationTimestamp.GetSeconds(), int64(e.Metadata.CreationTimestamp.GetNanos())).UnixNano(), + "generation": e.Metadata.GetGeneration(), + } + + tags := map[string]string{ + "name": e.Metadata.GetName(), + "namespace": e.Metadata.GetNamespace(), + } + + for _, endpoint := range e.GetSubsets() { + for _, port := range endpoint.GetPorts() { + fields["port"] = port.GetPort() + + tags["port_name"] = port.GetName() + tags["port_protocol"] = port.GetProtocol() + + acc.AddFields(endpointMeasurement, fields, tags) + } + } + + return nil +} + +// todo: do we want to add cardinality and collect hostnames/ready? +func (ki *KubernetesInventory) gatherEndpointWithHosts(e v1.Endpoints, acc telegraf.Accumulator) error { + if e.Metadata.CreationTimestamp.GetSeconds() == 0 && e.Metadata.CreationTimestamp.GetNanos() == 0 { + return nil + } + + fields := map[string]interface{}{ + "created": time.Unix(e.Metadata.CreationTimestamp.GetSeconds(), int64(e.Metadata.CreationTimestamp.GetNanos())).UnixNano(), + "generation": e.Metadata.GetGeneration(), + } + + tags := map[string]string{ + "name": e.Metadata.GetName(), + "namespace": e.Metadata.GetNamespace(), + } + + for _, endpoint := range e.GetSubsets() { + for _, readyAddr := range endpoint.GetAddresses() { + fields["ready"] = true + + // tags["hostname"]readyAddr.GetHostname() + tags["hostname"] = readyAddr.GetIp() + + for _, port := range endpoint.GetPorts() { + fields["port"] = port.GetPort() + + tags["port_name"] = port.GetName() + tags["port_protocol"] = port.GetProtocol() + + acc.AddFields(endpointMeasurement, fields, tags) + } + } + for _, notReadyAddr := range endpoint.GetNotReadyAddresses() { + fields["ready"] = false + + tags["hostname"] = notReadyAddr.GetIp() + + for _, port := range endpoint.GetPorts() { + fields["port"] = port.GetPort() + + tags["port_name"] = port.GetName() + tags["port_protocol"] = port.GetProtocol() + + acc.AddFields(endpointMeasurement, fields, tags) + } + } + } + + return nil +} diff --git a/plugins/inputs/kube_inventory/ingress.go b/plugins/inputs/kube_inventory/ingress.go index 6fe7cfb0b53ec..b75493181c0e4 100644 --- a/plugins/inputs/kube_inventory/ingress.go +++ b/plugins/inputs/kube_inventory/ingress.go @@ -2,9 +2,9 @@ package kube_inventory import ( "context" - "errors" "time" + "github.com/ericchiang/k8s/apis/core/v1" v1beta1EXT "github.com/ericchiang/k8s/apis/extensions/v1beta1" "github.com/influxdata/telegraf" @@ -24,16 +24,11 @@ func collectIngress(ctx context.Context, acc telegraf.Accumulator, ki *Kubernete } } -// todo: do we want to add cardinality and collect values from `i.GetStatus().GetLoadBalancer().GetIngress()` func (ki *KubernetesInventory) gatherIngress(i v1beta1EXT.Ingress, acc telegraf.Accumulator) error { if i.Metadata.CreationTimestamp.GetSeconds() == 0 && i.Metadata.CreationTimestamp.GetNanos() == 0 { return nil } - if i.Status.LoadBalancer == nil { - return errors.New("invalid nil loadbalancer") - } - fields := map[string]interface{}{ "created": time.Unix(i.Metadata.CreationTimestamp.GetSeconds(), int64(i.Metadata.CreationTimestamp.GetNanos())).UnixNano(), "generation": i.Metadata.GetGeneration(), @@ -57,3 +52,45 @@ func (ki *KubernetesInventory) gatherIngress(i v1beta1EXT.Ingress, acc telegraf. return nil } + +// todo: do we want to add cardinality and collect values from `i.GetStatus().GetLoadBalancer().GetIngress()` +func (ki *KubernetesInventory) gatherIngressWithIps(i v1beta1EXT.Ingress, acc telegraf.Accumulator) error { + if i.Metadata.CreationTimestamp.GetSeconds() == 0 && i.Metadata.CreationTimestamp.GetNanos() == 0 { + return nil + } + + fields := map[string]interface{}{ + "created": time.Unix(i.Metadata.CreationTimestamp.GetSeconds(), int64(i.Metadata.CreationTimestamp.GetNanos())).UnixNano(), + "generation": i.Metadata.GetGeneration(), + } + + tags := map[string]string{ + "name": i.Metadata.GetName(), + "namespace": i.Metadata.GetNamespace(), + } + + for _, ingress := range i.GetStatus().GetLoadBalancer().GetIngress() { + tags["ip"] = getHostOrIP(ingress) + + for _, rule := range i.GetSpec().GetRules() { + for _, path := range rule.GetIngressRuleValue().GetHttp().GetPaths() { + fields["backend_service_port"] = path.GetBackend().GetServicePort().GetIntVal() + + tags["backend_service_name"] = path.GetBackend().GetServiceName() + tags["path"] = path.GetPath() + + acc.AddFields(ingressMeasurement, fields, tags) + } + } + } + + return nil +} + +func getHostOrIP(ingress *v1.LoadBalancerIngress) string { + if name := ingress.GetHostname(); name != "" { + return name + } + + return ingress.GetIp() +} diff --git a/plugins/inputs/kube_inventory/kube_state.go b/plugins/inputs/kube_inventory/kube_state.go index 6c332c7813852..415ca6ed88e88 100644 --- a/plugins/inputs/kube_inventory/kube_state.go +++ b/plugins/inputs/kube_inventory/kube_state.go @@ -111,12 +111,13 @@ func (ki *KubernetesInventory) Gather(acc telegraf.Accumulator) (err error) { var availableCollectors = map[string]func(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory){ "daemonsets": collectDaemonSets, "deployments": collectDeployments, + "endpoints": collectEndpoints, "ingress": collectIngress, "nodes": collectNodes, "persistentvolumes": collectPersistentVolumes, "persistentvolumeclaims": collectPersistentVolumeClaims, - "pods": collectPods, - "statefulsets": collectStatefulSets, + "pods": collectPods, + "statefulsets": collectStatefulSets, } func (ki *KubernetesInventory) initClient() (*client, error) { @@ -159,6 +160,7 @@ func convertQuantity(s string, m float64) int64 { var ( daemonSetMeasurement = "kubernetes_daemonset" deploymentMeasurement = "kubernetes_deployment" + endpointMeasurement = "kubernetes_endpoint" ingressMeasurement = "kubernetes_ingress" nodeMeasurement = "kubernetes_node" persistentVolumeMeasurement = "kubernetes_persistentvolume" From ee963ad8848bdd9bf940d02f7c4f356e6a5ce69f Mon Sep 17 00:00:00 2001 From: greg linton Date: Wed, 10 Jul 2019 14:12:35 -0600 Subject: [PATCH 03/11] Collect service information --- plugins/inputs/kube_inventory/kube_state.go | 2 + plugins/inputs/kube_inventory/service.go | 112 ++++++++++++++++++++ 2 files changed, 114 insertions(+) create mode 100644 plugins/inputs/kube_inventory/service.go diff --git a/plugins/inputs/kube_inventory/kube_state.go b/plugins/inputs/kube_inventory/kube_state.go index 415ca6ed88e88..8c8bb7bf0b2ee 100644 --- a/plugins/inputs/kube_inventory/kube_state.go +++ b/plugins/inputs/kube_inventory/kube_state.go @@ -117,6 +117,7 @@ var availableCollectors = map[string]func(ctx context.Context, acc telegraf.Accu "persistentvolumes": collectPersistentVolumes, "persistentvolumeclaims": collectPersistentVolumeClaims, "pods": collectPods, + "services": collectServices, "statefulsets": collectStatefulSets, } @@ -166,6 +167,7 @@ var ( persistentVolumeMeasurement = "kubernetes_persistentvolume" persistentVolumeClaimMeasurement = "kubernetes_persistentvolumeclaim" podContainerMeasurement = "kubernetes_pod_container" + serviceMeasurement = "kubernetes_service" statefulSetMeasurement = "kubernetes_statefulset" ) diff --git a/plugins/inputs/kube_inventory/service.go b/plugins/inputs/kube_inventory/service.go new file mode 100644 index 0000000000000..ca1904e2f83fa --- /dev/null +++ b/plugins/inputs/kube_inventory/service.go @@ -0,0 +1,112 @@ +package kube_inventory + +import ( + "context" + "time" + + "github.com/ericchiang/k8s/apis/core/v1" + + "github.com/influxdata/telegraf" +) + +func collectServices(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) { + list, err := ki.client.getServices(ctx) + if err != nil { + acc.AddError(err) + return + } + for _, i := range list.Items { + if err = ki.gatherService(*i, acc); err != nil { + acc.AddError(err) + return + } + } +} + +func (ki *KubernetesInventory) gatherService(s v1.Service, acc telegraf.Accumulator) error { + if s.Metadata.CreationTimestamp.GetSeconds() == 0 && s.Metadata.CreationTimestamp.GetNanos() == 0 { + return nil + } + + fields := map[string]interface{}{ + "created": time.Unix(s.Metadata.CreationTimestamp.GetSeconds(), int64(s.Metadata.CreationTimestamp.GetNanos())).UnixNano(), + "generation": s.Metadata.GetGeneration(), + } + + tags := map[string]string{ + "name": s.Metadata.GetName(), + "namespace": s.Metadata.GetNamespace(), + } + + for _, port := range s.GetSpec().GetPorts() { + fields["port"] = port.GetPort() + + tags["port_name"] = port.GetName() + tags["port_protocol"] = port.GetProtocol() + + if s.GetSpec().GetType() == "ExternalName" { + tags["external_name"] = s.GetSpec().GetExternalName() + } else { + tags["cluster_ip"] = s.GetSpec().GetClusterIP() + } + + acc.AddFields(endpointMeasurement, fields, tags) + } + + return nil +} + +// todo: do we want to add cardinality and collect external_ips? +func (ki *KubernetesInventory) gatherServiceWithExternIps(s v1.Service, acc telegraf.Accumulator) error { + if s.Metadata.CreationTimestamp.GetSeconds() == 0 && s.Metadata.CreationTimestamp.GetNanos() == 0 { + return nil + } + + fields := map[string]interface{}{ + "created": time.Unix(s.Metadata.CreationTimestamp.GetSeconds(), int64(s.Metadata.CreationTimestamp.GetNanos())).UnixNano(), + "generation": s.Metadata.GetGeneration(), + } + + tags := map[string]string{ + "name": s.Metadata.GetName(), + "namespace": s.Metadata.GetNamespace(), + } + + if externIPs := s.GetSpec().GetExternalIPs(); externIPs != nil { + for _, ip := range externIPs { + tags["ip"] = ip + + for _, port := range s.GetSpec().GetPorts() { + fields["port"] = port.GetPort() + + tags["port_name"] = port.GetName() + tags["port_protocol"] = port.GetProtocol() + + if s.GetSpec().GetType() == "ExternalName" { + tags["external_name"] = s.GetSpec().GetExternalName() + } else { + tags["cluster_ip"] = s.GetSpec().GetClusterIP() + } + + acc.AddFields(endpointMeasurement, fields, tags) + } + } + } else { + for _, port := range s.GetSpec().GetPorts() { + fields["port"] = port.GetPort() + + tags["port_name"] = port.GetName() + tags["port_protocol"] = port.GetProtocol() + + if s.GetSpec().GetType() == "ExternalName" { + tags["external_name"] = s.GetSpec().GetExternalName() + } else { + tags["cluster_ip"] = s.GetSpec().GetClusterIP() + } + + acc.AddFields(endpointMeasurement, fields, tags) + } + } + + return nil +} From 0248587499b6dad55b6d6525cb0bcfb5fc469593 Mon Sep 17 00:00:00 2001 From: greg linton Date: Wed, 10 Jul 2019 14:40:16 -0600 Subject: [PATCH 04/11] Set correct measurement --- plugins/inputs/kube_inventory/service.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/inputs/kube_inventory/service.go b/plugins/inputs/kube_inventory/service.go index ca1904e2f83fa..8395d7adde779 100644 --- a/plugins/inputs/kube_inventory/service.go +++ b/plugins/inputs/kube_inventory/service.go @@ -50,7 +50,7 @@ func (ki *KubernetesInventory) gatherService(s v1.Service, acc telegraf.Accumula tags["cluster_ip"] = s.GetSpec().GetClusterIP() } - acc.AddFields(endpointMeasurement, fields, tags) + acc.AddFields(serviceMeasurement, fields, tags) } return nil @@ -88,7 +88,7 @@ func (ki *KubernetesInventory) gatherServiceWithExternIps(s v1.Service, acc tele tags["cluster_ip"] = s.GetSpec().GetClusterIP() } - acc.AddFields(endpointMeasurement, fields, tags) + acc.AddFields(serviceMeasurement, fields, tags) } } } else { @@ -104,7 +104,7 @@ func (ki *KubernetesInventory) gatherServiceWithExternIps(s v1.Service, acc tele tags["cluster_ip"] = s.GetSpec().GetClusterIP() } - acc.AddFields(endpointMeasurement, fields, tags) + acc.AddFields(serviceMeasurement, fields, tags) } } From bf376fbf35371666061e10e17223b3ec78f75341 Mon Sep 17 00:00:00 2001 From: greg linton Date: Wed, 10 Jul 2019 14:44:29 -0600 Subject: [PATCH 05/11] Add dependency --- Gopkg.lock | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Gopkg.lock b/Gopkg.lock index bcdf6cd07ec17..c1df0dae5fc6d 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -403,6 +403,7 @@ "apis/apps/v1beta1", "apis/apps/v1beta2", "apis/core/v1", + "apis/extensions/v1beta1", "apis/meta/v1", "apis/policy/v1beta1", "apis/resource", @@ -1599,6 +1600,7 @@ "github.com/ericchiang/k8s/apis/apps/v1beta1", "github.com/ericchiang/k8s/apis/apps/v1beta2", "github.com/ericchiang/k8s/apis/core/v1", + "github.com/ericchiang/k8s/apis/extensions/v1beta1", "github.com/ericchiang/k8s/apis/meta/v1", "github.com/ericchiang/k8s/apis/resource", "github.com/ericchiang/k8s/util/intstr", From 39ec0822cf56bd0c34101330f82ca64fd9dab69e Mon Sep 17 00:00:00 2001 From: greg linton Date: Wed, 10 Jul 2019 14:46:12 -0600 Subject: [PATCH 06/11] make fmt --- plugins/inputs/kube_inventory/kube_state.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/inputs/kube_inventory/kube_state.go b/plugins/inputs/kube_inventory/kube_state.go index 8c8bb7bf0b2ee..9ffe0765e71e3 100644 --- a/plugins/inputs/kube_inventory/kube_state.go +++ b/plugins/inputs/kube_inventory/kube_state.go @@ -116,9 +116,9 @@ var availableCollectors = map[string]func(ctx context.Context, acc telegraf.Accu "nodes": collectNodes, "persistentvolumes": collectPersistentVolumes, "persistentvolumeclaims": collectPersistentVolumeClaims, - "pods": collectPods, - "services": collectServices, - "statefulsets": collectStatefulSets, + "pods": collectPods, + "services": collectServices, + "statefulsets": collectStatefulSets, } func (ki *KubernetesInventory) initClient() (*client, error) { From 8cd7707d174731480d2a54372748654ed8117536 Mon Sep 17 00:00:00 2001 From: greg linton Date: Thu, 11 Jul 2019 10:59:48 -0600 Subject: [PATCH 07/11] Add more requested tags/fields --- plugins/inputs/kube_inventory/endpoint.go | 18 ++++++++++++++---- plugins/inputs/kube_inventory/ingress.go | 18 +++++++----------- plugins/inputs/kube_inventory/service.go | 4 +++- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/plugins/inputs/kube_inventory/endpoint.go b/plugins/inputs/kube_inventory/endpoint.go index 540f5612b1ec5..9561829de1e53 100644 --- a/plugins/inputs/kube_inventory/endpoint.go +++ b/plugins/inputs/kube_inventory/endpoint.go @@ -2,6 +2,7 @@ package kube_inventory import ( "context" + "strings" "time" "github.com/ericchiang/k8s/apis/core/v1" @@ -16,7 +17,7 @@ func collectEndpoints(ctx context.Context, acc telegraf.Accumulator, ki *Kuberne return } for _, i := range list.Items { - if err = ki.gatherEndpoint(*i, acc); err != nil { + if err = ki.gatherEndpointWithHosts(*i, acc); err != nil { acc.AddError(err) return } @@ -72,8 +73,12 @@ func (ki *KubernetesInventory) gatherEndpointWithHosts(e v1.Endpoints, acc teleg for _, readyAddr := range endpoint.GetAddresses() { fields["ready"] = true - // tags["hostname"]readyAddr.GetHostname() - tags["hostname"] = readyAddr.GetIp() + // todo: do we want tags["hostname"] = readyAddr.GetHostname() + tags["ip"] = readyAddr.GetIp() + tags["node_name"] = readyAddr.GetNodeName() + if readyAddr.TargetRef != nil { + tags[strings.ToLower(readyAddr.GetTargetRef().GetKind())] = readyAddr.GetTargetRef().GetName() + } for _, port := range endpoint.GetPorts() { fields["port"] = port.GetPort() @@ -87,7 +92,12 @@ func (ki *KubernetesInventory) gatherEndpointWithHosts(e v1.Endpoints, acc teleg for _, notReadyAddr := range endpoint.GetNotReadyAddresses() { fields["ready"] = false - tags["hostname"] = notReadyAddr.GetIp() + // todo: do we want tags["hostname"] = readyAddr.GetHostname() + tags["ip"] = notReadyAddr.GetIp() + tags["node_name"] = notReadyAddr.GetNodeName() + if notReadyAddr.TargetRef != nil { + tags[strings.ToLower(notReadyAddr.GetTargetRef().GetKind())] = notReadyAddr.GetTargetRef().GetName() + } for _, port := range endpoint.GetPorts() { fields["port"] = port.GetPort() diff --git a/plugins/inputs/kube_inventory/ingress.go b/plugins/inputs/kube_inventory/ingress.go index b75493181c0e4..7bd8680e97424 100644 --- a/plugins/inputs/kube_inventory/ingress.go +++ b/plugins/inputs/kube_inventory/ingress.go @@ -4,7 +4,6 @@ import ( "context" "time" - "github.com/ericchiang/k8s/apis/core/v1" v1beta1EXT "github.com/ericchiang/k8s/apis/extensions/v1beta1" "github.com/influxdata/telegraf" @@ -17,7 +16,7 @@ func collectIngress(ctx context.Context, acc telegraf.Accumulator, ki *Kubernete return } for _, i := range list.Items { - if err = ki.gatherIngress(*i, acc); err != nil { + if err = ki.gatherIngressWithIps(*i, acc); err != nil { acc.AddError(err) return } @@ -42,9 +41,11 @@ func (ki *KubernetesInventory) gatherIngress(i v1beta1EXT.Ingress, acc telegraf. for _, rule := range i.GetSpec().GetRules() { for _, path := range rule.GetIngressRuleValue().GetHttp().GetPaths() { fields["backend_service_port"] = path.GetBackend().GetServicePort().GetIntVal() + fields["tls"] = i.GetSpec().GetTls() != nil tags["backend_service_name"] = path.GetBackend().GetServiceName() tags["path"] = path.GetPath() + tags["host"] = rule.GetHost() acc.AddFields(ingressMeasurement, fields, tags) } @@ -70,14 +71,17 @@ func (ki *KubernetesInventory) gatherIngressWithIps(i v1beta1EXT.Ingress, acc te } for _, ingress := range i.GetStatus().GetLoadBalancer().GetIngress() { - tags["ip"] = getHostOrIP(ingress) + tags["hostname"] = ingress.GetHostname() + tags["ip"] = ingress.GetIp() for _, rule := range i.GetSpec().GetRules() { for _, path := range rule.GetIngressRuleValue().GetHttp().GetPaths() { fields["backend_service_port"] = path.GetBackend().GetServicePort().GetIntVal() + fields["tls"] = i.GetSpec().GetTls() != nil tags["backend_service_name"] = path.GetBackend().GetServiceName() tags["path"] = path.GetPath() + tags["host"] = rule.GetHost() acc.AddFields(ingressMeasurement, fields, tags) } @@ -86,11 +90,3 @@ func (ki *KubernetesInventory) gatherIngressWithIps(i v1beta1EXT.Ingress, acc te return nil } - -func getHostOrIP(ingress *v1.LoadBalancerIngress) string { - if name := ingress.GetHostname(); name != "" { - return name - } - - return ingress.GetIp() -} diff --git a/plugins/inputs/kube_inventory/service.go b/plugins/inputs/kube_inventory/service.go index 8395d7adde779..800de757542a8 100644 --- a/plugins/inputs/kube_inventory/service.go +++ b/plugins/inputs/kube_inventory/service.go @@ -16,7 +16,7 @@ func collectServices(ctx context.Context, acc telegraf.Accumulator, ki *Kubernet return } for _, i := range list.Items { - if err = ki.gatherService(*i, acc); err != nil { + if err = ki.gatherServiceWithExternIps(*i, acc); err != nil { acc.AddError(err) return } @@ -40,6 +40,7 @@ func (ki *KubernetesInventory) gatherService(s v1.Service, acc telegraf.Accumula for _, port := range s.GetSpec().GetPorts() { fields["port"] = port.GetPort() + fields["target_port"] = port.GetTargetPort() tags["port_name"] = port.GetName() tags["port_protocol"] = port.GetProtocol() @@ -78,6 +79,7 @@ func (ki *KubernetesInventory) gatherServiceWithExternIps(s v1.Service, acc tele for _, port := range s.GetSpec().GetPorts() { fields["port"] = port.GetPort() + fields["target_port"] = port.GetTargetPort() tags["port_name"] = port.GetName() tags["port_protocol"] = port.GetProtocol() From 75b1c0d79a31b12001f5a85f34c8d3db9534bc68 Mon Sep 17 00:00:00 2001 From: greg linton Date: Tue, 16 Jul 2019 11:40:51 -0600 Subject: [PATCH 08/11] Test endpoints --- plugins/inputs/kube_inventory/endpoint.go | 42 +--- .../inputs/kube_inventory/endpoint_test.go | 194 ++++++++++++++++++ plugins/inputs/kube_inventory/ingress.go | 8 +- plugins/inputs/kube_inventory/service.go | 8 +- 4 files changed, 207 insertions(+), 45 deletions(-) create mode 100644 plugins/inputs/kube_inventory/endpoint_test.go diff --git a/plugins/inputs/kube_inventory/endpoint.go b/plugins/inputs/kube_inventory/endpoint.go index 9561829de1e53..7298789da8e08 100644 --- a/plugins/inputs/kube_inventory/endpoint.go +++ b/plugins/inputs/kube_inventory/endpoint.go @@ -17,7 +17,7 @@ func collectEndpoints(ctx context.Context, acc telegraf.Accumulator, ki *Kuberne return } for _, i := range list.Items { - if err = ki.gatherEndpointWithHosts(*i, acc); err != nil { + if err = ki.gatherEndpoint(*i, acc); err != nil { acc.AddError(err) return } @@ -35,46 +35,15 @@ func (ki *KubernetesInventory) gatherEndpoint(e v1.Endpoints, acc telegraf.Accum } tags := map[string]string{ - "name": e.Metadata.GetName(), - "namespace": e.Metadata.GetNamespace(), - } - - for _, endpoint := range e.GetSubsets() { - for _, port := range endpoint.GetPorts() { - fields["port"] = port.GetPort() - - tags["port_name"] = port.GetName() - tags["port_protocol"] = port.GetProtocol() - - acc.AddFields(endpointMeasurement, fields, tags) - } - } - - return nil -} - -// todo: do we want to add cardinality and collect hostnames/ready? -func (ki *KubernetesInventory) gatherEndpointWithHosts(e v1.Endpoints, acc telegraf.Accumulator) error { - if e.Metadata.CreationTimestamp.GetSeconds() == 0 && e.Metadata.CreationTimestamp.GetNanos() == 0 { - return nil - } - - fields := map[string]interface{}{ - "created": time.Unix(e.Metadata.CreationTimestamp.GetSeconds(), int64(e.Metadata.CreationTimestamp.GetNanos())).UnixNano(), - "generation": e.Metadata.GetGeneration(), - } - - tags := map[string]string{ - "name": e.Metadata.GetName(), - "namespace": e.Metadata.GetNamespace(), + "endpoint_name": e.Metadata.GetName(), + "namespace": e.Metadata.GetNamespace(), } for _, endpoint := range e.GetSubsets() { for _, readyAddr := range endpoint.GetAddresses() { fields["ready"] = true - // todo: do we want tags["hostname"] = readyAddr.GetHostname() - tags["ip"] = readyAddr.GetIp() + tags["hostname"] = readyAddr.GetHostname() tags["node_name"] = readyAddr.GetNodeName() if readyAddr.TargetRef != nil { tags[strings.ToLower(readyAddr.GetTargetRef().GetKind())] = readyAddr.GetTargetRef().GetName() @@ -92,8 +61,7 @@ func (ki *KubernetesInventory) gatherEndpointWithHosts(e v1.Endpoints, acc teleg for _, notReadyAddr := range endpoint.GetNotReadyAddresses() { fields["ready"] = false - // todo: do we want tags["hostname"] = readyAddr.GetHostname() - tags["ip"] = notReadyAddr.GetIp() + tags["hostname"] = notReadyAddr.GetHostname() tags["node_name"] = notReadyAddr.GetNodeName() if notReadyAddr.TargetRef != nil { tags[strings.ToLower(notReadyAddr.GetTargetRef().GetKind())] = notReadyAddr.GetTargetRef().GetName() diff --git a/plugins/inputs/kube_inventory/endpoint_test.go b/plugins/inputs/kube_inventory/endpoint_test.go new file mode 100644 index 0000000000000..b88c388162bd2 --- /dev/null +++ b/plugins/inputs/kube_inventory/endpoint_test.go @@ -0,0 +1,194 @@ +package kube_inventory + +import ( + "testing" + "time" + + "github.com/ericchiang/k8s/apis/core/v1" + metav1 "github.com/ericchiang/k8s/apis/meta/v1" + "github.com/influxdata/telegraf/testutil" +) + +func TestEndpoint(t *testing.T) { + cli := &client{} + + now := time.Now() + now = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 1, 36, 0, now.Location()) + + tests := []struct { + name string + handler *mockHandler + output *testutil.Accumulator + hasError bool + }{ + { + name: "no endpoints", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/endpoints/": &v1.EndpointsList{}, + }, + }, + hasError: false, + }, + { + name: "collect ready endpoints", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/endpoints/": &v1.EndpointsList{ + Items: []*v1.Endpoints{ + { + Subsets: []*v1.EndpointSubset{ + { + Addresses: []*v1.EndpointAddress{ + { + Hostname: toStrPtr("storage-6"), + NodeName: toStrPtr("b.storage.internal"), + TargetRef: &v1.ObjectReference{ + Kind: toStrPtr("pod"), + Name: toStrPtr("storage-6"), + }, + }, + }, + Ports: []*v1.EndpointPort{ + { + Name: toStrPtr("server"), + Protocol: toStrPtr("TCP"), + Port: toInt32Ptr(8080), + }, + }, + }, + }, + Metadata: &metav1.ObjectMeta{ + Generation: toInt64Ptr(12), + Namespace: toStrPtr("ns1"), + Name: toStrPtr("storage"), + CreationTimestamp: &metav1.Time{Seconds: toInt64Ptr(now.Unix())}, + }, + }, + }, + }, + }, + }, + output: &testutil.Accumulator{ + Metrics: []*testutil.Metric{ + { + Fields: map[string]interface{}{ + "ready": true, + "port": int32(8080), + "generation": int64(12), + "created": now.UnixNano(), + }, + Tags: map[string]string{ + "endpoint_name": "storage", + "namespace": "ns1", + "hostname": "storage-6", + "node_name": "b.storage.internal", + "port_name": "server", + "port_protocol": "TCP", + "pod": "storage-6", + }, + }, + }, + }, + hasError: false, + }, + { + name: "collect notready endpoints", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/endpoints/": &v1.EndpointsList{ + Items: []*v1.Endpoints{ + { + Subsets: []*v1.EndpointSubset{ + { + NotReadyAddresses: []*v1.EndpointAddress{ + { + Hostname: toStrPtr("storage-6"), + NodeName: toStrPtr("b.storage.internal"), + TargetRef: &v1.ObjectReference{ + Kind: toStrPtr("pod"), + Name: toStrPtr("storage-6"), + }, + }, + }, + Ports: []*v1.EndpointPort{ + { + Name: toStrPtr("server"), + Protocol: toStrPtr("TCP"), + Port: toInt32Ptr(8080), + }, + }, + }, + }, + Metadata: &metav1.ObjectMeta{ + Generation: toInt64Ptr(12), + Namespace: toStrPtr("ns1"), + Name: toStrPtr("storage"), + CreationTimestamp: &metav1.Time{Seconds: toInt64Ptr(now.Unix())}, + }, + }, + }, + }, + }, + }, + output: &testutil.Accumulator{ + Metrics: []*testutil.Metric{ + { + Fields: map[string]interface{}{ + "ready": false, + "port": int32(8080), + "generation": int64(12), + "created": now.UnixNano(), + }, + Tags: map[string]string{ + "endpoint_name": "storage", + "namespace": "ns1", + "hostname": "storage-6", + "node_name": "b.storage.internal", + "port_name": "server", + "port_protocol": "TCP", + "pod": "storage-6", + }, + }, + }, + }, + hasError: false, + }, + } + + for _, v := range tests { + ks := &KubernetesInventory{ + client: cli, + } + acc := new(testutil.Accumulator) + for _, endpoint := range ((v.handler.responseMap["/endpoints/"]).(*v1.EndpointsList)).Items { + err := ks.gatherEndpoint(*endpoint, acc) + if err != nil { + t.Errorf("Failed to gather endpoint - %s", err.Error()) + } + } + + err := acc.FirstError() + if err == nil && v.hasError { + t.Fatalf("%s failed, should have error", v.name) + } else if err != nil && !v.hasError { + t.Fatalf("%s failed, err: %v", v.name, err) + } + if v.output == nil && len(acc.Metrics) > 0 { + t.Fatalf("%s: collected extra data", v.name) + } else if v.output != nil && len(v.output.Metrics) > 0 { + for i := range v.output.Metrics { + for k, m := range v.output.Metrics[i].Tags { + if acc.Metrics[i].Tags[k] != m { + t.Fatalf("%s: tag %s metrics unmatch Expected %s, got '%v'\n", v.name, k, m, acc.Metrics[i].Tags[k]) + } + } + for k, m := range v.output.Metrics[i].Fields { + if acc.Metrics[i].Fields[k] != m { + t.Fatalf("%s: field %s metrics unmatch Expected %v(%T), got %v(%T)\n", v.name, k, m, m, acc.Metrics[i].Fields[k], acc.Metrics[i].Fields[k]) + } + } + } + } + } +} diff --git a/plugins/inputs/kube_inventory/ingress.go b/plugins/inputs/kube_inventory/ingress.go index 7bd8680e97424..76b79a7c0e17f 100644 --- a/plugins/inputs/kube_inventory/ingress.go +++ b/plugins/inputs/kube_inventory/ingress.go @@ -34,8 +34,8 @@ func (ki *KubernetesInventory) gatherIngress(i v1beta1EXT.Ingress, acc telegraf. } tags := map[string]string{ - "name": i.Metadata.GetName(), - "namespace": i.Metadata.GetNamespace(), + "ingress_name": i.Metadata.GetName(), + "namespace": i.Metadata.GetNamespace(), } for _, rule := range i.GetSpec().GetRules() { @@ -66,8 +66,8 @@ func (ki *KubernetesInventory) gatherIngressWithIps(i v1beta1EXT.Ingress, acc te } tags := map[string]string{ - "name": i.Metadata.GetName(), - "namespace": i.Metadata.GetNamespace(), + "ingress_name": i.Metadata.GetName(), + "namespace": i.Metadata.GetNamespace(), } for _, ingress := range i.GetStatus().GetLoadBalancer().GetIngress() { diff --git a/plugins/inputs/kube_inventory/service.go b/plugins/inputs/kube_inventory/service.go index 800de757542a8..6369068171d99 100644 --- a/plugins/inputs/kube_inventory/service.go +++ b/plugins/inputs/kube_inventory/service.go @@ -34,8 +34,8 @@ func (ki *KubernetesInventory) gatherService(s v1.Service, acc telegraf.Accumula } tags := map[string]string{ - "name": s.Metadata.GetName(), - "namespace": s.Metadata.GetNamespace(), + "service_name": s.Metadata.GetName(), + "namespace": s.Metadata.GetNamespace(), } for _, port := range s.GetSpec().GetPorts() { @@ -69,8 +69,8 @@ func (ki *KubernetesInventory) gatherServiceWithExternIps(s v1.Service, acc tele } tags := map[string]string{ - "name": s.Metadata.GetName(), - "namespace": s.Metadata.GetNamespace(), + "service_name": s.Metadata.GetName(), + "namespace": s.Metadata.GetNamespace(), } if externIPs := s.GetSpec().GetExternalIPs(); externIPs != nil { From 61d3c464a0dd0e5c062dc0ad68027c446cd70636 Mon Sep 17 00:00:00 2001 From: greg linton Date: Tue, 16 Jul 2019 12:23:47 -0600 Subject: [PATCH 09/11] Test ingress --- plugins/inputs/kube_inventory/client_test.go | 8 + plugins/inputs/kube_inventory/ingress.go | 34 +---- plugins/inputs/kube_inventory/ingress_test.go | 142 ++++++++++++++++++ 3 files changed, 151 insertions(+), 33 deletions(-) create mode 100644 plugins/inputs/kube_inventory/ingress_test.go diff --git a/plugins/inputs/kube_inventory/client_test.go b/plugins/inputs/kube_inventory/client_test.go index 4f54755b02362..3e4eaf7522249 100644 --- a/plugins/inputs/kube_inventory/client_test.go +++ b/plugins/inputs/kube_inventory/client_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/ericchiang/k8s/util/intstr" "github.com/influxdata/telegraf/internal/tls" ) @@ -27,6 +28,13 @@ func toBoolPtr(b bool) *bool { return &b } +func toIntStrPtrS(s string) *intstr.IntOrString { + return &intstr.IntOrString{StrVal: &s} +} + +func toIntStrPtrI(i int32) *intstr.IntOrString { + return &intstr.IntOrString{IntVal: &i} +} func TestNewClient(t *testing.T) { _, err := newClient("https://127.0.0.1:443/", "default", "abc123", time.Second, tls.ClientConfig{}) if err != nil { diff --git a/plugins/inputs/kube_inventory/ingress.go b/plugins/inputs/kube_inventory/ingress.go index 76b79a7c0e17f..6d5c8019927cf 100644 --- a/plugins/inputs/kube_inventory/ingress.go +++ b/plugins/inputs/kube_inventory/ingress.go @@ -16,7 +16,7 @@ func collectIngress(ctx context.Context, acc telegraf.Accumulator, ki *Kubernete return } for _, i := range list.Items { - if err = ki.gatherIngressWithIps(*i, acc); err != nil { + if err = ki.gatherIngress(*i, acc); err != nil { acc.AddError(err) return } @@ -38,38 +38,6 @@ func (ki *KubernetesInventory) gatherIngress(i v1beta1EXT.Ingress, acc telegraf. "namespace": i.Metadata.GetNamespace(), } - for _, rule := range i.GetSpec().GetRules() { - for _, path := range rule.GetIngressRuleValue().GetHttp().GetPaths() { - fields["backend_service_port"] = path.GetBackend().GetServicePort().GetIntVal() - fields["tls"] = i.GetSpec().GetTls() != nil - - tags["backend_service_name"] = path.GetBackend().GetServiceName() - tags["path"] = path.GetPath() - tags["host"] = rule.GetHost() - - acc.AddFields(ingressMeasurement, fields, tags) - } - } - - return nil -} - -// todo: do we want to add cardinality and collect values from `i.GetStatus().GetLoadBalancer().GetIngress()` -func (ki *KubernetesInventory) gatherIngressWithIps(i v1beta1EXT.Ingress, acc telegraf.Accumulator) error { - if i.Metadata.CreationTimestamp.GetSeconds() == 0 && i.Metadata.CreationTimestamp.GetNanos() == 0 { - return nil - } - - fields := map[string]interface{}{ - "created": time.Unix(i.Metadata.CreationTimestamp.GetSeconds(), int64(i.Metadata.CreationTimestamp.GetNanos())).UnixNano(), - "generation": i.Metadata.GetGeneration(), - } - - tags := map[string]string{ - "ingress_name": i.Metadata.GetName(), - "namespace": i.Metadata.GetNamespace(), - } - for _, ingress := range i.GetStatus().GetLoadBalancer().GetIngress() { tags["hostname"] = ingress.GetHostname() tags["ip"] = ingress.GetIp() diff --git a/plugins/inputs/kube_inventory/ingress_test.go b/plugins/inputs/kube_inventory/ingress_test.go new file mode 100644 index 0000000000000..55369dea9887b --- /dev/null +++ b/plugins/inputs/kube_inventory/ingress_test.go @@ -0,0 +1,142 @@ +package kube_inventory + +import ( + "testing" + "time" + + "github.com/ericchiang/k8s/apis/core/v1" + v1beta1EXT "github.com/ericchiang/k8s/apis/extensions/v1beta1" + metav1 "github.com/ericchiang/k8s/apis/meta/v1" + "github.com/influxdata/telegraf/testutil" +) + +func TestIngress(t *testing.T) { + cli := &client{} + + now := time.Now() + now = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 1, 36, 0, now.Location()) + + tests := []struct { + name string + handler *mockHandler + output *testutil.Accumulator + hasError bool + }{ + { + name: "no ingress", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/ingress/": &v1beta1EXT.IngressList{}, + }, + }, + hasError: false, + }, + { + name: "collect ingress", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/ingress/": &v1beta1EXT.IngressList{ + Items: []*v1beta1EXT.Ingress{ + { + Status: &v1beta1EXT.IngressStatus{ + LoadBalancer: &v1.LoadBalancerStatus{ + Ingress: []*v1.LoadBalancerIngress{ + { + Hostname: toStrPtr("chron-1"), + Ip: toStrPtr("1.0.0.127"), + }, + }, + }, + }, + Spec: &v1beta1EXT.IngressSpec{ + Rules: []*v1beta1EXT.IngressRule{ + { + Host: toStrPtr("ui.internal"), + IngressRuleValue: &v1beta1EXT.IngressRuleValue{ + Http: &v1beta1EXT.HTTPIngressRuleValue{ + Paths: []*v1beta1EXT.HTTPIngressPath{ + { + Path: toStrPtr("/"), + Backend: &v1beta1EXT.IngressBackend{ + ServiceName: toStrPtr("chronografd"), + ServicePort: toIntStrPtrI(8080), + }, + }, + }, + }, + }, + }, + }, + }, + Metadata: &metav1.ObjectMeta{ + Generation: toInt64Ptr(12), + Namespace: toStrPtr("ns1"), + Name: toStrPtr("ui-lb"), + CreationTimestamp: &metav1.Time{Seconds: toInt64Ptr(now.Unix())}, + }, + }, + }, + }, + }, + }, + output: &testutil.Accumulator{ + Metrics: []*testutil.Metric{ + { + Fields: map[string]interface{}{ + "tls": false, + "backend_service_port": int32(8080), + "generation": int64(12), + "created": now.UnixNano(), + }, + Tags: map[string]string{ + "ingress_name": "ui-lb", + "namespace": "ns1", + "ip": "1.0.0.127", + "hostname": "chron-1", + "backend_service_name": "chronografd", + "host": "ui.internal", + "path": "/", + }, + }, + }, + }, + hasError: false, + }, + } + + for _, v := range tests { + ks := &KubernetesInventory{ + client: cli, + } + acc := new(testutil.Accumulator) + for _, ingress := range ((v.handler.responseMap["/ingress/"]).(*v1beta1EXT.IngressList)).Items { + err := ks.gatherIngress(*ingress, acc) + if err != nil { + t.Errorf("Failed to gather ingress - %s", err.Error()) + } + } + + err := acc.FirstError() + if err == nil && v.hasError { + t.Fatalf("%s failed, should have error", v.name) + } else if err != nil && !v.hasError { + t.Fatalf("%s failed, err: %v", v.name, err) + } + if v.output == nil && len(acc.Metrics) > 0 { + t.Fatalf("%s: collected extra data", v.name) + } else if v.output != nil && len(v.output.Metrics) > 0 { + for i := range v.output.Metrics { + for k, m := range v.output.Metrics[i].Tags { + if acc.Metrics[i].Tags[k] != m { + t.Fatalf("%s: tag %s metrics unmatch Expected %s, got '%v'\n", v.name, k, m, acc.Metrics[i].Tags[k]) + } + } + for k, m := range v.output.Metrics[i].Fields { + if acc.Metrics[i].Fields[k] != m { + t.Fatalf("%s: field %s metrics unmatch Expected %v(%T), got %v(%T)\n", v.name, k, m, m, acc.Metrics[i].Fields[k], acc.Metrics[i].Fields[k]) + } + } + } + } + } +} From 6dc4647d05eddcbde5dc7c071c3016d7c267ccd4 Mon Sep 17 00:00:00 2001 From: greg linton Date: Tue, 16 Jul 2019 12:38:41 -0600 Subject: [PATCH 10/11] Test services --- plugins/inputs/kube_inventory/service.go | 70 ++-------- plugins/inputs/kube_inventory/service_test.go | 123 ++++++++++++++++++ 2 files changed, 136 insertions(+), 57 deletions(-) create mode 100644 plugins/inputs/kube_inventory/service_test.go diff --git a/plugins/inputs/kube_inventory/service.go b/plugins/inputs/kube_inventory/service.go index 6369068171d99..4b0cc08452e23 100644 --- a/plugins/inputs/kube_inventory/service.go +++ b/plugins/inputs/kube_inventory/service.go @@ -16,7 +16,7 @@ func collectServices(ctx context.Context, acc telegraf.Accumulator, ki *Kubernet return } for _, i := range list.Items { - if err = ki.gatherServiceWithExternIps(*i, acc); err != nil { + if err = ki.gatherService(*i, acc); err != nil { acc.AddError(err) return } @@ -38,64 +38,10 @@ func (ki *KubernetesInventory) gatherService(s v1.Service, acc telegraf.Accumula "namespace": s.Metadata.GetNamespace(), } - for _, port := range s.GetSpec().GetPorts() { - fields["port"] = port.GetPort() - fields["target_port"] = port.GetTargetPort() - - tags["port_name"] = port.GetName() - tags["port_protocol"] = port.GetProtocol() - - if s.GetSpec().GetType() == "ExternalName" { - tags["external_name"] = s.GetSpec().GetExternalName() - } else { - tags["cluster_ip"] = s.GetSpec().GetClusterIP() - } - - acc.AddFields(serviceMeasurement, fields, tags) - } - - return nil -} - -// todo: do we want to add cardinality and collect external_ips? -func (ki *KubernetesInventory) gatherServiceWithExternIps(s v1.Service, acc telegraf.Accumulator) error { - if s.Metadata.CreationTimestamp.GetSeconds() == 0 && s.Metadata.CreationTimestamp.GetNanos() == 0 { - return nil - } - - fields := map[string]interface{}{ - "created": time.Unix(s.Metadata.CreationTimestamp.GetSeconds(), int64(s.Metadata.CreationTimestamp.GetNanos())).UnixNano(), - "generation": s.Metadata.GetGeneration(), - } - - tags := map[string]string{ - "service_name": s.Metadata.GetName(), - "namespace": s.Metadata.GetNamespace(), - } - - if externIPs := s.GetSpec().GetExternalIPs(); externIPs != nil { - for _, ip := range externIPs { - tags["ip"] = ip - - for _, port := range s.GetSpec().GetPorts() { - fields["port"] = port.GetPort() - fields["target_port"] = port.GetTargetPort() - - tags["port_name"] = port.GetName() - tags["port_protocol"] = port.GetProtocol() - - if s.GetSpec().GetType() == "ExternalName" { - tags["external_name"] = s.GetSpec().GetExternalName() - } else { - tags["cluster_ip"] = s.GetSpec().GetClusterIP() - } - - acc.AddFields(serviceMeasurement, fields, tags) - } - } - } else { + var getPorts = func() { for _, port := range s.GetSpec().GetPorts() { fields["port"] = port.GetPort() + fields["target_port"] = port.GetTargetPort().GetIntVal() tags["port_name"] = port.GetName() tags["port_protocol"] = port.GetProtocol() @@ -110,5 +56,15 @@ func (ki *KubernetesInventory) gatherServiceWithExternIps(s v1.Service, acc tele } } + if externIPs := s.GetSpec().GetExternalIPs(); externIPs != nil { + for _, ip := range externIPs { + tags["ip"] = ip + + getPorts() + } + } else { + getPorts() + } + return nil } diff --git a/plugins/inputs/kube_inventory/service_test.go b/plugins/inputs/kube_inventory/service_test.go new file mode 100644 index 0000000000000..6c0c8787adb8e --- /dev/null +++ b/plugins/inputs/kube_inventory/service_test.go @@ -0,0 +1,123 @@ +package kube_inventory + +import ( + "testing" + "time" + + "github.com/ericchiang/k8s/apis/core/v1" + metav1 "github.com/ericchiang/k8s/apis/meta/v1" + "github.com/influxdata/telegraf/testutil" +) + +func TestService(t *testing.T) { + cli := &client{} + + now := time.Now() + now = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 1, 36, 0, now.Location()) + + tests := []struct { + name string + handler *mockHandler + output *testutil.Accumulator + hasError bool + }{ + { + name: "no service", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/service/": &v1.ServiceList{}, + }, + }, + hasError: false, + }, + { + name: "collect service", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/service/": &v1.ServiceList{ + Items: []*v1.Service{ + { + Spec: &v1.ServiceSpec{ + Ports: []*v1.ServicePort{ + { + Port: toInt32Ptr(8080), + TargetPort: toIntStrPtrI(1234), + Name: toStrPtr("diagnostic"), + Protocol: toStrPtr("TCP"), + }, + }, + ExternalIPs: []string{"1.0.0.127"}, + ClusterIP: toStrPtr("127.0.0.1"), + }, + Metadata: &metav1.ObjectMeta{ + Generation: toInt64Ptr(12), + Namespace: toStrPtr("ns1"), + Name: toStrPtr("checker"), + CreationTimestamp: &metav1.Time{Seconds: toInt64Ptr(now.Unix())}, + }, + }, + }, + }, + }, + }, + + output: &testutil.Accumulator{ + Metrics: []*testutil.Metric{ + { + Fields: map[string]interface{}{ + "port": int32(8080), + "target_port": int32(1234), + "generation": int64(12), + "created": now.UnixNano(), + }, + Tags: map[string]string{ + "service_name": "checker", + "namespace": "ns1", + "port_name": "diagnostic", + "port_protocol": "TCP", + "cluster_ip": "127.0.0.1", + "ip": "1.0.0.127", + }, + }, + }, + }, + hasError: false, + }, + } + + for _, v := range tests { + ks := &KubernetesInventory{ + client: cli, + } + acc := new(testutil.Accumulator) + for _, service := range ((v.handler.responseMap["/service/"]).(*v1.ServiceList)).Items { + err := ks.gatherService(*service, acc) + if err != nil { + t.Errorf("Failed to gather service - %s", err.Error()) + } + } + + err := acc.FirstError() + if err == nil && v.hasError { + t.Fatalf("%s failed, should have error", v.name) + } else if err != nil && !v.hasError { + t.Fatalf("%s failed, err: %v", v.name, err) + } + if v.output == nil && len(acc.Metrics) > 0 { + t.Fatalf("%s: collected extra data", v.name) + } else if v.output != nil && len(v.output.Metrics) > 0 { + for i := range v.output.Metrics { + for k, m := range v.output.Metrics[i].Tags { + if acc.Metrics[i].Tags[k] != m { + t.Fatalf("%s: tag %s metrics unmatch Expected %s, got '%v'\n", v.name, k, m, acc.Metrics[i].Tags[k]) + } + } + for k, m := range v.output.Metrics[i].Fields { + if acc.Metrics[i].Fields[k] != m { + t.Fatalf("%s: field %s metrics unmatch Expected %v(%T), got %v(%T)\n", v.name, k, m, m, acc.Metrics[i].Fields[k], acc.Metrics[i].Fields[k]) + } + } + } + } + } +} From 473efe344965cb7198faf341d60a77335a43928e Mon Sep 17 00:00:00 2001 From: greg linton Date: Tue, 16 Jul 2019 12:39:49 -0600 Subject: [PATCH 11/11] Make fmt --- plugins/inputs/kube_inventory/ingress_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/inputs/kube_inventory/ingress_test.go b/plugins/inputs/kube_inventory/ingress_test.go index 55369dea9887b..e3b44512cc11f 100644 --- a/plugins/inputs/kube_inventory/ingress_test.go +++ b/plugins/inputs/kube_inventory/ingress_test.go @@ -83,7 +83,7 @@ func TestIngress(t *testing.T) { Metrics: []*testutil.Metric{ { Fields: map[string]interface{}{ - "tls": false, + "tls": false, "backend_service_port": int32(8080), "generation": int64(12), "created": now.UnixNano(),