Skip to content

Commit

Permalink
feat(inptus.elasticsearch): Gather enrich stats
Browse files Browse the repository at this point in the history
fixes: #15685
  • Loading branch information
powersj committed Jul 30, 2024
1 parent 4e397c6 commit 13e6cad
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 2 deletions.
5 changes: 4 additions & 1 deletion plugins/inputs/elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## To work this require local = true
cluster_stats_only_from_master = true

## Gather stats from the enrich API
# enrich_stats = false

## Indices to collect; can be one or more indices names or _all
## Use of wildcards is allowed. Use a wildcard at the end to retrieve index
## names that end with a changing value, like a date.
Expand All @@ -98,7 +101,7 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## If 'use_system_proxy' is set to true, Telegraf will check env vars such as
## HTTP_PROXY, HTTPS_PROXY, and NO_PROXY (or their lowercase counterparts).
## If 'use_system_proxy' is set to false (default) and 'http_proxy_url' is
Expand Down
65 changes: 65 additions & 0 deletions plugins/inputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,23 @@ type clusterHealth struct {
Indices map[string]indexHealth `json:"indices"`
}

type enrichStats struct {
CoordinatorStats []struct {
NodeID string `json:"node_id"`
QueueSize int `json:"queue_size"`
RemoteRequestsCurrent int `json:"remote_requests_current"`
RemoteRequestsTotal int `json:"remote_requests_total"`
ExecutedSearchesTotal int `json:"executed_searches_total"`
} `json:"coordinator_stats"`
CacheStats []struct {
NodeID string `json:"node_id"`
Count int `json:"count"`
Hits int64 `json:"hits"`
Misses int `json:"misses"`
Evictions int `json:"evictions"`
} `json:"cache_stats"`
}

type indexHealth struct {
ActivePrimaryShards int `json:"active_primary_shards"`
ActiveShards int `json:"active_shards"`
Expand Down Expand Up @@ -104,6 +121,7 @@ type Elasticsearch struct {
ClusterHealthLevel string `toml:"cluster_health_level"`
ClusterStats bool `toml:"cluster_stats"`
ClusterStatsOnlyFromMaster bool `toml:"cluster_stats_only_from_master"`
EnrichStats bool `toml:"enrich_stats"`
IndicesInclude []string `toml:"indices_include"`
IndicesLevel string `toml:"indices_level"`
NodeStats []string `toml:"node_stats"`
Expand Down Expand Up @@ -280,6 +298,13 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
}
}
}

if e.EnrichStats {
if err := e.gatherEnrichStats(s+"/_enrich/stats", acc); err != nil {
acc.AddError(errors.New(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
return
}
}
}(serv, acc)
}

Expand Down Expand Up @@ -440,6 +465,46 @@ func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator
return nil
}

func (e *Elasticsearch) gatherEnrichStats(url string, acc telegraf.Accumulator) error {
enrichStats := &enrichStats{}
if err := e.gatherJSONData(url, enrichStats); err != nil {
return err
}
measurementTime := time.Now()

for _, coordinator := range enrichStats.CoordinatorStats {
coordinatorFields := map[string]interface{}{
"queue_size": coordinator.QueueSize,
"remote_requests_current": coordinator.RemoteRequestsCurrent,
"remote_requests_total": coordinator.RemoteRequestsTotal,
"executed_searches_total": coordinator.ExecutedSearchesTotal,
}
acc.AddFields(
"elasticsearch_enrich_stats_coordinator",
coordinatorFields,
map[string]string{"node_id": coordinator.NodeID},
measurementTime,
)
}

for _, cache := range enrichStats.CacheStats {
cacheFields := map[string]interface{}{
"count": cache.Count,
"hits": cache.Hits,
"misses": cache.Misses,
"evictions": cache.Evictions,
}
acc.AddFields(
"elasticsearch_enrich_stats_cache",
cacheFields,
map[string]string{"node_id": cache.NodeID},
measurementTime,
)
}

return nil
}

func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator) error {
clusterStats := &clusterStats{}
if err := e.gatherJSONData(url, clusterStats); err != nil {
Expand Down
16 changes: 16 additions & 0 deletions plugins/inputs/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,22 @@ func TestGatherIndividualStats(t *testing.T) {
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_breakers", nodestatsBreakersExpected, tags)
}

func TestGatherEnrichStats(t *testing.T) {
es := newElasticsearchWithClient()
es.Servers = []string{"http://example.com:9200"}
es.EnrichStats = true
es.client.Transport = newTransportMock(enrichStatsResponse)
es.serverInfo = make(map[string]serverInfo)
es.serverInfo["http://example.com:9200"] = defaultServerInfo()

var acc testutil.Accumulator
require.NoError(t, acc.GatherError(es.Gather))
require.False(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly")

metrics := acc.GetTelegrafMetrics()
require.Len(t, metrics, 8)
}

func TestGatherNodeStats(t *testing.T) {
es := newElasticsearchWithClient()
es.Servers = []string{"http://example.com:9200"}
Expand Down
5 changes: 4 additions & 1 deletion plugins/inputs/elasticsearch/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
## To work this require local = true
cluster_stats_only_from_master = true

## Gather stats from the enrich API
# enrich_stats = false

## Indices to collect; can be one or more indices names or _all
## Use of wildcards is allowed. Use a wildcard at the end to retrieve index
## names that end with a changing value, like a date.
Expand All @@ -57,7 +60,7 @@
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## If 'use_system_proxy' is set to true, Telegraf will check env vars such as
## HTTP_PROXY, HTTPS_PROXY, and NO_PROXY (or their lowercase counterparts).
## If 'use_system_proxy' is set to false (default) and 'http_proxy_url' is
Expand Down
66 changes: 66 additions & 0 deletions plugins/inputs/elasticsearch/testdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,72 @@ const clusterHealthResponseWithIndices = `
}
`

const enrichStatsResponse = `
{
"executing_policies": [],
"coordinator_stats": [
{
"node_id": "RWkDKDRu_aV1fISRA7PIkg",
"queue_size": 0,
"remote_requests_current": 0,
"remote_requests_total": 101636700,
"executed_searches_total": 102230925
},
{
"node_id": "2BOvel8nrXRjmSMAMBSUp3",
"queue_size": 0,
"remote_requests_current": 0,
"remote_requests_total": 242051423,
"executed_searches_total": 242752071
},
{
"node_id": "smkOUPQOK1pymt8MCoglZJ",
"queue_size": 0,
"remote_requests_current": 0,
"remote_requests_total": 248009084,
"executed_searches_total": 248735550
},
{
"node_id": "g5EUAaS-6-z5w27OtGQeTI",
"queue_size": 0,
"remote_requests_current": 0,
"remote_requests_total": 233693129,
"executed_searches_total": 234476004
}
],
"cache_stats": [
{
"node_id": "RWkDKDRu_aV1fISRA7PIkg",
"count": 2500,
"hits": 6044497858,
"misses": 102230925,
"evictions": 92663663
},
{
"node_id": "2BOvel8nrXRjmSMAMBSUp3",
"count": 2500,
"hits": 14640821136,
"misses": 242752071,
"evictions": 226826313
},
{
"node_id": "smkOUPQOK1pymt8MCoglZJ",
"count": 2500,
"hits": 14145580115,
"misses": 248735550,
"evictions": 233860968
},
{
"node_id": "g5EUAaS-6-z5w27OtGQeTI",
"count": 2500,
"hits": 11016000946,
"misses": 234476004,
"evictions": 217698127
}
]
}
`

var clusterHealthExpected = map[string]interface{}{
"status": "green",
"status_code": 1,
Expand Down

0 comments on commit 13e6cad

Please sign in to comment.