From 13e6cadaf39bf54ede6e626169ffafc43b23cab9 Mon Sep 17 00:00:00 2001 From: Josh Powers Date: Tue, 30 Jul 2024 10:11:49 -0600 Subject: [PATCH] feat(inptus.elasticsearch): Gather enrich stats fixes: #15685 --- plugins/inputs/elasticsearch/README.md | 5 +- plugins/inputs/elasticsearch/elasticsearch.go | 65 ++++++++++++++++++ .../elasticsearch/elasticsearch_test.go | 16 +++++ plugins/inputs/elasticsearch/sample.conf | 5 +- plugins/inputs/elasticsearch/testdata_test.go | 66 +++++++++++++++++++ 5 files changed, 155 insertions(+), 2 deletions(-) diff --git a/plugins/inputs/elasticsearch/README.md b/plugins/inputs/elasticsearch/README.md index e6e25aad68a83..c75586710a1a8 100644 --- a/plugins/inputs/elasticsearch/README.md +++ b/plugins/inputs/elasticsearch/README.md @@ -74,6 +74,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## To work this require local = true cluster_stats_only_from_master = true + ## Gather stats from the enrich API + # enrich_stats = false + ## Indices to collect; can be one or more indices names or _all ## Use of wildcards is allowed. Use a wildcard at the end to retrieve index ## names that end with a changing value, like a date. @@ -98,7 +101,7 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. # tls_key = "/etc/telegraf/key.pem" ## Use TLS but skip chain & host verification # insecure_skip_verify = false - + ## If 'use_system_proxy' is set to true, Telegraf will check env vars such as ## HTTP_PROXY, HTTPS_PROXY, and NO_PROXY (or their lowercase counterparts). ## If 'use_system_proxy' is set to false (default) and 'http_proxy_url' is diff --git a/plugins/inputs/elasticsearch/elasticsearch.go b/plugins/inputs/elasticsearch/elasticsearch.go index 48e2d4104466f..f5cc5dd7e3504 100644 --- a/plugins/inputs/elasticsearch/elasticsearch.go +++ b/plugins/inputs/elasticsearch/elasticsearch.go @@ -68,6 +68,23 @@ type clusterHealth struct { Indices map[string]indexHealth `json:"indices"` } +type enrichStats struct { + CoordinatorStats []struct { + NodeID string `json:"node_id"` + QueueSize int `json:"queue_size"` + RemoteRequestsCurrent int `json:"remote_requests_current"` + RemoteRequestsTotal int `json:"remote_requests_total"` + ExecutedSearchesTotal int `json:"executed_searches_total"` + } `json:"coordinator_stats"` + CacheStats []struct { + NodeID string `json:"node_id"` + Count int `json:"count"` + Hits int64 `json:"hits"` + Misses int `json:"misses"` + Evictions int `json:"evictions"` + } `json:"cache_stats"` +} + type indexHealth struct { ActivePrimaryShards int `json:"active_primary_shards"` ActiveShards int `json:"active_shards"` @@ -104,6 +121,7 @@ type Elasticsearch struct { ClusterHealthLevel string `toml:"cluster_health_level"` ClusterStats bool `toml:"cluster_stats"` ClusterStatsOnlyFromMaster bool `toml:"cluster_stats_only_from_master"` + EnrichStats bool `toml:"enrich_stats"` IndicesInclude []string `toml:"indices_include"` IndicesLevel string `toml:"indices_level"` NodeStats []string `toml:"node_stats"` @@ -280,6 +298,13 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { } } } + + if e.EnrichStats { + if err := e.gatherEnrichStats(s+"/_enrich/stats", acc); err != nil { + acc.AddError(errors.New(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@"))) + return + } + } }(serv, acc) } @@ -440,6 +465,46 @@ func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator return nil } +func (e *Elasticsearch) gatherEnrichStats(url string, acc telegraf.Accumulator) error { + enrichStats := &enrichStats{} + if err := e.gatherJSONData(url, enrichStats); err != nil { + return err + } + measurementTime := time.Now() + + for _, coordinator := range enrichStats.CoordinatorStats { + coordinatorFields := map[string]interface{}{ + "queue_size": coordinator.QueueSize, + "remote_requests_current": coordinator.RemoteRequestsCurrent, + "remote_requests_total": coordinator.RemoteRequestsTotal, + "executed_searches_total": coordinator.ExecutedSearchesTotal, + } + acc.AddFields( + "elasticsearch_enrich_stats_coordinator", + coordinatorFields, + map[string]string{"node_id": coordinator.NodeID}, + measurementTime, + ) + } + + for _, cache := range enrichStats.CacheStats { + cacheFields := map[string]interface{}{ + "count": cache.Count, + "hits": cache.Hits, + "misses": cache.Misses, + "evictions": cache.Evictions, + } + acc.AddFields( + "elasticsearch_enrich_stats_cache", + cacheFields, + map[string]string{"node_id": cache.NodeID}, + measurementTime, + ) + } + + return nil +} + func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator) error { clusterStats := &clusterStats{} if err := e.gatherJSONData(url, clusterStats); err != nil { diff --git a/plugins/inputs/elasticsearch/elasticsearch_test.go b/plugins/inputs/elasticsearch/elasticsearch_test.go index 1ed61e731ce1f..2ee1160492fdb 100644 --- a/plugins/inputs/elasticsearch/elasticsearch_test.go +++ b/plugins/inputs/elasticsearch/elasticsearch_test.go @@ -100,6 +100,22 @@ func TestGatherIndividualStats(t *testing.T) { acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_breakers", nodestatsBreakersExpected, tags) } +func TestGatherEnrichStats(t *testing.T) { + es := newElasticsearchWithClient() + es.Servers = []string{"http://example.com:9200"} + es.EnrichStats = true + es.client.Transport = newTransportMock(enrichStatsResponse) + es.serverInfo = make(map[string]serverInfo) + es.serverInfo["http://example.com:9200"] = defaultServerInfo() + + var acc testutil.Accumulator + require.NoError(t, acc.GatherError(es.Gather)) + require.False(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly") + + metrics := acc.GetTelegrafMetrics() + require.Len(t, metrics, 8) +} + func TestGatherNodeStats(t *testing.T) { es := newElasticsearchWithClient() es.Servers = []string{"http://example.com:9200"} diff --git a/plugins/inputs/elasticsearch/sample.conf b/plugins/inputs/elasticsearch/sample.conf index d8196d1a782d4..759017f639a73 100644 --- a/plugins/inputs/elasticsearch/sample.conf +++ b/plugins/inputs/elasticsearch/sample.conf @@ -33,6 +33,9 @@ ## To work this require local = true cluster_stats_only_from_master = true + ## Gather stats from the enrich API + # enrich_stats = false + ## Indices to collect; can be one or more indices names or _all ## Use of wildcards is allowed. Use a wildcard at the end to retrieve index ## names that end with a changing value, like a date. @@ -57,7 +60,7 @@ # tls_key = "/etc/telegraf/key.pem" ## Use TLS but skip chain & host verification # insecure_skip_verify = false - + ## If 'use_system_proxy' is set to true, Telegraf will check env vars such as ## HTTP_PROXY, HTTPS_PROXY, and NO_PROXY (or their lowercase counterparts). ## If 'use_system_proxy' is set to false (default) and 'http_proxy_url' is diff --git a/plugins/inputs/elasticsearch/testdata_test.go b/plugins/inputs/elasticsearch/testdata_test.go index 69224ea02f6b7..a0928f57e2dc9 100644 --- a/plugins/inputs/elasticsearch/testdata_test.go +++ b/plugins/inputs/elasticsearch/testdata_test.go @@ -62,6 +62,72 @@ const clusterHealthResponseWithIndices = ` } ` +const enrichStatsResponse = ` +{ + "executing_policies": [], + "coordinator_stats": [ + { + "node_id": "RWkDKDRu_aV1fISRA7PIkg", + "queue_size": 0, + "remote_requests_current": 0, + "remote_requests_total": 101636700, + "executed_searches_total": 102230925 + }, + { + "node_id": "2BOvel8nrXRjmSMAMBSUp3", + "queue_size": 0, + "remote_requests_current": 0, + "remote_requests_total": 242051423, + "executed_searches_total": 242752071 + }, + { + "node_id": "smkOUPQOK1pymt8MCoglZJ", + "queue_size": 0, + "remote_requests_current": 0, + "remote_requests_total": 248009084, + "executed_searches_total": 248735550 + }, + { + "node_id": "g5EUAaS-6-z5w27OtGQeTI", + "queue_size": 0, + "remote_requests_current": 0, + "remote_requests_total": 233693129, + "executed_searches_total": 234476004 + } + ], + "cache_stats": [ + { + "node_id": "RWkDKDRu_aV1fISRA7PIkg", + "count": 2500, + "hits": 6044497858, + "misses": 102230925, + "evictions": 92663663 + }, + { + "node_id": "2BOvel8nrXRjmSMAMBSUp3", + "count": 2500, + "hits": 14640821136, + "misses": 242752071, + "evictions": 226826313 + }, + { + "node_id": "smkOUPQOK1pymt8MCoglZJ", + "count": 2500, + "hits": 14145580115, + "misses": 248735550, + "evictions": 233860968 + }, + { + "node_id": "g5EUAaS-6-z5w27OtGQeTI", + "count": 2500, + "hits": 11016000946, + "misses": 234476004, + "evictions": 217698127 + } + ] +} +` + var clusterHealthExpected = map[string]interface{}{ "status": "green", "status_code": 1,