diff --git a/plugins/inputs/kube_inventory/README.md b/plugins/inputs/kube_inventory/README.md index 8f76fb33c049e..be28fedd5f220 100644 --- a/plugins/inputs/kube_inventory/README.md +++ b/plugins/inputs/kube_inventory/README.md @@ -52,6 +52,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## If empty in-cluster config with POD's service account token will be used. # url = "" + ## URL for the kubelet, if set it will be used to collect the pods resource metrics + # url_kubelet = "http://127.0.0.1:10255" + ## Namespace to use. Set to "" to use all namespaces. # namespace = "default" diff --git a/plugins/inputs/kube_inventory/client.go b/plugins/inputs/kube_inventory/client.go index ef854a92f999f..6673c857b1419 100644 --- a/plugins/inputs/kube_inventory/client.go +++ b/plugins/inputs/kube_inventory/client.go @@ -2,6 +2,7 @@ package kube_inventory import ( "context" + "net/http" "time" appsv1 "k8s.io/api/apps/v1" @@ -12,6 +13,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/common/tls" ) @@ -22,16 +24,16 @@ type client struct { } func newClient(baseURL, namespace, bearerTokenFile string, bearerToken string, timeout time.Duration, tlsConfig tls.ClientConfig) (*client, error) { - var config *rest.Config + var clientConfig *rest.Config var err error if baseURL == "" { - config, err = rest.InClusterConfig() + clientConfig, err = rest.InClusterConfig() if err != nil { return nil, err } } else { - config = &rest.Config{ + clientConfig = &rest.Config{ TLSClientConfig: rest.TLSClientConfig{ ServerName: tlsConfig.ServerName, Insecure: tlsConfig.InsecureSkipVerify, @@ -44,13 +46,13 @@ func newClient(baseURL, namespace, bearerTokenFile string, bearerToken string, t } if bearerTokenFile != "" { - config.BearerTokenFile = bearerTokenFile + clientConfig.BearerTokenFile = bearerTokenFile } else if bearerToken != "" { - config.BearerToken = bearerToken + clientConfig.BearerToken = bearerToken } } - c, err := kubernetes.NewForConfig(config) + c, err := kubernetes.NewForConfig(clientConfig) if err != nil { return nil, err } @@ -62,6 +64,21 @@ func newClient(baseURL, namespace, bearerTokenFile string, bearerToken string, t }, nil } +func newHTTPClient(tlsConfig tls.ClientConfig, bearerTokenFile string, responseTimeout config.Duration) (*http.Client, error) { + tlsCfg, err := tlsConfig.TLSConfig() + if err != nil { + return nil, err + } + clientConfig := &rest.Config{ + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + }, + ContentConfig: rest.ContentConfig{}, + Timeout: time.Duration(responseTimeout), + BearerTokenFile: bearerTokenFile, + } + return rest.HTTPClientFor(clientConfig) +} func (c *client) getDaemonSets(ctx context.Context) (*appsv1.DaemonSetList, error) { ctx, cancel := context.WithTimeout(ctx, c.timeout) defer cancel() @@ -111,6 +128,7 @@ func (c *client) getPersistentVolumeClaims(ctx context.Context) (*corev1.Persist func (c *client) getPods(ctx context.Context, nodeName string) (*corev1.PodList, error) { ctx, cancel := context.WithTimeout(ctx, c.timeout) defer cancel() + var fieldSelector string if nodeName != "" { fieldSelector = "spec.nodeName=" + nodeName diff --git a/plugins/inputs/kube_inventory/kube_inventory.go b/plugins/inputs/kube_inventory/kube_inventory.go index d578f825ef7f8..b25a264de80c8 100644 --- a/plugins/inputs/kube_inventory/kube_inventory.go +++ b/plugins/inputs/kube_inventory/kube_inventory.go @@ -4,7 +4,9 @@ package kube_inventory import ( "context" _ "embed" + "encoding/json" "fmt" + "net/http" "strconv" "sync" "time" @@ -28,6 +30,7 @@ const ( // KubernetesInventory represents the config object for the plugin. type KubernetesInventory struct { URL string `toml:"url"` + KubeletURL string `toml:"url_kubelet"` BearerToken string `toml:"bearer_token"` BearerTokenString string `toml:"bearer_token_string" deprecated:"1.24.0;use 'BearerToken' with a file instead"` Namespace string `toml:"namespace"` @@ -36,13 +39,15 @@ type KubernetesInventory struct { ResourceInclude []string `toml:"resource_include"` MaxConfigMapAge config.Duration `toml:"max_config_map_age"` - SelectorInclude []string `toml:"selector_include"` - SelectorExclude []string `toml:"selector_exclude"` - NodeName string `toml:"node_name"` - Log telegraf.Logger `toml:"-"` + SelectorInclude []string `toml:"selector_include"` + SelectorExclude []string `toml:"selector_exclude"` + + NodeName string `toml:"node_name"` + Log telegraf.Logger `toml:"-"` tls.ClientConfig - client *client + client *client + httpClient *http.Client selectorFilter filter.Filter } @@ -67,7 +72,17 @@ func (ki *KubernetesInventory) Init() error { if err != nil { return err } + if ki.ResponseTimeout < config.Duration(time.Second) { + ki.ResponseTimeout = config.Duration(time.Second * 5) + } + // Only create an http client if we have a kubelet url + if ki.KubeletURL != "" { + ki.httpClient, err = newHTTPClient(ki.ClientConfig, ki.BearerToken, ki.ResponseTimeout) + if err != nil { + ki.Log.Warnf("unable to create http client: %v", err) + } + } return nil } @@ -140,6 +155,28 @@ func (ki *KubernetesInventory) convertQuantity(s string, m float64) int64 { } return int64(f * m) } +func (ki *KubernetesInventory) queryPodsFromKubelet(url string, v interface{}) error { + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return fmt.Errorf("creating new http request for url %s failed: %w", url, err) + } + + req.Header.Add("Accept", "application/json") + resp, err := ki.httpClient.Do(req) + if err != nil { + return fmt.Errorf("error making HTTP request to %q: %w", url, err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("%s returned HTTP status %s", url, resp.Status) + } + + if err := json.NewDecoder(resp.Body).Decode(v); err != nil { + return fmt.Errorf("error parsing response: %w", err) + } + + return nil +} func (ki *KubernetesInventory) createSelectorFilters() error { selectorFilter, err := filter.NewIncludeExcludeFilter(ki.SelectorInclude, ki.SelectorExclude) diff --git a/plugins/inputs/kube_inventory/pod.go b/plugins/inputs/kube_inventory/pod.go index 56e6113b4f7ee..960ef1f68b8e0 100644 --- a/plugins/inputs/kube_inventory/pod.go +++ b/plugins/inputs/kube_inventory/pod.go @@ -2,6 +2,7 @@ package kube_inventory import ( "context" + "fmt" "strings" corev1 "k8s.io/api/core/v1" @@ -10,13 +11,21 @@ import ( ) func collectPods(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) { - list, err := ki.client.getPods(ctx, ki.NodeName) + var list corev1.PodList + listRef := &list + var err error + + if ki.KubeletURL != "" { + err = ki.queryPodsFromKubelet(fmt.Sprintf("%s/pods", ki.KubeletURL), listRef) + } else { + listRef, err = ki.client.getPods(ctx, ki.NodeName) + } if err != nil { acc.AddError(err) return } - for _, p := range list.Items { + for _, p := range listRef.Items { ki.gatherPod(p, acc) } } diff --git a/plugins/inputs/kube_inventory/sample.conf b/plugins/inputs/kube_inventory/sample.conf index 2dfaaf9b73aa1..4c060fb291465 100644 --- a/plugins/inputs/kube_inventory/sample.conf +++ b/plugins/inputs/kube_inventory/sample.conf @@ -4,6 +4,9 @@ ## If empty in-cluster config with POD's service account token will be used. # url = "" + ## URL for the kubelet, if set it will be used to collect the pods resource metrics + # url_kubelet = "http://127.0.0.1:10255" + ## Namespace to use. Set to "" to use all namespaces. # namespace = "default"