From 8b602401fe57139cd1cfae47469befaa977ccaab Mon Sep 17 00:00:00 2001 From: Adrian Lopez Date: Mon, 7 Aug 2023 17:20:21 +0200 Subject: [PATCH] feat: add Zabbix output plugin Add output plugin to support sending metrics to Zabbix (https://www.zabbix.com/). This output plugin handle sending metrics as traps, generating LLD data to feed discovery rules and is able to send autoregistration requests. --- docs/LICENSE_OF_DEPENDENCIES.md | 1 + go.mod | 1 + go.sum | 2 + plugins/outputs/all/zabbix.go | 5 + plugins/outputs/zabbix/README.md | 406 +++++++ plugins/outputs/zabbix/autoregister.go | 38 + plugins/outputs/zabbix/autoregister_test.go | 122 ++ plugins/outputs/zabbix/lld.go | 234 ++++ plugins/outputs/zabbix/lld_test.go | 1137 +++++++++++++++++++ plugins/outputs/zabbix/sample.conf | 33 + plugins/outputs/zabbix/zabbix.go | 240 ++++ plugins/outputs/zabbix/zabbix_test.go | 920 +++++++++++++++ 12 files changed, 3139 insertions(+) create mode 100644 plugins/outputs/all/zabbix.go create mode 100644 plugins/outputs/zabbix/README.md create mode 100644 plugins/outputs/zabbix/autoregister.go create mode 100644 plugins/outputs/zabbix/autoregister_test.go create mode 100644 plugins/outputs/zabbix/lld.go create mode 100644 plugins/outputs/zabbix/lld_test.go create mode 100644 plugins/outputs/zabbix/sample.conf create mode 100644 plugins/outputs/zabbix/zabbix.go create mode 100644 plugins/outputs/zabbix/zabbix_test.go diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 16c87e5b29dbc..5129c5692b332 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -108,6 +108,7 @@ following works: - github.com/cpuguy83/dockercfg [MIT License](https://github.com/cpuguy83/dockercfg/blob/main/LICENSE) - github.com/cpuguy83/go-md2man [MIT License](https://github.com/cpuguy83/go-md2man/blob/master/LICENSE.md) - github.com/danieljoos/wincred [MIT License](https://github.com/danieljoos/wincred/blob/master/LICENSE) +- github.com/datadope-io/go-zabbix [MIT License](https://github.com/datadope-io/go-zabbix/blob/master/LICENSE) - github.com/davecgh/go-spew [ISC License](https://github.com/davecgh/go-spew/blob/master/LICENSE) - github.com/devigned/tab [MIT License](https://github.com/devigned/tab/blob/master/LICENSE) - github.com/dgryski/go-rendezvous [MIT License](https://github.com/dgryski/go-rendezvous/blob/master/LICENSE) diff --git a/go.mod b/go.mod index 249996ad4446e..c023158d89665 100644 --- a/go.mod +++ b/go.mod @@ -65,6 +65,7 @@ require ( github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f github.com/coreos/go-systemd/v22 v22.5.0 github.com/couchbase/go-couchbase v0.1.1 + github.com/datadope-io/go-zabbix/v2 v2.0.1 github.com/digitalocean/go-libvirt v0.0.0-20221205150000-2939327a8519 github.com/dimchansky/utfbom v1.1.1 github.com/djherbis/times v1.6.0 diff --git a/go.sum b/go.sum index 569d4c67315d0..288176ba90aba 100644 --- a/go.sum +++ b/go.sum @@ -1046,6 +1046,8 @@ github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr github.com/cyphar/filepath-securejoin v0.2.3/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4= github.com/danieljoos/wincred v1.2.0 h1:ozqKHaLK0W/ii4KVbbvluM91W2H3Sh0BncbUNPS7jLE= github.com/danieljoos/wincred v1.2.0/go.mod h1:FzQLLMKBFdvu+osBrnFODiv32YGwCfx0SkRa/eYHgec= +github.com/datadope-io/go-zabbix/v2 v2.0.1 h1:kGlyzfFqbwhMph4Mo0hpYxxBHI14eHuV5TVy+7uNonE= +github.com/datadope-io/go-zabbix/v2 v2.0.1/go.mod h1:hRbQWszykTUPoR6g5fJXfNwPFZpP3nDNSZ9HrEKuKCM= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= diff --git a/plugins/outputs/all/zabbix.go b/plugins/outputs/all/zabbix.go new file mode 100644 index 0000000000000..2cef3d1560d6c --- /dev/null +++ b/plugins/outputs/all/zabbix.go @@ -0,0 +1,5 @@ +//go:build !custom || outputs || outputs.zabbix + +package all + +import _ "github.com/influxdata/telegraf/plugins/outputs/zabbix" // register plugin diff --git a/plugins/outputs/zabbix/README.md b/plugins/outputs/zabbix/README.md new file mode 100644 index 0000000000000..6d40db5c6ec9e --- /dev/null +++ b/plugins/outputs/zabbix/README.md @@ -0,0 +1,406 @@ +# Zabbix Output Plugin + +This plugin send metrics to [Zabbix](https://www.zabbix.com/) via +[traps][traps]. + +It has been tested with versions +[3.0](https://www.zabbix.com/documentation/3.0/en/manual/appendix/items/trapper) +, +[4.0](https://www.zabbix.com/documentation/4.0/en/manual/appendix/items/trapper) +and +[6.0](https://www.zabbix.com/documentation/6.0/en/manual/appendix/items/trapper) +. + +[traps]: https://www.zabbix.com/documentation/current/en/manual/appendix/items/trapper + +It should work with newer versions as long as Zabbix does not change the +protocol. + +## Global configuration options + +In addition to the plugin-specific configuration settings, plugins support +additional global and plugin configuration settings. These settings are used to +modify metrics, tags, and field or create aliases and configure ordering, etc. +See the [CONFIGURATION.md][CONFIGURATION.md] for more details. + +[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins + +## Configuration + +```toml @sample.conf +# Send metrics to Zabbix +[[outputs.zabbix]] + ## Address and (optional) port of the Zabbix server + address = "zabbix.example.com:10051" + + ## Send metrics as type "Zabbix agent (active)" + # agent_active = false + + ## Add prefix to all keys sent to Zabbix. + # prefix = "telegraf." + + ## Name of the tag that contains the host name + # host_tag = "host" + + ## Skip measurement prefix to all keys sent to Zabbix. + # skip_measurement_prefix = false + + ## This field will be sent as HostMetadata to Zabbix Server to autoregister the host. + ## To enable this feature, this option must be set to a value other than "". + # autoregister = "" + + ## Interval to resend auto-registration data to Zabbix. + ## Only applies if autoregister feature is enabled. + ## This value is a lower limit, the actual resend should be triggered by the next flush interval. + # autoregister_resend_interval = "30m" + + ## Interval to send LLD data to Zabbix. + ## This value is a lower limit, the actual resend should be triggered by the next flush interval. + # lld_send_interval = "10m" + + ## Interval to delete stored LLD known data and start capturing it again. + ## This value is a lower limit, the actual resend should be triggered by the next flush interval. + # lld_clear_interval = "1h" +``` + +### agent_active + +The `request` value in the package sent to Zabbix should be different if the +items configured in Zabbix are [Zabbix trapper][zabbixtrapper] or +[Zabbix agent (active)][zabbixagentactive]. + +`agent_active = false` will send data as _sender data_, expecting trapper items. + +`agent_active = true` will send data as _agent data_, expecting active Zabbix +agent items. + +[zabbixtrapper]: https://www.zabbix.com/documentation/6.4/en/manual/config/items/itemtypes/trapper?hl=Trapper +[zabbixagentactive]: https://www.zabbix.com/documentation/6.4/en/manual/config/items/itemtypes/zabbix_agent + +### prefix + +We can set a prefix that should be added to all Zabbix keys. + +This is configurable with the option `prefix`, set by default to `telegraf.`. + +Example how the configuration `prefix = "telegraf."` will generate the Zabbix +keys given a Telegraf metric: + +```diff +- measurement,host=hostname valueA=0,valueB=1 ++ telegraf.measurement.valueA ++ telegraf.measurement.valueB +``` + +### skip_measurement_prefix + +We can skip the measurement prefix added to all Zabbix keys. + +Example with `skip_measurement_prefix = true"` and `prefix = "telegraf."`: + +```diff +- measurement,host=hostname valueA=0,valueB=1 ++ telegraf.valueA ++ telegraf.valueB +``` + +Example with `skip_measurement_prefix = true"` and `prefix = ""`: + +```diff +- measurement,host=hostname valueA=0,valueB=1 ++ valueA ++ valueB +``` + +### autoregister + +If this field is active, Telegraf will send an +[autoregister request][autoregisterrequest] to Zabbix, using the content of +this field as the [HostMetadata][hostmetadata]. + +One request is sent for each of the different values seen by Telegraf for the +`host` tag. + +[autoregisterrequest]: https://www.zabbix.com/documentation/current/en/manual/discovery/auto_registration?hl=autoregistration +[hostmetadata]: https://www.zabbix.com/documentation/current/en/manual/discovery/auto_registration?hl=autoregistration#using-host-metadata + +### autoregister_resend_interval + +If `autoregister` is defined, this field set the interval at which +autoregister requests are resend to Zabbix. + +The [telegraf interval format][intervals_format] should be used. + +The actual send of the autoregister request will happen in the next output flush +after this interval has been surpassed. + +[intervals_format]: ../../../docs/CONFIGURATION.md#intervals + +### lld_send_interval + +To reduce the number of LLD requests sent to Zabbix (LLD processing is +[expensive][lldexpensive]), this plugin will send only one per +`lld_send_interval`. + +When Telegraf is started, this plugin will start to collect the info needed to +generate this LLD packets (measurements, tags keys and values). + +Once this interval is surpassed, the next flush of this plugin will add the +packet with the LLD data. + +In the next interval, only new, or modified, LLDs will be sent. + +[lldexpensive]: https://www.zabbix.com/documentation/4.2/en/manual/introduction/whatsnew420#:~:text=Daemons-,Separate%20processing%20for%20low%2Dlevel%20discovery,-Processing%20low%2Dlevel + +### lld_clear_interval + +When this interval is surpassed, the next flush will clear all the LLD data +collected. + +This allows this plugin to forget about old data and resend LLDs to Zabbix, in +case the host has new discovery rules or the packet was lost. + +If we have `flush_interval = "1m"`, `lld_send_interval = "10m"` and +`lld_clear_interval = "1h"` and Telegraf is started at 00:00, the first LLD will +be sent at 00:10. At 01:00 the LLD data will be deleted and at 01:10 LLD data +will be resent. + +## Trap format + +For each new metric generated by Telegraf, this output plugin will send one +trap for each field. + +Given this Telegraf metric: + +```text +measurement,host=hostname valueA=0,valueB=1 +``` + +It will generate this Zabbix metrics: + +```json +{"host": "hostname", "key": "telegraf.measurement.valueA", "value": "0"} +{"host": "hostname", "key": "telegraf.measurement.valueB", "value": "1"} +``` + +If the metric has tags (aside from `host`), they will be added, in alphabetical +order using the format for LLD metrics: + +```text +measurement,host=hostname,tagA=keyA,tagB=keyB valueA=0,valueB=1 +``` + +Zabbix generated metrics: + +```json +{"host": "hostname", "key": "telegraf.measurement.valueA[keyA,keyB]", "value": "0"} +{"host": "hostname", "key": "telegraf.measurement.valueB[keyA,keyB]", "value": "1"} +``` + +This order is based on the tags keys, not the tag values, so, for example, this +Telegraf metric: + +```text +measurement,host=hostname,aaaTag=999,zzzTag=111 value=0 +``` + +Will generate this Zabbix metric: + +```json +{"host": "hostname", "key": "telegraf.measurement.value[999,111]", "value": "0"} +``` + +## Zabbix low level discovery + +Zabbix needs an `item` created before receiving any metric. In some cases we do +not know in advance what are we going to send, for example, the name of a +container to send its cpu and memory consumption. + +For this case Zabbix provides [low level discovery][lld] that allow to create +new items dinamically based on the parameters sent by the trap. + +As explained previously, this output plugin will format the Zabbix key using +the tags seen in the Telegraf metric following the LLD format. + +To create those _discovered items_ this plugin uses the same mechanism as the +Zabbix agent, collecting information about which tags has been seen for each +measurement and periodically sending a request to a discovery rule with the +collected data. + +[lld]: https://www.zabbix.com/documentation/current/manual/discovery/low_level_discovery + +### Design + +To explain how everything interconnects we will use an example with the +`net_response` input: + +```toml +[[inputs.net_response]] + protocol = "tcp" + address = "example.com:80" +``` + +This input will generate this metric: + +```text +$ telegraf -config example.conf -test +* Plugin: inputs.net_response, Collection 1 +> net_response,server=example.com,port=80,protocol=tcp,host=myhost result_type="success",response_time=0.091026869 1522741063000000000 +``` + +Here we have four tags: server, port, protocol and host (this one will be +assumed that is always present and treated differently). + +The values those three parameters could take are unknown to Zabbix, so we +cannot create trappers items in Zabbix to receive that values (at least without +mixing that metric with another `net_response` metric with different tags). + +To solve this problem we use a discovery rule in Zabbix, that will receive the +different groups of tag values and create the traps to gather the metrics. + +This plugin knows about three tags (excluding host) for the input +`net_response`, therefore it will generate this new Telegraf metric: + +```text +lld.host=myhost net_response.port.protocol.server="{\"data\":[{\"{#PORT}\":\"80\",\"{#PROTOCOL}\":\"tcp\",\"{#SERVER}\":\"example.com\"}]}" +``` + +Once sent, the final package will be: + +```json +{ + "request":"sender data", + "data":[ + { + "host":"myhost", + "key":"telegraf.lld.net_response.port.protocol.server", + "value":"{\"data\":[{\"{#PORT}\":\"80\",\"{#PROTOCOL}\":\"tcp\",\"{#SERVER}\":\"example.com\"}]}", + "clock":1519043805 + } + ], + "clock":1519043805 +} +``` + +The Zabbix key is generated joining `lld`, the input name and tags (keys) +alphabetically sorted. +Some inputs could use different groups of tags for different fields, that is +why the tags are added to the key, to allow having different discovery rules +for the same input. + +The tags used in `value` are changed to uppercase to match the format of Zabbix. + +In the Zabbix server we should have a discovery rule associated with that key +(telegraf.lld.net_response.port.protocol.server) and one item prototype for +each field, in this case `result_type` and `response_time`. + +The item prototypes will be Zabbix trappers with keys (data type should also +match and some values will be better stored as _delta_): + +```text +telegraf.net_response.response_time[{#PORT},{#PROTOCOL},{#SERVER}] +telegraf.net_response.result_type[{#PORT},{#PROTOCOL},{#SERVER}] +``` + +The macros in the item prototypes keys should be alphabetically sorted so they +can match the keys generated by this plugin. + +With that keys and the example trap, the host `myhost` will have two new items: + +```text +telegraf.net_response.response_time[80,tcp,example.com] +telegraf.net_response.result_type[80,tcp,example.com] +``` + +This plugin, for each metric, will send traps to the Zabbix server following +the same structure (INPUT.FIELD[tags sorted]...), filling the items created by +the discovery rule. + +In summary: + +- we need a discovery rule with the correct key and one item prototype for each +field +- this plugin will generate traps to create items based on the metrics seen in +Telegraf +- it will also send the traps to fill the new created items + +### Reducing the number of LLDs + +This plugin remembers which LLDs has been sent to Zabbix and avoid generating +the same metrics again, to avoid the cost of LLD processing in Zabbix. + +It will only send LLD data each `lld_send_interval`. + +But, could happen that package is lost or some host get new discovery rules, so +each `lld_clear_interval` the plugin will forget about the known data and start +collecting again. + +### Note on inputs configuration + +Which tags should expose each input should be controlled, because an unexpected +tag could modify the trap key and will not match the trapper defined in Zabbix. + +For example, in the docker input, each container label is a new tag. + +To control this we can add to the input a config like: + +```toml +taginclude = ["host", "container_name"] +``` + +Allowing only the tags "host" and "container_name" to be used to generate the +key (and loosing the information provided in the others tags). + +## Examples of metrics converted to traps + +### Without tags + +```text +mem,host=myHost available_percent=14.684620843239944,used=14246531072i 152276442800000000 +``` + +```json +{ + "request":"sender data", + "data":[ + { + "host":"myHost", + "key":"telegraf.mem.available_percent", + "value":"14.382719", + "clock":1522764428 + }, + { + "host":"myHost", + "key":"telegraf.mem.used", + "value":"14246531072", + "clock":1522764428 + } + ] +} +``` + +### With tags + +```text +docker_container_net,host=myHost,container_name=laughing_babbage rx_errors=0i,tx_errors=0i 1522764038000000000 +``` + +```json +{ + "request":"sender data", + "data": [ + { + "host":"myHost", + "key":"telegraf.docker_container_net.rx_errors[laughing_babbage]", + "value":"0", + "clock":15227640380 + }, + { + "host":"myHost", + "key":"telegraf.docker_container_net.tx_errors[laughing_babbage]", + "value":"0", + "clock":15227640380 + } + ] +} +``` diff --git a/plugins/outputs/zabbix/autoregister.go b/plugins/outputs/zabbix/autoregister.go new file mode 100644 index 0000000000000..159a01a6fa37a --- /dev/null +++ b/plugins/outputs/zabbix/autoregister.go @@ -0,0 +1,38 @@ +package zabbix + +import ( + "time" +) + +// Add adds a host to the list of hosts to send autoregister data to Zabbix. +// Only store information if autoregister is enabled (config Autoregister is not empty). +func (z *Zabbix) autoregisterAdd(hostname string) { + if z.Autoregister == "" { + return + } + + if _, exists := z.autoregisterLastSend[hostname]; !exists { + z.autoregisterLastSend[hostname] = time.Time{} + } +} + +// Push sends autoregister data to Zabbix for each host. +func (z *Zabbix) autoregisterPush() { + if z.Autoregister == "" { + return + } + + // For each "host" tag seen, send an autoregister request to Zabbix server. + // z.AutoregisterSendPeriod is the interval at which requests are resend. + for hostname, timeLastSend := range z.autoregisterLastSend { + if time.Since(timeLastSend) > time.Duration(z.AutoregisterResendInterval) { + z.Log.Debugf("Autoregistering host %q", hostname) + + if err := z.sender.RegisterHost(hostname, z.Autoregister); err != nil { + z.Log.Errorf("Autoregistering host %q: %v", hostname, err) + } + + z.autoregisterLastSend[hostname] = time.Now() + } + } +} diff --git a/plugins/outputs/zabbix/autoregister_test.go b/plugins/outputs/zabbix/autoregister_test.go new file mode 100644 index 0000000000000..dee621a62c09c --- /dev/null +++ b/plugins/outputs/zabbix/autoregister_test.go @@ -0,0 +1,122 @@ +package zabbix + +import ( + "testing" + "time" + + "github.com/datadope-io/go-zabbix/v2" + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/testutil" +) + +// TestZabbixAutoregisterDisabledAdd tests that Add does not store information if autoregister is disabled. +func TestZabbixAutoregisterDisabledAdd(t *testing.T) { + z := Zabbix{ + autoregisterLastSend: make(map[string]time.Time), + } + + z.autoregisterAdd("hostname") + require.Empty(t, z.autoregisterLastSend) +} + +// TestZabbixAutoregisterEnabledAdd tests that Add stores information if autoregister is enabled. +func TestZabbixAutoregisterEnabledAdd(t *testing.T) { + z := Zabbix{ + Autoregister: "autoregister", + autoregisterLastSend: make(map[string]time.Time), + } + + z.autoregisterAdd("hostname") + require.Len(t, z.autoregisterLastSend, 1) + + require.Contains(t, z.autoregisterLastSend, "hostname") +} + +// TestZabbixAutoregisterPush tests different cases of Push with a table oriented test. +func TestZabbixAutoregisterPush(t *testing.T) { + zabbixSender := &mockZabbixSender{} + z := Zabbix{ + Log: testutil.Logger{}, + AutoregisterResendInterval: config.Duration(1 * time.Second), + autoregisterLastSend: make(map[string]time.Time), + sender: zabbixSender, + } + + // Test that nothing is sent if autoregister is disabled. + z.autoregisterPush() + require.Empty(t, z.autoregisterLastSend) + + // Test that nothing is sent if autoregister is enabled but no host is added. + z.Autoregister = "autoregister" + z.autoregisterPush() + require.Empty(t, z.autoregisterLastSend) + + // Test that autoregister is sent if autoregister is enabled and a host is added. + z.Autoregister = "autoregister" + z.autoregisterAdd("hostname") + z.autoregisterPush() + require.Len(t, z.autoregisterLastSend, 1) + require.Equal(t, "hostname", zabbixSender.hostname) + require.Equal(t, "autoregister", zabbixSender.hostMetadata) + + // Test that autoregister is not sent if the last send was less than AutoregisterResendInterval ago. + z.Autoregister = "autoregister" + z.autoregisterAdd("hostname") + z.autoregisterLastSend["hostname"] = time.Now().Add(time.Hour) + zabbixSender.Reset() + z.autoregisterPush() + require.Len(t, z.autoregisterLastSend, 1) + require.Equal(t, "", zabbixSender.hostname) + require.Equal(t, "", zabbixSender.hostMetadata) + + // Test that autoregister is sent if last send was more than autoregisterSendPeriod ago. + z.Autoregister = "autoregister" + z.autoregisterAdd("hostname") + z.autoregisterLastSend["hostname"] = time.Now().Add(-24 * time.Hour) + zabbixSender.Reset() + z.autoregisterPush() + require.Len(t, z.autoregisterLastSend, 1) + require.Equal(t, "hostname", zabbixSender.hostname) + require.Equal(t, "autoregister", zabbixSender.hostMetadata) +} + +// mockZabbixSender is a mock of ZabbixAutoregisterSender. +type mockZabbixSender struct { + hostname string + hostMetadata string + sendMetrics []*zabbix.Metric + sendPackets []*zabbix.Packet +} + +// Reset resets the mock. +func (m *mockZabbixSender) Reset() { + m.hostname = "" + m.hostMetadata = "" + m.sendMetrics = nil + m.sendPackets = nil +} + +// RegisterHost is a mock of ZabbixAutoregisterSender.RegisterHost. +func (m *mockZabbixSender) RegisterHost(hostname, hostMetadata string) error { + m.hostname = hostname + m.hostMetadata = hostMetadata + + return nil +} + +// RegisterHost is a mock of ZabbixAutoregisterSender.RegisterHost. +func (m *mockZabbixSender) Send(packet *zabbix.Packet) (res zabbix.Response, err error) { + m.sendPackets = append(m.sendPackets, packet) + return zabbix.Response{}, nil +} + +func (m *mockZabbixSender) SendMetrics(metrics []*zabbix.Metric) ( + resActive zabbix.Response, + resTrapper zabbix.Response, + err error, +) { + m.sendMetrics = append(m.sendMetrics, metrics...) + return zabbix.Response{}, zabbix.Response{}, nil +} diff --git a/plugins/outputs/zabbix/lld.go b/plugins/outputs/zabbix/lld.go new file mode 100644 index 0000000000000..9faada1c16169 --- /dev/null +++ b/plugins/outputs/zabbix/lld.go @@ -0,0 +1,234 @@ +package zabbix + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "hash/fnv" + "os" + "sort" + "strings" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/metric" +) + +const ( + lldName = "lld" + empty = `{"data":[]}` +) + +type lldInfo struct { + Hostname string + Key string + DataHash uint64 + Data map[uint64]map[string]string +} + +func (i *lldInfo) hash() uint64 { + ids := make([]uint64, 0, len(i.Data)) + for id := range i.Data { + ids = append(ids, id) + } + sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] }) + + h := fnv.New64a() + // Write cannot fail + _ = binary.Write(h, internal.HostEndianness, lldSeriesID(i.Hostname, i.Key)) + h.Write([]byte{0}) + _ = binary.Write(h, internal.HostEndianness, ids) + + return h.Sum64() +} + +func (i *lldInfo) metric(hostTag string) (telegraf.Metric, error) { + values := make([]map[string]string, 0, len(i.Data)) + for _, v := range i.Data { + values = append(values, v) + } + data := map[string]interface{}{"data": values} + buf, err := json.Marshal(data) + if err != nil { + return nil, err + } + + return metric.New( + lldName, + map[string]string{ + hostTag: i.Hostname, + }, + map[string]interface{}{ + i.Key: buf, + }, + time.Now(), + ), nil +} + +type zabbixLLD struct { + log telegraf.Logger + + // current is the collection of metrics added during the recent period + current map[uint64]lldInfo + + // previous stores the hashes of metrics received during the previous period + previous map[uint64]lldInfo + + // lastClear store the time of the last clear of the LLD data + lastClear time.Time + + // clearInterval after this number of pushes, all data is considered new. + // The idea behind this parameter is to resend known LLDs with low freq in case + // previous sent was not processed by Zabbix. + clearInterval config.Duration + + // hostTag is the name of the tag that contains the host name + hostTag string +} + +// Push returns a slice of metrics to send to Zabbix with the LLD data using the accumulated info. +// The name of the metric will be always "lld" (const). +// It will have only one tag, with the host. +// It will have an uniq field, with the LLD key as the key name and the JSON data as the value +// Eg.: lld,host=hostA disk.device.fstype.mode.path="{\"data\":[... +func (zl *zabbixLLD) Push() []telegraf.Metric { + metrics := make([]telegraf.Metric, 0, len(zl.current)) + newPrevious := make(map[uint64]lldInfo, len(zl.current)) + + // Iterate over the data collected in the closing period and determine which + // data needs to be send. This can be done by comparing the complete data + // hash with what was previously sent (i.e. during last push). If different, + // send a new LLD. + seen := make(map[uint64]bool, len(zl.current)) + for series, info := range zl.current { + dataHash := info.hash() + + // Get the hash of the complete data and remember the data for next period + newPrevious[series] = lldInfo{ + Hostname: info.Hostname, + Key: info.Key, + DataHash: dataHash, + } + seen[series] = true + + // Skip already sent data + previous, found := zl.previous[series] + if found && previous.DataHash == dataHash { + continue + } + + // For unseen series or series with new tags, we should send/resend + // the data for discovery + m, err := info.metric(zl.hostTag) + if err != nil { + zl.log.Warnf("Marshaling to JSON LLD tags in Zabbix format: %v", err) + continue + } + metrics = append(metrics, m) + } + + // Check if we have seen the LLD in this cycle and send an empty LLD otherwise + for series, info := range zl.previous { + if seen[series] { + continue + } + m := metric.New( + lldName, + map[string]string{ + zl.hostTag: info.Hostname, + }, + map[string]interface{}{ + info.Key: []byte(empty), + }, + time.Now(), + ) + metrics = append(metrics, m) + } + + // Replace "previous" with the data of this period or clear previous + // if enough time has passed + if time.Since(zl.lastClear) < time.Duration(zl.clearInterval) { + zl.previous = newPrevious + } else { + zl.previous = make(map[uint64]lldInfo, len(zl.previous)) + zl.lastClear = time.Now() + } + + // Clear current + zl.current = make(map[uint64]lldInfo, len(zl.current)) + + return metrics +} + +// Add parse a metric and add it to the LLD cache. +func (zl *zabbixLLD) Add(in telegraf.Metric) error { + // Extract all necessary information from the metric + // Get the metric tags. The tag-list is already sorted by key name + tagList := in.TagList() + + // Iterate over the tags and extract + // - the hostname contained in the host tag + // - the key-values of the tags WITHOUT the host tag + // - the LLD-key for sending the metric in the form + // .[,...,] + // - a hash for the metric + var hostname string + keys := make([]string, 0, len(tagList)) + data := make(map[string]string, len(tagList)) + for _, tag := range tagList { + // Extract the host key and skip everything else + if tag.Key == zl.hostTag { + hostname = tag.Value + continue + } + + // Collect the tag keys for generating the lld-key later + if tag.Value != "" { + keys = append(keys, tag.Key) + } + + // Prepare the data for lld-metric + key := "{#" + strings.ToUpper(tag.Key) + "}" + data[key] = tag.Value + } + + if len(keys) == 0 { + // Ignore metrics without tags (apart from the host tag) + return nil + } + key := in.Name() + "." + strings.Join(keys, ".") + + // If hostname is not defined, use the hostname of the system + if hostname == "" { + var err error + hostname, err = os.Hostname() + if err != nil { + return fmt.Errorf("no hostname found and unable to get hostname from system: %w", err) + } + } + + // Try to lookup the Zabbix series in the already received metrics and + // create a new one if not found + series := lldSeriesID(hostname, key) + if _, found := zl.current[series]; !found { + zl.current[series] = lldInfo{ + Hostname: hostname, + Key: key, + Data: make(map[uint64]map[string]string), + } + } + zl.current[series].Data[in.HashID()] = data + + return nil +} + +func lldSeriesID(hostname, key string) uint64 { + h := fnv.New64a() + h.Write([]byte(hostname)) + h.Write([]byte{0}) + h.Write([]byte(key)) + h.Write([]byte{0}) + return h.Sum64() +} diff --git a/plugins/outputs/zabbix/lld_test.go b/plugins/outputs/zabbix/lld_test.go new file mode 100644 index 0000000000000..b76a813adee1b --- /dev/null +++ b/plugins/outputs/zabbix/lld_test.go @@ -0,0 +1,1137 @@ +package zabbix + +import ( + "encoding/json" + "fmt" + "os" + "sort" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/testutil" +) + +type ( + // Operations is an interface to simulate aggregator operations + Operations interface{} + // OperationAdd is an array of metrics to add to the aggregator + OperationAdd []telegraf.Metric + // OperationPush simulate a push call to the aggregator + OperationPush struct{} + // OperationCheck is an array of metrics to check if they are generated by the aggregator + OperationCheck []telegraf.Metric + // OperationCrossClearIntervalTime is used to simulate a time cross the clear interval + OperationCrossClearIntervalTime struct{} +) + +func TestAddAndPush(t *testing.T) { + tests := map[string][]Operations{ + "metric without extra tags does not generate LLD metric": { + OperationAdd{ + testutil.MustMetric( + "name", + map[string]string{"host": "hostA"}, + map[string]interface{}{"value": 1}, + time.Now()), + }, + OperationPush{}, + OperationCheck{}, + }, + "simple Add, Push and check generated LLD metric": { + OperationAdd{ + testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "bar"}, + map[string]interface{}{"value": 1}, + time.Now()), + }, + OperationPush{}, + OperationCheck{ + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"name.foo": `{"data":[{"{#FOO}":"bar"}]}`}, + time.Now(), + ), + }, + }, + "same metric with different tag values": { + OperationAdd{ + testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "bar1"}, + map[string]interface{}{"value": 1}, + time.Now()), + }, + OperationAdd{ + testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "bar2"}, + map[string]interface{}{"value": 1}, + time.Now()), + }, + OperationPush{}, + OperationCheck{ + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"name.foo": `{"data":[{"{#FOO}":"bar1"},{"{#FOO}":"bar2"}]}`}, + time.Now(), + ), + }, + }, + "add two metrics, Push and check generated LLD metric": { + OperationAdd{ + testutil.MustMetric( + "nameA", + map[string]string{"host": "hostA", "foo": "bar"}, + map[string]interface{}{"value": 1}, + time.Now()), + testutil.MustMetric( + "nameB", + map[string]string{"host": "hostA", "foo": "bar"}, + map[string]interface{}{"value": 1}, + time.Now()), + }, + OperationPush{}, + OperationCheck{ + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"nameA.foo": `{"data":[{"{#FOO}":"bar"}]}`}, + time.Now(), + ), + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"nameB.foo": `{"data":[{"{#FOO}":"bar"}]}`}, + time.Now(), + ), + }, + }, + "add two similar metrics, one with one more extra tag": { + OperationAdd{ + testutil.MustMetric( + "nameA", + map[string]string{"host": "hostA", "foo1": "bar"}, + map[string]interface{}{"value": 1}, + time.Now()), + testutil.MustMetric( + "nameA", + map[string]string{"host": "hostA", "foo1": "bar", "foo2": "baz"}, + map[string]interface{}{"value": 1}, + time.Now()), + }, + OperationPush{}, + OperationCheck{ + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"nameA.foo1": `{"data":[{"{#FOO1}":"bar"}]}`}, + time.Now(), + ), + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"nameA.foo1.foo2": `{"data":[{"{#FOO1}":"bar","{#FOO2}":"baz"}]}`}, + time.Now(), + ), + }, + }, + "same metric several times generate only one LLD": { + OperationAdd{ + testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "bar"}, + map[string]interface{}{"value": 1}, + time.Now(), + ), + testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "bar"}, + map[string]interface{}{"value": 1}, + time.Now(), + ), + testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "bar"}, + map[string]interface{}{"value": 1}, + time.Now(), + ), + }, + OperationPush{}, + OperationCheck{ + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"name.foo": `{"data":[{"{#FOO}":"bar"}]}`}, + time.Now(), + ), + }, + }, + "same metric several times, with different tag ordering, generate only one LLD": { + OperationAdd{ + testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "bar", "baz": "qux"}, + map[string]interface{}{"value": 1}, + time.Now(), + ), + testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "baz": "qux", "foo": "bar"}, + map[string]interface{}{"value": 1}, + time.Now(), + ), + testutil.MustMetric( + "name", + map[string]string{"baz": "qux", "foo": "bar", "host": "hostA"}, + map[string]interface{}{"value": 1}, + time.Now(), + ), + }, + OperationPush{}, + OperationCheck{ + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"name.baz.foo": `{"data":[{"{#BAZ}":"qux","{#FOO}":"bar"}]}`}, + time.Now(), + ), + }, + }, + "after sending correctly an LLD, same tag values does not generate the same LLD": { + OperationAdd{testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "bar"}, + map[string]interface{}{"value": 1}, + time.Now(), + )}, + OperationPush{}, + OperationCheck{ + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"name.foo": `{"data":[{"{#FOO}":"bar"}]}`}, + time.Now(), + ), + }, + OperationAdd{testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "bar"}, + map[string]interface{}{"value": 1}, + time.Now(), + )}, + OperationPush{}, + OperationCheck{}, + }, + "after lld_clear_interval, already seen LLDs could be resend": { + OperationAdd{testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "bar"}, + map[string]interface{}{"value": 1}, + time.Now(), + )}, + OperationPush{}, + OperationCheck{ + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"name.foo": `{"data":[{"{#FOO}":"bar"}]}`}, + time.Now(), + ), + }, + OperationAdd{testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "bar"}, + map[string]interface{}{"value": 1}, + time.Now(), + )}, + OperationPush{}, + OperationCheck{}, + OperationCrossClearIntervalTime{}, // The clear of the previous LLD seen is done in the next push + OperationAdd{testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "bar"}, + map[string]interface{}{"value": 1}, + time.Now(), + )}, + OperationPush{}, + OperationCheck{}, + OperationAdd{testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "bar"}, + map[string]interface{}{"value": 1}, + time.Now(), + )}, + OperationPush{}, + OperationCheck{ + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"name.foo": `{"data":[{"{#FOO}":"bar"}]}`}, + time.Now(), + ), + }, + }, + "clear interval does not interfere with the send of empty LLDs": { + OperationAdd{testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "bar"}, + map[string]interface{}{"value": 1}, + time.Now(), + )}, + OperationPush{}, + OperationCheck{ + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"name.foo": `{"data":[{"{#FOO}":"bar"}]}`}, + time.Now(), + ), + }, + OperationAdd{testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "bar"}, + map[string]interface{}{"value": 1}, + time.Now(), + )}, + OperationPush{}, + OperationCheck{}, // LLD has already been sent for this metric + // In this interval between push, the metric is not received + OperationCrossClearIntervalTime{}, // The clear of the previous LLD seen is done in the next push + OperationPush{}, + OperationCheck{ + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"name.foo": `{"data":[]}`}, + time.Now(), + ), + }, + }, + "one metric changes the value of the tag, it should send the new value and not send and empty lld": { + OperationAdd{testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "bar1"}, + map[string]interface{}{"value": 1}, + time.Now(), + )}, + OperationPush{}, + OperationCheck{ + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"name.foo": `{"data":[{"{#FOO}":"bar1"}]}`}, + time.Now(), + ), + }, + OperationAdd{testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "bar2"}, + map[string]interface{}{"value": 1}, + time.Now(), + )}, + OperationPush{}, + OperationCheck{ + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"name.foo": `{"data":[{"{#FOO}":"bar2"}]}`}, + time.Now(), + ), + }, + }, + "if one input stop sending metrics, an empty LLD is sent": { + OperationAdd{testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "bar"}, + map[string]interface{}{"value": 1}, + time.Now(), + )}, + OperationPush{}, + OperationCheck{ + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"name.foo": `{"data":[{"{#FOO}":"bar"}]}`}, + time.Now(), + ), + }, + OperationPush{}, + OperationCheck{testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"name.foo": `{"data":[]}`}, + time.Now(), + )}, + }, + "from two inputs, one stop sending metrics, an empty LLD is sent just for that stopped input": { + OperationAdd{ + testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "bar"}, + map[string]interface{}{"value": 1}, + time.Now(), + ), + testutil.MustMetric( + "name", + map[string]string{"host": "hostB", "foo": "bar"}, + map[string]interface{}{"value": 1}, + time.Now(), + ), + }, + OperationPush{}, + OperationCheck{ + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"name.foo": `{"data":[{"{#FOO}":"bar"}]}`}, + time.Now(), + ), + testutil.MustMetric( + lldName, + map[string]string{"host": "hostB"}, + map[string]interface{}{"name.foo": `{"data":[{"{#FOO}":"bar"}]}`}, + time.Now(), + ), + }, + OperationAdd{ + testutil.MustMetric( + "name", + map[string]string{"host": "hostB", "foo": "bar"}, + map[string]interface{}{"value": 1}, + time.Now(), + ), + }, + OperationPush{}, + OperationCheck{ + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"name.foo": `{"data":[]}`}, + time.Now(), + ), + }, + }, + "different hosts sending the same metric should generate different LLDs": { + OperationAdd{testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "bar"}, + map[string]interface{}{"value": 1}, + time.Now(), + )}, + OperationAdd{testutil.MustMetric( + "name", + map[string]string{"host": "hostB", "foo": "bar"}, + map[string]interface{}{"value": 1}, + time.Now(), + )}, + OperationPush{}, + OperationCheck{ + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"name.foo": `{"data":[{"{#FOO}":"bar"}]}`}, + time.Now(), + ), + testutil.MustMetric( + lldName, + map[string]string{"host": "hostB"}, + map[string]interface{}{"name.foo": `{"data":[{"{#FOO}":"bar"}]}`}, + time.Now(), + ), + }, + }, + "same measurement with different tags should generate different LLDs": { + OperationAdd{testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "a"}, + map[string]interface{}{"value": 1}, + time.Now(), + )}, + OperationAdd{ + testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "a", "bar": "b"}, + map[string]interface{}{"value": 1}, + time.Now(), + ), + }, + OperationPush{}, + OperationCheck{ + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"name.foo": `{"data":[{"{#FOO}":"a"}]}`}, + time.Now(), + ), + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"name.bar.foo": `{"data":[{"{#BAR}":"b","{#FOO}":"a"}]}`}, + time.Now(), + ), + }, + }, + "a set with a new combination of tag values already seen should generate a new lld": { + OperationAdd{ + testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "a", "bar": "b"}, + map[string]interface{}{"value": 1}, + time.Now(), + ), + }, + OperationAdd{ + testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "x", "bar": "y"}, + map[string]interface{}{"value": 1}, + time.Now(), + ), + }, + OperationAdd{ + testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "a", "bar": "y"}, + map[string]interface{}{"value": 1}, + time.Now(), + ), + }, + OperationPush{}, + OperationCheck{ + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{ + "name.bar.foo": `{"data":[{"{#BAR}":"b","{#FOO}":"a"},{"{#BAR}":"y","{#FOO}":"a"},{"{#BAR}":"y","{#FOO}":"x"}]}`, + }, + time.Now(), + ), + }, + }, + "same host and metric with and without extra tag": { + OperationAdd{ + testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "a"}, + map[string]interface{}{"value": 1}, + time.Now(), + ), + }, + OperationAdd{ + testutil.MustMetric( + "name", + map[string]string{"host": "hostA"}, + map[string]interface{}{"value": 1}, + time.Now(), + ), + }, + OperationPush{}, + OperationCheck{ + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{ + "name.foo": `{"data":[{"{#FOO}":"a"}]}`, + }, + time.Now(), + ), + }, + OperationCrossClearIntervalTime{}, + OperationPush{}, + OperationAdd{ + testutil.MustMetric( + "name", + map[string]string{"host": "hostA"}, + map[string]interface{}{"value": 1}, + time.Now(), + ), + }, + OperationAdd{ + testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "a"}, + map[string]interface{}{"value": 1}, + time.Now(), + ), + }, + OperationPush{}, + OperationCheck{ + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{ + "name.foo": `{"data":[{"{#FOO}":"a"}]}`, + }, + time.Now(), + ), + }, + OperationAdd{ + testutil.MustMetric( + "name", + map[string]string{"host": "hostA"}, + map[string]interface{}{"value": 1}, + time.Now(), + ), + }, + OperationPush{}, + // Clean name.foo because it has not been since the last push + OperationCheck{ + testutil.MustMetric( + lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{ + "name.foo": `{"data":[]}`, + }, + time.Now(), + ), + }, + }, + } + + for desc, test := range tests { + t.Run(desc, func(t *testing.T) { + zl := zabbixLLD{ + log: testutil.Logger{}, + clearInterval: config.Duration(time.Hour), + lastClear: time.Now(), + hostTag: "host", + current: make(map[uint64]lldInfo), + } + + metrics := []telegraf.Metric{} + + for _, op := range test { + switch o := (op).(type) { + case OperationAdd: + for _, m := range o { + require.NoError(t, zl.Add(m)) + } + case OperationPush: + metrics = zl.Push() + case OperationCheck: + metrics = sortMetricJSONData(metrics) + testutil.RequireMetricsEqual(t, o, metrics, testutil.IgnoreTime(), testutil.SortMetrics()) + case OperationCrossClearIntervalTime: + // Simulate the time passing by and crossing the clear interval time. + // Add an extra millisecond to be sure to cross the interval in the next operation. + zl.lastClear = time.Now().Add(-time.Duration(zl.clearInterval)).Add(-time.Millisecond) + } + } + }) + } +} + +func TestPush(t *testing.T) { + tests := map[string]struct { + ReceivedData map[uint64]lldInfo + PreviousReceivedData map[uint64]lldInfo + Metrics []telegraf.Metric + }{ + "an empty ReceivedData does not generate any metric": { + ReceivedData: map[uint64]lldInfo{}, + PreviousReceivedData: map[uint64]lldInfo{}, + Metrics: []telegraf.Metric{}, + }, + "simple one host with one lld with one set of values": { + ReceivedData: map[uint64]lldInfo{ + 0: { + Hostname: "hostA", + Key: "disk.foo", + Data: map[uint64]map[string]string{ + 1: { + "{#FOO}": "bar", + }, + }, + }, + }, + PreviousReceivedData: map[uint64]lldInfo{}, + Metrics: []telegraf.Metric{ + testutil.MustMetric(lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"disk.foo": `{"data":[{"{#FOO}":"bar"}]}`}, + time.Now(), + ), + }, + }, + "one host with one lld with two set of values": { + ReceivedData: map[uint64]lldInfo{ + 0: { + Hostname: "hostA", + Key: "disk.foo", + Data: map[uint64]map[string]string{ + 1: { + "{#FOO}": "bar1", + }, + 2: { + "{#FOO}": "bar2", + }, + }, + }, + }, + PreviousReceivedData: map[uint64]lldInfo{}, + Metrics: []telegraf.Metric{ + testutil.MustMetric(lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"disk.foo": `{"data":[{"{#FOO}":"bar1"},{"{#FOO}":"bar2"}]}`}, + time.Now(), + ), + }, + }, + "one host with one lld with one multiset of values": { + ReceivedData: map[uint64]lldInfo{ + 0: { + Hostname: "hostA", + Key: "disk.fooA.fooB.fooC", + Data: map[uint64]map[string]string{ + 1: { + "{#FOOA}": "bar1", + "{#FOOB}": "bar2", + "{#FOOC}": "bar3", + }, + }, + }, + }, + PreviousReceivedData: map[uint64]lldInfo{}, + Metrics: []telegraf.Metric{ + testutil.MustMetric(lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"disk.fooA.fooB.fooC": `{"data":[{"{#FOOA}":"bar1","{#FOOB}":"bar2","{#FOOC}":"bar3"}]}`}, + time.Now(), + ), + }, + }, + "one host with three lld with one set of values, not sorted": { + ReceivedData: map[uint64]lldInfo{ + 0: { + Hostname: "hostA", + Key: "disk.foo", + Data: map[uint64]map[string]string{ + 1: { + "{#FOO}": "bar", + }, + }, + }, + 1: { + Hostname: "hostA", + Key: "net.iface", + Data: map[uint64]map[string]string{ + 1: { + "{#IFACE}": "eth0", + }, + }, + }, + 2: { + Hostname: "hostA", + Key: "proc.pid", + Data: map[uint64]map[string]string{ + 1: { + "{#PID}": "1234", + }, + }, + }, + }, + PreviousReceivedData: map[uint64]lldInfo{}, + Metrics: []telegraf.Metric{ + testutil.MustMetric(lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"proc.pid": `{"data":[{"{#PID}":"1234"}]}`}, + time.Now(), + ), + testutil.MustMetric(lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"disk.foo": `{"data":[{"{#FOO}":"bar"}]}`}, + time.Now(), + ), + testutil.MustMetric(lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"net.iface": `{"data":[{"{#IFACE}":"eth0"}]}`}, + time.Now(), + ), + }, + }, + "two host with the same lld with one set of values": { + ReceivedData: map[uint64]lldInfo{ + 0: { + Hostname: "hostA", + Key: "disk.foo", + Data: map[uint64]map[string]string{ + 1: { + "{#FOO}": "bar", + }, + }, + }, + 1: { + Hostname: "hostB", + Key: "disk.foo", + Data: map[uint64]map[string]string{ + 1: { + "{#FOO}": "bar", + }, + }, + }, + }, + PreviousReceivedData: map[uint64]lldInfo{}, + Metrics: []telegraf.Metric{ + testutil.MustMetric(lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"disk.foo": `{"data":[{"{#FOO}":"bar"}]}`}, + time.Now(), + ), + testutil.MustMetric(lldName, + map[string]string{"host": "hostB"}, + map[string]interface{}{"disk.foo": `{"data":[{"{#FOO}":"bar"}]}`}, + time.Now(), + ), + }, + }, + "ignore generating a new lld if it was sent the last time": { + ReceivedData: map[uint64]lldInfo{ + 2658406801034663970: { + Hostname: "hostA", + Key: "disk.foo", + Data: map[uint64]map[string]string{ + 1: { + "{#FOO}": "bar", + }, + }, + }, + }, + PreviousReceivedData: map[uint64]lldInfo{ + 2658406801034663970: { + Hostname: "hostA", + Key: "disk.foo", + Data: map[uint64]map[string]string{ + 1: { + "{#FOO}": "bar", + }, + }, + }, + }, + Metrics: []telegraf.Metric{}, + }, + "send an empty LLD if one metric has stopped being sent": { + ReceivedData: map[uint64]lldInfo{}, + PreviousReceivedData: map[uint64]lldInfo{ + 0: { + Hostname: "hostA", + Key: "disk.foo", + Data: map[uint64]map[string]string{ + 1: { + "{#FOO}": "bar", + }, + }, + }, + }, + Metrics: []telegraf.Metric{ + testutil.MustMetric(lldName, + map[string]string{"host": "hostA"}, + map[string]interface{}{"disk.foo": `{"data":[]}`}, + time.Now(), + ), + }, + }, + } + + for desc, test := range tests { + t.Run(desc, func(t *testing.T) { + zl := zabbixLLD{ + clearInterval: 4, + log: testutil.Logger{}, + current: test.ReceivedData, + previous: test.PreviousReceivedData, + hostTag: "host", + } + + // Hash the previous data + for series, info := range zl.previous { + info.DataHash = info.hash() + zl.previous[series] = info + } + + metrics := zl.Push() + // Sort the "data" dict in the metrics values to get always the same order. + metrics = sortMetricJSONData(metrics) + + testutil.RequireMetricsEqual(t, test.Metrics, metrics, testutil.IgnoreTime(), testutil.SortMetrics()) + }) + } +} + +type MetricValue struct { + Data []map[string]string `json:"data"` +} + +// sortMetricJSONData given a list of metrics, if the name is equal to lldName, the JSON data dictionaries are sorted. +// This is needed because the order of the JSON data dictionaries is not guaranteed but we need them sorted to compare +// them against the expected tests values. This sorting should be done using the keys of the dictionaries. +// Example: +// +// Original metrics: lld,host=foo disk.foo={"data":[{"{#FOO2}":"bar2"},{"{#FOO1}":"bar1"}]} +// Sorted metrics: lld,host=foo disk.foo={"data":[{"{#FOO1}":"bar1"},{"{#FOO2}":"bar2"}]} +func sortMetricJSONData(metrics []telegraf.Metric) []telegraf.Metric { + for _, m := range metrics { + if m.Name() == lldName { + for _, f := range m.FieldList() { + // f is a string with format: '{"data":[{"{#PID}":"1234"}]}' + var data MetricValue + err := json.Unmarshal([]byte(f.Value.(string)), &data) + if err != nil { + panic(err) + } + + // Sort data comparing the content as a string + sort.Slice(data.Data, func(i, j int) bool { + return fmt.Sprintf("%v", data.Data[i]) < fmt.Sprintf("%v", data.Data[j]) + }) + + dataJSON, err := json.Marshal(data) + if err != nil { + panic(err) + } + + f.Value = string(dataJSON) + } + } + } + + return metrics +} + +func TestAdd(t *testing.T) { + hostname, err := os.Hostname() + require.NoError(t, err) + + tests := map[string]struct { + Metrics []telegraf.Metric + Current map[uint64]lldInfo + }{ + "metric without tags is ignored": { + Metrics: []telegraf.Metric{ + testutil.MustMetric("disk", map[string]string{}, map[string]interface{}{"a": 0}, time.Now()), + }, + Current: map[uint64]lldInfo{}, + }, + "metric with only the host tag is not used for LLD": { + Metrics: []telegraf.Metric{ + testutil.MustMetric( + "disk", + map[string]string{"host": "bar"}, + map[string]interface{}{"a": 0}, + time.Now(), + ), + }, + Current: map[uint64]lldInfo{}, + }, + "add one metric with one tag and not host tag, use the system hostname": { + Metrics: []telegraf.Metric{ + testutil.MustMetric( + "disk", + map[string]string{"foo": "bar"}, + map[string]interface{}{"a": 0}, + time.Now(), + ), + }, + Current: map[uint64]lldInfo{ + 1: { + Hostname: hostname, + Key: "disk.foo", + Data: map[uint64]map[string]string{ + 2011740591878200733: { + "{#FOO}": "bar", + }, + }, + }, + }, + }, + "add one metric with one extra tag": { + Metrics: []telegraf.Metric{ + testutil.MustMetric( + "disk", + map[string]string{"host": "bar", "foo": "bar"}, + map[string]interface{}{"a": 0}, + time.Now(), + ), + }, + Current: map[uint64]lldInfo{ + 1: { + Hostname: "bar", + Key: "disk.foo", + Data: map[uint64]map[string]string{ + 13756738031738276742: { + "{#FOO}": "bar", + }, + }, + }, + }, + }, + "same metric with different field values is only stored once": { + Metrics: []telegraf.Metric{ + testutil.MustMetric( + "disk", + map[string]string{"host": "bar", "foo": "bar"}, + map[string]interface{}{"a": 0}, + time.Now(), + ), + testutil.MustMetric( + "disk", + map[string]string{"host": "bar", "foo": "bar"}, + map[string]interface{}{"a": 999}, + time.Now(), + ), + }, + Current: map[uint64]lldInfo{ + 1: { + Hostname: "bar", + Key: "disk.foo", + Data: map[uint64]map[string]string{ + 13756738031738276742: { + "{#FOO}": "bar", + }, + }, + }, + }, + }, + "for the same measurement and tags, the different combinations of tag values are stored under the same key": { + Metrics: []telegraf.Metric{ + testutil.MustMetric( + "disk", + map[string]string{"host": "bar", "foo": "bar1"}, + map[string]interface{}{"a": 0}, + time.Now(), + ), + testutil.MustMetric( + "disk", + map[string]string{"host": "bar", "foo": "bar2"}, + map[string]interface{}{"a": 0}, + time.Now(), + ), + }, + Current: map[uint64]lldInfo{ + 1: { + Hostname: "bar", + Key: "disk.foo", + Data: map[uint64]map[string]string{ + 9541037171803204811: { + "{#FOO}": "bar1", + }, + 10966311568236988310: { + "{#FOO}": "bar2", + }, + }, + }, + }, + }, + "same measurement and tags for different hosts are stored in different keys": { + Metrics: []telegraf.Metric{ + testutil.MustMetric( + "disk", + map[string]string{"host": "barA", "foo": "bar"}, + map[string]interface{}{"a": 0}, + time.Now(), + ), + testutil.MustMetric( + "disk", + map[string]string{"host": "barB", "foo": "bar"}, + map[string]interface{}{"a": 0}, + time.Now(), + ), + }, + Current: map[uint64]lldInfo{ + 1: { + Hostname: "barA", + Key: "disk.foo", + Data: map[uint64]map[string]string{ + 4916699111010086803: { + "{#FOO}": "bar", + }, + }, + }, + 2: { + Hostname: "barB", + Key: "disk.foo", + Data: map[uint64]map[string]string{ + 4917655686126441148: { + "{#FOO}": "bar", + }, + }, + }, + }, + }, + "different number of tags for the same measurement are stored in different keys": { + Metrics: []telegraf.Metric{ + testutil.MustMetric( + "disk", + map[string]string{"host": "bar", "foo1": "bar", "foo2": "bar"}, + map[string]interface{}{"a": 0}, + time.Now(), + ), + testutil.MustMetric( + "disk", + map[string]string{"host": "bar", "foo1": "bar"}, + map[string]interface{}{"a": 0}, + time.Now(), + ), + }, + Current: map[uint64]lldInfo{ + 1: { + Hostname: "bar", + Key: "disk.foo1.foo2", + Data: map[uint64]map[string]string{ + 12473238139685120014: { + "{#FOO1}": "bar", + "{#FOO2}": "bar", + }, + }, + }, + 2: { + Hostname: "bar", + Key: "disk.foo1", + Data: map[uint64]map[string]string{ + 4193955122073793785: { + "{#FOO1}": "bar", + }, + }, + }, + }, + }, + } + + for desc, test := range tests { + t.Run(desc, func(t *testing.T) { + zl := zabbixLLD{ + log: testutil.Logger{}, + clearInterval: config.Duration(time.Hour), + lastClear: time.Now(), + hostTag: "host", + current: make(map[uint64]lldInfo), + } + + for _, m := range test.Metrics { + require.NoError(t, zl.Add(m)) + } + + // Calculate series ID for the test data. + // Metric hashes could not be calculated because we don't have enough information. + for id, info := range test.Current { + calculatedID := lldSeriesID(info.Hostname, info.Key) + if id == calculatedID { + continue + } + + test.Current[calculatedID] = info + + // Drop old ID + delete(test.Current, id) + } + + require.Equal(t, test.Current, zl.current) + }) + } +} diff --git a/plugins/outputs/zabbix/sample.conf b/plugins/outputs/zabbix/sample.conf new file mode 100644 index 0000000000000..24f29c97a0eb2 --- /dev/null +++ b/plugins/outputs/zabbix/sample.conf @@ -0,0 +1,33 @@ +# Send metrics to Zabbix +[[outputs.zabbix]] + ## Address and (optional) port of the Zabbix server + address = "zabbix.example.com:10051" + + ## Send metrics as type "Zabbix agent (active)" + # agent_active = false + + ## Add prefix to all keys sent to Zabbix. + # prefix = "telegraf." + + ## Name of the tag that contains the host name + # host_tag = "host" + + ## Skip measurement prefix to all keys sent to Zabbix. + # skip_measurement_prefix = false + + ## This field will be sent as HostMetadata to Zabbix Server to autoregister the host. + ## To enable this feature, this option must be set to a value other than "". + # autoregister = "" + + ## Interval to resend auto-registration data to Zabbix. + ## Only applies if autoregister feature is enabled. + ## This value is a lower limit, the actual resend should be triggered by the next flush interval. + # autoregister_resend_interval = "30m" + + ## Interval to send LLD data to Zabbix. + ## This value is a lower limit, the actual resend should be triggered by the next flush interval. + # lld_send_interval = "10m" + + ## Interval to delete stored LLD known data and start capturing it again. + ## This value is a lower limit, the actual resend should be triggered by the next flush interval. + # lld_clear_interval = "1h" diff --git a/plugins/outputs/zabbix/zabbix.go b/plugins/outputs/zabbix/zabbix.go new file mode 100644 index 0000000000000..9010f4f50c5a3 --- /dev/null +++ b/plugins/outputs/zabbix/zabbix.go @@ -0,0 +1,240 @@ +//go:generate ../../../tools/readme_config_includer/generator +package zabbix + +import ( + _ "embed" + "fmt" + "net" + "os" + "sort" + "strings" + "time" + + "github.com/datadope-io/go-zabbix/v2" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/outputs" +) + +// zabbixSender is an interface to send autoregister data to Zabbix. +// It is implemented by Zabbix.Sender. +// Created to be able to mock Zabbix.Sender in tests. +type zabbixSender interface { + Send(packet *zabbix.Packet) (res zabbix.Response, err error) + SendMetrics(metrics []*zabbix.Metric) (resActive zabbix.Response, resTrapper zabbix.Response, err error) + RegisterHost(hostname string, hostMetadata string) error +} + +// Zabbix allows pushing metrics to Zabbix software +type Zabbix struct { + Address string `toml:"address"` + AgentActive bool `toml:"agent_active"` + Prefix string `toml:"prefix"` + HostTag string `toml:"host_tag"` + SkipMeasurementPrefix bool `toml:"skip_measurement_prefix"` + LLDSendInterval config.Duration `toml:"lld_send_interval"` + LLDClearInterval config.Duration `toml:"lld_clear_interval"` + Autoregister string `toml:"autoregister"` + AutoregisterResendInterval config.Duration `toml:"autoregister_resend_interval"` + Log telegraf.Logger `toml:"-"` + + // lldHandler handles low level discovery data + lldHandler zabbixLLD + // lldLastSend store the last LLD send to known where to send it again + lldLastSend time.Time + // autoregisterLastSend stores the last time autoregister data was sent to Zabbix for each host. + autoregisterLastSend map[string]time.Time + // sender is the interface to send data to Zabbix. + sender zabbixSender +} + +//go:embed sample.conf +var sampleConfig string + +func (*Zabbix) SampleConfig() string { + return sampleConfig +} + +// Connect does nothing, Write() would initiate connection in each call. +// Checking if Zabbix server is alive in this step does not allow Telegraf +// to start if there is a temporal connection problem with the server. +func (z *Zabbix) Connect() error { + return nil +} + +// Init initilizes LLD and autoregister maps. Copy config values to them. Configure Logger. +func (z *Zabbix) Init() error { + // Add port to address if not present + if _, _, err := net.SplitHostPort(z.Address); err != nil { + z.Address = net.JoinHostPort(z.Address, "10051") + } + + z.sender = zabbix.NewSender(z.Address) + // Initialize autoregisterLastSend map with size one, as the most common scenario is to have one host. + z.autoregisterLastSend = make(map[string]time.Time, 1) + z.lldLastSend = time.Now() + + z.lldHandler = zabbixLLD{ + log: z.Log, + hostTag: z.HostTag, + clearInterval: z.LLDClearInterval, + lastClear: time.Now(), + current: make(map[uint64]lldInfo, 100), + } + + return nil +} + +func (z *Zabbix) Close() error { + return nil +} + +// Write sends metrics to Zabbix server +func (z *Zabbix) Write(metrics []telegraf.Metric) error { + if len(metrics) == 0 { + return nil + } + + zbxMetrics := make([]*zabbix.Metric, 0, len(metrics)) + + for _, metric := range metrics { + hostname, err := getHostname(z.HostTag, metric) + if err != nil { + z.Log.Errorf("Error getting hostname for metric %v: %v", metric, err) + continue + } + + zbxMetrics = append(zbxMetrics, z.processMetric(metric)...) + + // Handle hostname for autoregister + z.autoregisterAdd(hostname) + + // Process LLD data + err = z.lldHandler.Add(metric) + if err != nil { + z.Log.Errorf("Error processing LLD for metric %v: %v", metric, err) + } + } + + // Send LLD data if enough time has passed + if time.Since(z.lldLastSend) > time.Duration(z.LLDSendInterval) { + z.lldLastSend = time.Now() + for _, lldMetric := range z.lldHandler.Push() { + zbxMetrics = append(zbxMetrics, z.processMetric(lldMetric)...) + } + } + + // Send metrics to Zabbix server + err := z.sendZabbixMetrics(zbxMetrics) + + // Send autoregister data after sending metrics. + z.autoregisterPush() + + return err +} + +// sendZabbixMetrics sends metrics to Zabbix server +func (z *Zabbix) sendZabbixMetrics(zbxMetrics []*zabbix.Metric) error { + if len(zbxMetrics) == 0 { + return nil + } + + // Sort metrics by time. + // Avoid extra work in Zabbix when generating the trends. + // If values are not sent in clock order, trend generation is forced to + // make more database operations. + // When a value is received with a new hour, trend is flushed to the + // database. + // If later a value is received with the previous hour, new trend is + // flushed, old one is retrieved from database and updated. + // When a new value with the new hour is received, old trend is flushed, + // new trend retrieved from database and updated. + sort.Slice(zbxMetrics, func(i, j int) bool { + return zbxMetrics[i].Clock < zbxMetrics[j].Clock + }) + + packet := zabbix.NewPacket(zbxMetrics, z.AgentActive) + _, err := z.sender.Send(packet) + + return err +} + +// processMetric converts a Telegraf metric to a list of Zabbix metrics. +// Ignore metrics with no hostname. +func (z Zabbix) processMetric(metric telegraf.Metric) []*zabbix.Metric { + zbxMetrics := make([]*zabbix.Metric, 0, len(metric.FieldList())) + + for _, field := range metric.FieldList() { + zbxMetric, err := z.buildZabbixMetric(metric, field.Key, field.Value) + if err != nil { + z.Log.Errorf("Error converting telegraf metric to Zabbix format: %v", err) + continue + } + + zbxMetrics = append(zbxMetrics, zbxMetric) + } + + return zbxMetrics +} + +// buildZabbixMetric builds a Zabbix metric from a Telegraf metric, for one particular value. +func (z Zabbix) buildZabbixMetric(metric telegraf.Metric, fieldName string, value interface{}) (*zabbix.Metric, error) { + hostname, err := getHostname(z.HostTag, metric) + if err != nil { + return nil, fmt.Errorf("error getting hostname: %w", err) + } + + metricValue, err := internal.ToString(value) + if err != nil { + return nil, fmt.Errorf("error converting value: %w", err) + } + + key := z.Prefix + metric.Name() + "." + fieldName + if z.SkipMeasurementPrefix { + key = z.Prefix + fieldName + } + + // Ignore host tag. + // We want to add tags to the key in alphabetical order. Eg.: + // lld.dns_query.query_time_ms[DOMAIN,RECORD_TYPE,SERVER] + // TagList already return the tags in alphabetical order. + tagValues := make([]string, 0, len(metric.TagList())) + + for _, tag := range metric.TagList() { + if tag.Key == z.HostTag { + continue + } + + // Get tag values in the same order as the tag keys in the tags slice. + tagValues = append(tagValues, tag.Value) + } + + if len(tagValues) != 0 { + key = fmt.Sprintf("%v[%v]", key, strings.Join(tagValues, ",")) + } + + return zabbix.NewMetric(hostname, key, metricValue, z.AgentActive, metric.Time().Unix()), nil +} + +func init() { + outputs.Add("zabbix", func() telegraf.Output { + return &Zabbix{ + Prefix: "telegraf.", + HostTag: "host", + AutoregisterResendInterval: config.Duration(time.Minute * 30), + LLDSendInterval: config.Duration(time.Minute * 10), + LLDClearInterval: config.Duration(time.Hour), + } + }) +} + +// getHostname returns the hostname from the tags, or the system hostname if not found. +func getHostname(hostTag string, metric telegraf.Metric) (string, error) { + if hostname, ok := metric.GetTag(hostTag); ok { + return hostname, nil + } + + return os.Hostname() +} diff --git a/plugins/outputs/zabbix/zabbix_test.go b/plugins/outputs/zabbix/zabbix_test.go new file mode 100644 index 0000000000000..41002e1350d0b --- /dev/null +++ b/plugins/outputs/zabbix/zabbix_test.go @@ -0,0 +1,920 @@ +package zabbix + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "net" + "os" + "sort" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/testutil" +) + +type zabbixRequestData struct { + Host string `json:"host"` + Key string `json:"key"` + Value string `json:"value"` + Clock int64 `json:"clock"` +} + +type zabbixRequest struct { + Request string `json:"request"` + Data []zabbixRequestData `json:"data"` + Clock int `json:"clock"` + Host string `json:"host"` + HostMetadata string `json:"host_metadata"` +} + +type zabbixLLDValue struct { + Data []map[string]string `json:"data"` +} + +func TestZabbix(t *testing.T) { + hostname, err := os.Hostname() + require.NoError(t, err) + + tests := map[string]struct { + Prefix string + AgentActive bool + SkipMeasurementPrefix bool + telegrafMetrics []telegraf.Metric + zabbixMetrics []zabbixRequestData + }{ + "send one metric with one field and no extra tags, generates one zabbix metric": { + telegrafMetrics: []telegraf.Metric{ + testutil.MustMetric("name", + map[string]string{ + "host": "hostname", + }, + map[string]interface{}{ + "value": int64(0), + }, + time.Unix(1522082244, 0), + ), + }, + zabbixMetrics: []zabbixRequestData{ + { + Host: "hostname", + Key: "name.value", + Value: "0", + Clock: 1522082244, + }, + }, + }, + "string values representing a float number should be sent in the exact same format": { + telegrafMetrics: []telegraf.Metric{ + testutil.MustMetric("name", + map[string]string{ + "host": "hostname", + }, + map[string]interface{}{ + "value": "3.1415", + }, + time.Unix(1522082244, 0), + ), + }, + zabbixMetrics: []zabbixRequestData{ + { + Host: "hostname", + Key: "name.value", + Value: "3.1415", + Clock: 1522082244, + }, + }, + }, + "send one metric with one string field and no extra tags, generates one zabbix metric": { + telegrafMetrics: []telegraf.Metric{ + testutil.MustMetric("name", + map[string]string{ + "host": "hostname", + }, + map[string]interface{}{ + "value": "some value", + }, + time.Unix(1522082244, 0), + ), + }, + zabbixMetrics: []zabbixRequestData{ + { + Host: "hostname", + Key: "name.value", + Value: "some value", + Clock: 1522082244, + }, + }, + }, + "boolean values are converted to 1 (true) or 0 (false)": { + telegrafMetrics: []telegraf.Metric{ + testutil.MustMetric("name", + map[string]string{ + "host": "hostname", + }, + map[string]interface{}{ + "valueTrue": true, + "valueFalse": false, + }, + time.Unix(1522082244, 0), + ), + }, + zabbixMetrics: []zabbixRequestData{ + { + Host: "hostname", + Key: "name.valueTrue", + Value: "true", + Clock: 1522082244, + }, + { + Host: "hostname", + Key: "name.valueFalse", + Value: "false", + Clock: 1522082244, + }, + }, + }, + "invalid value data is ignored and not sent": { + telegrafMetrics: []telegraf.Metric{ + testutil.MustMetric("name", + map[string]string{ + "host": "hostname", + }, + map[string]interface{}{ + "value": []int{1, 2}, + }, + time.Unix(1522082244, 0), + ), + }, + zabbixMetrics: []zabbixRequestData{}, + }, + "metrics without host tag use the system hostname": { + telegrafMetrics: []telegraf.Metric{ + testutil.MustMetric("name", + map[string]string{}, + map[string]interface{}{ + "value": "x", + }, + time.Unix(1522082244, 0), + ), + }, + zabbixMetrics: []zabbixRequestData{ + { + Host: hostname, + Key: "name.value", + Value: "x", + Clock: 1522082244, + }, + }, + }, + "send one metric with extra tags, zabbix metric should be generated with a parameter": { + telegrafMetrics: []telegraf.Metric{ + testutil.MustMetric("name", + map[string]string{ + "host": "hostname", + "foo": "bar", + }, + map[string]interface{}{ + "value": int64(0), + }, + time.Unix(1522082244, 0), + ), + }, + zabbixMetrics: []zabbixRequestData{ + { + Host: "hostname", + Key: "name.value[bar]", + Value: "0", + Clock: 1522082244, + }, + }, + }, + "send one metric with two extra tags, zabbix parameters should be alfabetically orderer": { + telegrafMetrics: []telegraf.Metric{ + testutil.MustMetric("name", + map[string]string{ + "host": "hostname", + "zparam": "last", + "aparam": "first", + }, + map[string]interface{}{ + "value": int64(0), + }, + time.Unix(1522082244, 0), + ), + }, + zabbixMetrics: []zabbixRequestData{ + { + Host: "hostname", + Key: "name.value[first,last]", + Value: "0", + Clock: 1522082244, + }, + }, + }, + "send one metric with two fields and no extra tags, generates two zabbix metrics": { + telegrafMetrics: []telegraf.Metric{ + testutil.MustMetric("name", + map[string]string{ + "host": "hostname", + }, + map[string]interface{}{ + "valueA": int64(0), + "valueB": int64(1), + }, + time.Unix(1522082244, 0), + ), + }, + zabbixMetrics: []zabbixRequestData{ + { + Host: "hostname", + Key: "name.valueA", + Value: "0", + Clock: 1522082244, + }, + { + Host: "hostname", + Key: "name.valueB", + Value: "1", + Clock: 1522082244, + }, + }, + }, + "send two metrics with one field and no extra tags, generates two zabbix metrics": { + telegrafMetrics: []telegraf.Metric{ + testutil.MustMetric("nameA", + map[string]string{ + "host": "hostname", + }, + map[string]interface{}{ + "value": int64(0), + }, + time.Unix(1522082244, 0), + ), + testutil.MustMetric("nameB", + map[string]string{ + "host": "hostname", + }, + map[string]interface{}{ + "value": int64(0), + }, + time.Unix(1522082244, 0), + ), + }, + zabbixMetrics: []zabbixRequestData{ + { + Host: "hostname", + Key: "nameA.value", + Value: "0", + Clock: 1522082244, + }, + { + Host: "hostname", + Key: "nameB.value", + Value: "0", + Clock: 1522082244, + }, + }, + }, + "send two metrics with different hostname, generates two zabbix metrics for different hosts": { + telegrafMetrics: []telegraf.Metric{ + testutil.MustMetric("name", + map[string]string{ + "host": "hostnameA", + }, + map[string]interface{}{ + "value": int64(0), + }, + time.Unix(1522082244, 0), + ), + testutil.MustMetric("name", + map[string]string{ + "host": "hostnameB", + }, + map[string]interface{}{ + "value": int64(0), + }, + time.Unix(1522082244, 0), + ), + }, + zabbixMetrics: []zabbixRequestData{ + { + Host: "hostnameA", + Key: "name.value", + Value: "0", + Clock: 1522082244, + }, + { + Host: "hostnameB", + Key: "name.value", + Value: "0", + Clock: 1522082244, + }, + }, + }, + "if prefix is configured, zabbix metrics should have that prefix in the key": { + Prefix: "telegraf.", + telegrafMetrics: []telegraf.Metric{ + testutil.MustMetric("name", + map[string]string{ + "host": "hostname", + }, + map[string]interface{}{ + "value": int64(0), + }, + time.Unix(1522082244, 0), + ), + }, + zabbixMetrics: []zabbixRequestData{ + { + Host: "hostname", + Key: "telegraf.name.value", + Value: "0", + Clock: 1522082244, + }, + }, + }, + "if skip_measurement_prefix is configured, zabbix metrics should have to skip that prefix in the key": { + SkipMeasurementPrefix: true, + telegrafMetrics: []telegraf.Metric{ + testutil.MustMetric("name", + map[string]string{ + "host": "hostname", + }, + map[string]interface{}{ + "value": int64(0), + }, + time.Unix(1522082244, 0), + ), + }, + zabbixMetrics: []zabbixRequestData{ + { + Host: "hostname", + Key: "value", + Value: "0", + Clock: 1522082244, + }, + }, + }, + "if AgentActive is configured, zabbix metrics should be sent respecting that protocol": { + AgentActive: true, + telegrafMetrics: []telegraf.Metric{ + testutil.MustMetric("name", + map[string]string{ + "host": "hostname", + }, + map[string]interface{}{ + "value": int64(0), + }, + time.Unix(1522082244, 0), + ), + }, + zabbixMetrics: []zabbixRequestData{ + { + Host: "hostname", + Key: "name.value", + Value: "0", + Clock: 1522082244, + }, + }, + }, + "metrics should be time sorted, oldest to newest, to avoid zabbix doing extra work when generating trends": { + telegrafMetrics: []telegraf.Metric{ + testutil.MustMetric("name", + map[string]string{ + "host": "hostnameD", + }, + map[string]interface{}{ + "value": int64(0), + }, + time.Unix(4444444444, 0), + ), + testutil.MustMetric("name", + map[string]string{ + "host": "hostnameC", + }, + map[string]interface{}{ + "value": int64(0), + }, + time.Unix(3333333333, 0), + ), + testutil.MustMetric("name", + map[string]string{ + "host": "hostnameA", + }, + map[string]interface{}{ + "value": int64(0), + }, + time.Unix(1111111111, 0), + ), + testutil.MustMetric("name", + map[string]string{ + "host": "hostnameB", + }, + map[string]interface{}{ + "value": int64(0), + }, + time.Unix(2222222222, 0), + ), + }, + zabbixMetrics: []zabbixRequestData{ + { + Host: "hostnameA", + Key: "name.value", + Value: "0", + Clock: 1111111111, + }, + { + Host: "hostnameB", + Key: "name.value", + Value: "0", + Clock: 2222222222, + }, + { + Host: "hostnameC", + Key: "name.value", + Value: "0", + Clock: 3333333333, + }, + { + Host: "hostnameD", + Key: "name.value", + Value: "0", + Clock: 4444444444, + }, + }, + }, + } + + for desc, test := range tests { + t.Run(desc, func(t *testing.T) { + // Simulate a Zabbix server to get the data sent. It has a timeout to avoid waiting forever. + listener, err := net.Listen("tcp", "127.0.0.1:") + require.NoError(t, err) + defer listener.Close() + + z := &Zabbix{ + Address: listener.Addr().String(), + Prefix: test.Prefix, + HostTag: "host", + SkipMeasurementPrefix: test.SkipMeasurementPrefix, + AgentActive: test.AgentActive, + LLDSendInterval: config.Duration(10 * time.Minute), + Log: testutil.Logger{}, + } + require.NoError(t, z.Init()) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + success := make(chan zabbixRequest, 1) + + go func() { + success <- listenForZabbixMetric(t, listener, len(test.zabbixMetrics) == 0) + }() + + // By default we use trappers + requestType := "sender data" + if test.AgentActive { + requestType = "agent data" + } + + select { + case request := <-success: + require.Equal(t, requestType, request.Request) + compareData(t, test.zabbixMetrics, request.Data) + case <-time.After(1 * time.Second): + require.Empty(t, test.zabbixMetrics, "no metrics should be expected if the connection times out") + } + + wg.Done() + }() + + require.NoError(t, z.Write(test.telegrafMetrics)) + + // Wait for zabbix server emulator to finish + wg.Wait() + }) + } +} + +// TestLLD tests how LLD metrics are sent simulating the time passing. +// LLD is sent each LLDSendInterval. Only new data. +// LLD data is cleared LLDClearInterval. +func TestLLD(t *testing.T) { + // Telegraf metric which will be sent repeatedly + m := testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "bar"}, + map[string]interface{}{"value": int64(0)}, + time.Unix(0, 0), + ) + + mNew := testutil.MustMetric( + "name", + map[string]string{"host": "hostA", "foo": "moo"}, + map[string]interface{}{"value": int64(0)}, + time.Unix(0, 0), + ) + + // Expected Zabbix metric generated + zabbixMetric := zabbixRequestData{ + Host: "hostA", + Key: "telegraf.name.value[bar]", + Value: "0", + Clock: 0, + } + + // Expected Zabbix metric generated + zabbixMetricNew := zabbixRequestData{ + Host: "hostA", + Key: "telegraf.name.value[moo]", + Value: "0", + Clock: 0, + } + + // Expected Zabbix LLD metric generated + zabbixLLDMetric := zabbixRequestData{ + Host: "hostA", + Key: "telegraf.lld.name.foo", + Value: `{"data":[{"{#FOO}":"bar"}]}`, + Clock: 0, + } + + // Expected Zabbix LLD metric generated + zabbixLLDMetricNew := zabbixRequestData{ + Host: "hostA", + Key: "telegraf.lld.name.foo", + Value: `{"data":[{"{#FOO}":"bar"},{"{#FOO}":"moo"}]}`, + Clock: 0, + } + + // Simulate a Zabbix server to get the data sent + listener, err := net.Listen("tcp", "127.0.0.1:") + require.NoError(t, err) + defer listener.Close() + + z := &Zabbix{ + Address: listener.Addr().String(), + Prefix: "telegraf.", + HostTag: "host", + LLDSendInterval: config.Duration(10 * time.Minute), + LLDClearInterval: config.Duration(1 * time.Hour), + Log: testutil.Logger{}, + } + require.NoError(t, z.Init()) + + wg := sync.WaitGroup{} + wg.Add(1) + + // Read first packet with two metrics, then the first autoregister packet and the second autoregister packet. + go func() { + // First packet with metrics + request := listenForZabbixMetric(t, listener, false) + compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) + + // Second packet, while time has not surpassed LLDSendInterval + request = listenForZabbixMetric(t, listener, false) + compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) + + // Third packet, time has surpassed LLDSendInterval, metrics + LLD + request = listenForZabbixMetric(t, listener, false) + require.Len(t, request.Data, 2, "Expected 2 metrics") + request.Data[1].Clock = 0 // Ignore lld request clock + compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, request.Data) + + // Fourth packet with metrics + request = listenForZabbixMetric(t, listener, false) + compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) + + // Fifth packet, time has surpassed LLDSendInterval, metrics. No LLD as there is nothing new. + request = listenForZabbixMetric(t, listener, false) + compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) + + // Sixth packet, new LLD info, but time has not surpassed LLDSendInterval + request = listenForZabbixMetric(t, listener, false) + compareData(t, []zabbixRequestData{zabbixMetricNew}, request.Data) + + // Seventh packet, time has surpassed LLDSendInterval, metrics + LLD. + // Also, time has surpassed LLDClearInterval, so LLD is cleared. + request = listenForZabbixMetric(t, listener, false) + require.Len(t, request.Data, 2, "Expected 2 metrics") + request.Data[1].Clock = 0 // Ignore lld request clock + compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetricNew}, request.Data) + + // Eighth packet, time host not surpassed LLDSendInterval, just metrics. + request = listenForZabbixMetric(t, listener, false) + compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) + + // Ninth packet, time has surpassed LLDSendInterval, metrics + LLD. + // Just the info of the zabbixMetric as zabbixMetricNew has not been seen since LLDClearInterval. + request = listenForZabbixMetric(t, listener, false) + require.Len(t, request.Data, 2, "Expected 2 metrics") + request.Data[1].Clock = 0 // Ignore lld request clock + compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, request.Data) + + wg.Done() + }() + + // First packet + require.NoError(t, z.Write([]telegraf.Metric{m})) + + // Second packet, while time has not surpassed LLDSendInterval + require.NoError(t, z.Write([]telegraf.Metric{m})) + + // Simulate time passing for a new LLD send + z.lldLastSend = time.Now().Add(-time.Duration(z.LLDSendInterval)).Add(-time.Millisecond) + + // Third packet, time has surpassed LLDSendInterval, metrics + LLD + require.NoError(t, z.Write([]telegraf.Metric{m})) + + // Fourth packet + require.NoError(t, z.Write([]telegraf.Metric{m})) + + // Simulate time passing for a new LLD send + z.lldLastSend = time.Now().Add(-time.Duration(z.LLDSendInterval)).Add(-time.Millisecond) + + // Fifth packet, time has surpassed LLDSendInterval, metrics. No LLD as there is nothing new. + require.NoError(t, z.Write([]telegraf.Metric{m})) + + // Sixth packet, new LLD info, but time has not surpassed LLDSendInterval + require.NoError(t, z.Write([]telegraf.Metric{mNew})) + + // Simulate time passing for LLD clear + z.lldLastSend = time.Now().Add(-time.Duration(z.LLDClearInterval)).Add(-time.Millisecond) + + // Seventh packet, time has surpassed LLDSendInterval and LLDClearInterval, metrics + LLD. + // LLD will be cleared. + require.NoError(t, z.Write([]telegraf.Metric{m})) + + // Eighth packet, time host not surpassed LLDSendInterval, just metrics. + require.NoError(t, z.Write([]telegraf.Metric{m})) + + // Simulate time passing for a new LLD send + z.lldLastSend = time.Now().Add(-time.Duration(z.LLDSendInterval)).Add(-time.Millisecond) + + // Ninth packet, time has surpassed LLDSendInterval, metrics + LLD. + require.NoError(t, z.Write([]telegraf.Metric{m})) + + // Wait for zabbix server emulator to finish + wg.Wait() +} + +// TestAutoregister tests that autoregistration requests are sent to zabbix if enabled +func TestAutoregister(t *testing.T) { + // Simulate a Zabbix server to get the data sent + listener, err := net.Listen("tcp", "127.0.0.1:") + require.NoError(t, err) + defer listener.Close() + + z := &Zabbix{ + Address: listener.Addr().String(), + Prefix: "telegraf.", + HostTag: "host", + SkipMeasurementPrefix: false, + AgentActive: false, + Autoregister: "xxx", + AutoregisterResendInterval: config.Duration(time.Minute * 5), + Log: testutil.Logger{}, + } + require.NoError(t, z.Init()) + + wg := sync.WaitGroup{} + wg.Add(1) + + // Read first packet with two metrics, then the first autoregister packet and the second autoregister packet. + go func() { + // Accept packet with the two metrics sent + _ = listenForZabbixMetric(t, listener, false) + + // Read the first autoregister packet + request := listenForZabbixMetric(t, listener, false) + require.Equal(t, "active checks", request.Request) + require.Equal(t, "xxx", request.HostMetadata) + + hostsRegistered := []string{request.Host} + + // Read the second autoregister packet + request = listenForZabbixMetric(t, listener, false) + require.Equal(t, "active checks", request.Request) + require.Equal(t, "xxx", request.HostMetadata) + + // Check we have received autoregistration for both hosts + hostsRegistered = append(hostsRegistered, request.Host) + require.ElementsMatch(t, []string{"hostA", "hostB"}, hostsRegistered) + + wg.Done() + }() + + err = z.Write([]telegraf.Metric{ + testutil.MustMetric( + "name", + map[string]string{"host": "hostA"}, + map[string]interface{}{"value": int64(0)}, + time.Now(), + ), + testutil.MustMetric( + "name", + map[string]string{"host": "hostB"}, + map[string]interface{}{"value": int64(0)}, + time.Now(), + ), + }) + require.NoError(t, err) + + // Wait for zabbix server emulator to finish + wg.Wait() +} + +// compareData compares generated data with expected data ignoring slice order if all Clocks are +// the same. +// This is useful for metrics with several fields that should produce several Zabbix values that +// could not be sorted by clock +func compareData(t *testing.T, expected []zabbixRequestData, data []zabbixRequestData) { + t.Helper() + + var clock int64 + + sameClock := true + + // Check if all clocks are the same + for i := 0; i < len(data); i++ { + if i == 0 { + clock = data[i].Clock + } else if clock != data[i].Clock { + sameClock = false + + break + } + } + + // Zabbix requests with LLD data contains a JSON value with an array of dictionaries. + // That array order depends in the access to a map, so it does not have a defined order. + // To compare the data, we need to sort the array of dictionaries. + // Before comparing the requests, sort those values. + // To detect if a request contains LLD data, try to unmarshal it to a ZabbixLLDValue. + // If it could be unmarshalled, sort the slice and marshal it again. + for i := 0; i < len(data); i++ { + var lldValue zabbixLLDValue + + err := json.Unmarshal([]byte(data[i].Value), &lldValue) + if err == nil { + sort.Slice(lldValue.Data, func(i, j int) bool { + // Generate a global order based on the keys and values present in the map + keysValuesI := make([]string, 0, len(lldValue.Data[i])*2) + keysValuesJ := make([]string, 0, len(lldValue.Data[j])*2) + for k, v := range lldValue.Data[i] { + keysValuesI = append(keysValuesI, k, v) + } + for k, v := range lldValue.Data[j] { + keysValuesJ = append(keysValuesJ, k, v) + } + + sort.Strings(keysValuesI) + sort.Strings(keysValuesJ) + + return strings.Join(keysValuesI, "") < strings.Join(keysValuesJ, "") + }) + sortedValue, err := json.Marshal(lldValue) + require.NoError(t, err) + + data[i].Value = string(sortedValue) + } + } + + if sameClock { + require.ElementsMatch(t, expected, data) + } else { + require.Equal(t, expected, data) + } +} + +// listenForZabbixMetric starts a TCP server listening for one Zabbix metric. +// ignoreAcceptError is used to ignore the error when the server is closed. +func listenForZabbixMetric(t *testing.T, listener net.Listener, ignoreAcceptError bool) zabbixRequest { + t.Helper() + + conn, err := listener.Accept() + if err != nil && ignoreAcceptError { + return zabbixRequest{} + } + + require.NoError(t, err) + + // Obtain request from the mock zabbix server + // Read protocol header and version + header := make([]byte, 5) + _, err = conn.Read(header) + require.NoError(t, err) + + // Read data length + dataLengthRaw := make([]byte, 8) + _, err = conn.Read(dataLengthRaw) + require.NoError(t, err) + + dataLength := binary.LittleEndian.Uint64(dataLengthRaw) + + // Read data content + content := make([]byte, dataLength) + _, err = conn.Read(content) + require.NoError(t, err) + + // The zabbix output checks that there are not errors + // Simulated response from the server + resp := []byte("ZBXD\x01\x00\x00\x00\x00\x00\x00\x00\x00{\"response\": \"success\", \"info\": \"\"}\n") + _, err = conn.Write(resp) + require.NoError(t, err) + + // Close connection after reading the client data + conn.Close() + + // Strip zabbix header and get JSON request + var request zabbixRequest + require.NoError(t, json.Unmarshal(content, &request)) + + return request +} + +func TestBuildZabbixMetric(t *testing.T) { + prefix := "prefix." + hostTag := "host" + + z := &Zabbix{ + Prefix: prefix, + HostTag: hostTag, + } + + zm, err := z.buildZabbixMetric(testutil.MustMetric( + "name", + map[string]string{hostTag: "hostA", "foo": "bar", "a": "b"}, + map[string]interface{}{}, + time.Now()), + "value", + 1, + ) + require.NoError(t, err) + require.Equal(t, fmt.Sprintf("%sname.value[b,bar]", prefix), zm.Key) + + zm, err = z.buildZabbixMetric(testutil.MustMetric( + "name", + map[string]string{hostTag: "hostA"}, + map[string]interface{}{}, + time.Now()), + "value", + 1, + ) + require.NoError(t, err) + require.Equal(t, fmt.Sprintf("%sname.value", prefix), zm.Key) +} + +func TestGetHostname(t *testing.T) { + hostname, err := os.Hostname() + require.NoError(t, err) + + tests := map[string]struct { + HostTag string + Host string + Tags map[string]string + Result string + }{ + "metric with host tag": { + HostTag: "host", + Tags: map[string]string{ + "host": "bar", + }, + Result: "bar", + }, + "metric with host tag changed": { + HostTag: "source", + Tags: map[string]string{ + "source": "bar", + }, + Result: "bar", + }, + "metric with no host tag": { + Tags: map[string]string{}, + Result: hostname, + }, + } + + for desc, test := range tests { + t.Run(desc, func(t *testing.T) { + metric := testutil.MustMetric( + "name", + test.Tags, + map[string]interface{}{}, + time.Now(), + ) + + host, err := getHostname(test.HostTag, metric) + require.NoError(t, err) + require.Equal(t, test.Result, host) + }) + } +}