Embedding Kafka Lag Exporter #674

Merged: 21 commits, Jul 14, 2021
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
# Main (unreleased)

- [FEATURE] Added [Kafka Lag exporter](https://github.com/davidmparrott/kafka_exporter)
integration. (@gaantunes)

- [FEATURE] Add TLS config options for tempo `remote_write`s. (@mapno)

# v0.16.0 (2021-06-17)
77 changes: 77 additions & 0 deletions docs/configuration-reference.md
@@ -2265,6 +2265,8 @@ consul_exporter: <consul_exporter_config>
# Controls the windows_exporter integration
windows_exporter: <windows_exporter_config>

# Controls the kafka_exporter integration
kafka_exporter: <kafka_exporter_config>

# Automatically collect metrics from enabled integrations. If disabled,
# integrations will be run but not scraped and thus not remote_written. Metrics
# for integrations will be exposed at /integrations/<integration_key>/metrics
@@ -3742,3 +3744,78 @@ Full reference of options:
# Maps to collector.logical_disk.volume-blacklist in windows_exporter
[blacklist: <string> | default=".+"]
```

### kafka_exporter_config

The `kafka_exporter_config` block configures the `kafka_exporter`
integration, which is an embedded version of [`kafka_exporter`](https://github.com/davidmparrott/kafka_exporter).
It collects Kafka lag metrics and exposes them as Prometheus metrics.

Full reference of options:

```yaml
# Address array (host:port) of Kafka server
[kafka_uris: <[]string>]

# Connect using SASL/PLAIN
[use_sasl: <bool>]

# Only set this to false if using a non-Kafka SASL proxy
[use_sasl_handshake: <bool> | default = true]

# SASL user name
[sasl_username: <string>]

# SASL user password
[sasl_password: <string>]

# The SASL SCRAM SHA algorithm to use as the mechanism: sha256 or sha512
[sasl_mechanism: <string>]

# Connect using TLS
[use_tls: <bool>]

# The optional certificate authority file for TLS client authentication
[cafile: <string>]

# The optional certificate file for TLS client authentication
[certfile: <string>]

# The optional key file for TLS client authentication
[keyfile: <string>]

# If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure
[insecure_skip_tls_verify: <bool>]

# Kafka broker version
[kafka_version: <string> | default = "2.0.0"]

# Set to true if consumer group lag should be read from ZooKeeper
[use_zookeeper_lag: <bool>]

# Address array (hosts) of zookeeper server.
[zookeeper_uris: <[]string>]

# Kafka cluster name
[kafka_cluster_name: <string>]

# Metadata refresh interval
[metadata_refresh_interval: <duration> | default = "1m"]

# If true, every scrape triggers Kafka operations; otherwise, scrapes share cached results. WARN: disable this on large clusters
[allow_concurrency: <bool> | default = true]

# Maximum number of offsets to store in the interpolation table for a partition
[max_offsets: <int> | default = 1000]

# How frequently the interpolation table is pruned, in seconds
[prune_interval_seconds: <int> | default = 30]

# Regex filter for topics to be monitored
[topics_filter_regex: <string> | default = ".*"]

# Regex filter for consumer groups to be monitored
[groups_filter_regex: <string> | default = ".*"]

```
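A hypothetical minimal agent snippet enabling the integration might look like the following (the broker address is a placeholder, and the `enabled` flag comes from the common integration settings):

```yaml
integrations:
  kafka_exporter:
    enabled: true
    kafka_uris:
      - "kafka-broker:9092"
```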
2 changes: 2 additions & 0 deletions go.mod
@@ -4,7 +4,9 @@ go 1.16

require (
contrib.go.opencensus.io/exporter/prometheus v0.2.0
github.com/Shopify/sarama v1.28.0
github.com/cortexproject/cortex v1.8.2-0.20210428155238-d382e1d80eaf
github.com/davidmparrott/kafka_exporter/v2 v2.0.1
github.com/drone/envsubst v1.0.2
github.com/fatih/structs v0.0.0-20180123065059-ebf56d35bba7
github.com/go-kit/kit v0.10.0
8 changes: 8 additions & 0 deletions go.sum
@@ -160,6 +160,7 @@ github.com/SermoDigital/jose v0.0.0-20180104203859-803625baeddc/go.mod h1:ARgCUh
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/sarama v1.22.2-0.20190604114437-cd910a683f9f/go.mod h1:XLH1GYJnLVE0XCr6KdJGVJRTwY30moWNJ4sERjXX6fs=
github.com/Shopify/sarama v1.27.1/go.mod h1:g5s5osgELxgM+Md9Qni9rzo7Rbt+vvFQI4bt/Mc93II=
github.com/Shopify/sarama v1.27.2/go.mod h1:g5s5osgELxgM+Md9Qni9rzo7Rbt+vvFQI4bt/Mc93II=
github.com/Shopify/sarama v1.28.0 h1:lOi3SfE6OcFlW9Trgtked2aHNZ2BIG/d6Do+PEUAqqM=
github.com/Shopify/sarama v1.28.0/go.mod h1:j/2xTrU39dlzBmsxF1eQ2/DdWrxyBCl6pzz7a81o/ZY=
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
@@ -377,6 +378,8 @@ github.com/davecgh/go-spew v0.0.0-20161028175848-04cdfd42973b/go.mod h1:J7Y8YcW2
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davidmparrott/kafka_exporter/v2 v2.0.1 h1:nGn+MKV8z08NK4xqcYSa3fBCs/VPVesT/5kboFWJaiE=
github.com/davidmparrott/kafka_exporter/v2 v2.0.1/go.mod h1:n3ho8mZ5tZcmr8NAu/SjQHY61CDTqXtrACcEYwLXv4Y=
github.com/denisenkom/go-mssqldb v0.0.0-20180620032804-94c9c97e8c9f/go.mod h1:xN/JuLBIz4bjkxNmByTiV1IbhfnYb6oo99phBn4Eqhc=
github.com/denisenkom/go-mssqldb v0.0.0-20190515213511-eb9f6a1743f3/go.mod h1:zAg7JM8CkOJ43xKXIj7eRO9kmWm/TW578qo+oDO6tuM=
github.com/denisenkom/go-mssqldb v0.0.0-20190707035753-2be1aa521ff4/go.mod h1:zAg7JM8CkOJ43xKXIj7eRO9kmWm/TW578qo+oDO6tuM=
@@ -1111,6 +1114,8 @@ github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/krallistic/kazoo-go v0.0.0-20170526135507-a15279744f4e h1:IWiVY66Xy9YrDZ28qJMt1UTlh6x9UGW0aDH/o58CSnA=
github.com/krallistic/kazoo-go v0.0.0-20170526135507-a15279744f4e/go.mod h1:Rq6003vCNoJNrT6ol0hMebQ3GWLWXSHrD/QcMlXt0EE=
github.com/kshvakov/clickhouse v1.3.5/go.mod h1:DMzX7FxRymoNkVgizH0DWAL8Cur7wHLgx3MUnGwJqpE=
github.com/kubernetes/apimachinery v0.0.0-20190119020841-d41becfba9ee/go.mod h1:Pe/YBTPc3vqoMkbuIWPH8CF9ehINdvNyS0dP3J6HC0s=
github.com/kylelemons/godebug v0.0.0-20160406211939-eadb3ce320cb/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
@@ -1536,6 +1541,7 @@ github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFo
github.com/ryanuber/go-glob v0.0.0-20170128012129-256dc444b735/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc=
github.com/safchain/ethtool v0.0.0-20200218184317-f459e2d13664/go.mod h1:Z0q5wiBQGYcxhMZ6gUqHn6pYNLypFAvaL3UvgZLR0U4=
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da h1:p3Vo3i64TCLY7gIfzeQaUJ+kppEO5WQG3cL8iE8tGHU=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/sanity-io/litter v1.2.0/go.mod h1:JF6pZUFgu2Q0sBZ+HSV35P8TVPI1TTzEwyu9FXAw2W4=
github.com/santhosh-tekuri/jsonschema v1.2.4/go.mod h1:TEAUOeZSmIxTTuHatJzrvARHiuO9LYd+cIxzgEHCQI4=
@@ -1721,8 +1727,10 @@ github.com/xdg-go/scram v1.0.2 h1:akYIkZ28e6A96dkWNJQu3nmCzH3YfwMPQExUYDaRv7w=
github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs=
github.com/xdg-go/stringprep v1.0.2 h1:6iq84/ryjjeRmMJwxutI51F2GIPlP5BfTvXHeYjyhBc=
github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
1 change: 1 addition & 0 deletions pkg/integrations/install/install.go
@@ -6,6 +6,7 @@ import (
_ "github.com/grafana/agent/pkg/integrations/consul_exporter" // register consul_exporter
_ "github.com/grafana/agent/pkg/integrations/dnsmasq_exporter" // register dnsmasq_exporter
_ "github.com/grafana/agent/pkg/integrations/elasticsearch_exporter" // register elasticsearch_exporter
_ "github.com/grafana/agent/pkg/integrations/kafka_exporter" // register kafka_exporter
_ "github.com/grafana/agent/pkg/integrations/memcached_exporter" // register memcached_exporter
_ "github.com/grafana/agent/pkg/integrations/mysqld_exporter" // register mysqld_exporter
_ "github.com/grafana/agent/pkg/integrations/node_exporter" // register node_exporter
168 changes: 168 additions & 0 deletions pkg/integrations/kafka_exporter/kafka_exporter.go
@@ -0,0 +1,168 @@
package kafka_exporter //nolint:golint

import (
"fmt"

"github.com/Shopify/sarama"
kafka_exporter "github.com/davidmparrott/kafka_exporter/v2/exporter"
"github.com/go-kit/kit/log"
"github.com/grafana/agent/pkg/integrations"
"github.com/grafana/agent/pkg/integrations/config"
)

// DefaultConfig holds the default settings for the kafka_exporter
// integration.
var DefaultConfig = Config{
UseSASLHandshake: true,
KafkaVersion: sarama.V2_0_0_0.String(),
MetadataRefreshInterval: "1m",
AllowConcurrent: true,
MaxOffsets: 1000,
PruneIntervalSeconds: 30,
TopicsFilter: ".*",
GroupFilter: ".*",
}

// Config controls kafka_exporter
type Config struct {
Common config.Common `yaml:",inline"`

// Address array (host:port) of Kafka server
KafkaURIs []string `yaml:"kafka_uris,omitempty"`

// Connect using SASL/PLAIN
UseSASL bool `yaml:"use_sasl,omitempty"`

// Only set this to false if using a non-Kafka SASL proxy
UseSASLHandshake bool `yaml:"use_sasl_handshake,omitempty"`

// SASL user name
SASLUsername string `yaml:"sasl_username,omitempty"`

// SASL user password
SASLPassword string `yaml:"sasl_password,omitempty"`

// The SASL SCRAM SHA algorithm to use as the mechanism: sha256 or sha512
SASLMechanism string `yaml:"sasl_mechanism,omitempty"`

// Connect using TLS
UseTLS bool `yaml:"use_tls,omitempty"`

// The optional certificate authority file for TLS client authentication
CAFile string `yaml:"cafile,omitempty"`

// The optional certificate file for TLS client authentication
CertFile string `yaml:"certfile,omitempty"`

// The optional key file for TLS client authentication
KeyFile string `yaml:"keyfile,omitempty"`

// If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure
InsecureSkipTLSVerify bool `yaml:"insecure_skip_tls_verify,omitempty"`

// Kafka broker version
KafkaVersion string `yaml:"kafka_version,omitempty"`

// Set to true if consumer group lag should be read from ZooKeeper
UseZooKeeperLag bool `yaml:"use_zookeeper_lag,omitempty"`

// Address array (hosts) of zookeeper server.
ZookeeperURIs []string `yaml:"zookeeper_uris,omitempty"`

// Kafka cluster name
ClusterName string `yaml:"kafka_cluster_name,omitempty"`

// Metadata refresh interval
MetadataRefreshInterval string `yaml:"metadata_refresh_interval,omitempty"`

// If true, every scrape triggers Kafka operations; otherwise, scrapes share cached results. WARN: disable this on large clusters
AllowConcurrent bool `yaml:"allow_concurrency,omitempty"`

// Maximum number of offsets to store in the interpolation table for a partition
MaxOffsets int `yaml:"max_offsets,omitempty"`

// How frequently the interpolation table is pruned, in seconds
PruneIntervalSeconds int `yaml:"prune_interval_seconds,omitempty"`

// Regex filter for topics to be monitored
TopicsFilter string `yaml:"topics_filter_regex,omitempty"`

// Regex filter for consumer groups to be monitored
GroupFilter string `yaml:"groups_filter_regex,omitempty"`
}

// UnmarshalYAML implements yaml.Unmarshaler for Config
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = DefaultConfig

type plain Config
return unmarshal((*plain)(c))
}

// Name returns the name of the integration that this config represents.
func (c *Config) Name() string {
return "kafka_exporter"
}

// CommonConfig returns the common settings shared across all configs for
// integrations.
func (c *Config) CommonConfig() config.Common {
return c.Common
}

// NewIntegration creates a new kafka_exporter integration.
func (c *Config) NewIntegration(logger log.Logger) (integrations.Integration, error) {
return New(logger, c)
}

func init() {
integrations.RegisterIntegration(&Config{})
}

// New creates a new kafka_exporter integration.
func New(logger log.Logger, c *Config) (integrations.Integration, error) {
if len(c.KafkaURIs) == 0 || c.KafkaURIs[0] == "" {
return nil, fmt.Errorf("empty kafka_uris provided")
}
if c.UseTLS && (c.CertFile == "" || c.KeyFile == "") {
return nil, fmt.Errorf("tls is enabled but key pair was not provided")
}
if c.UseSASL && (c.SASLPassword == "" || c.SASLUsername == "") {
return nil, fmt.Errorf("SASL is enabled but username or password was not provided")
}
if c.UseZooKeeperLag && (len(c.ZookeeperURIs) == 0 || c.ZookeeperURIs[0] == "") {
return nil, fmt.Errorf("zookeeper lag is enabled but no zookeeper uri was provided")
}

var options kafka_exporter.Options
options.Uri = c.KafkaURIs
options.UseSASL = c.UseSASL
options.UseSASLHandshake = c.UseSASLHandshake
options.SaslUsername = c.SASLUsername
options.SaslPassword = c.SASLPassword
options.SaslMechanism = c.SASLMechanism
options.UseTLS = c.UseTLS
options.TlsCAFile = c.CAFile
options.TlsCertFile = c.CertFile
options.TlsKeyFile = c.KeyFile
options.TlsInsecureSkipTLSVerify = c.InsecureSkipTLSVerify
options.KafkaVersion = c.KafkaVersion
options.UseZooKeeperLag = c.UseZooKeeperLag
options.UriZookeeper = c.ZookeeperURIs
options.Labels = c.ClusterName
options.MetadataRefreshInterval = c.MetadataRefreshInterval
options.AllowConcurrent = c.AllowConcurrent
options.MaxOffsets = c.MaxOffsets
options.PruneIntervalSeconds = c.PruneIntervalSeconds

newExporter, err := kafka_exporter.New(logger, options, c.TopicsFilter, c.GroupFilter)
if err != nil {
return nil, fmt.Errorf("could not instantiate kafka lag exporter: %w", err)
}

return integrations.NewCollectorIntegration(
c.Name(),
integrations.WithCollectors(newExporter),
), nil

}