Embedding Kafka Lag Exporter #674

Merged: 21 commits, Jul 14, 2021.

Changes shown are from 5 of the 21 commits.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
# Main (unreleased)

- [FEATURE] Added [Kafka Lag exporter](https://github.com/davidmparrott/kafka_exporter)
integration. (@gaantunes)

- [FEATURE] (beta) A Grafana Agent Operator is now available. (@rfratto)

- [ENHANCEMENT] Error messages when installing the Grafana Agent for Grafana
98 changes: 98 additions & 0 deletions docs/configuration-reference.md
@@ -2252,6 +2252,8 @@ consul_exporter: <consul_exporter_config>
# Controls the windows_exporter integration
windows_exporter: <windows_exporter_config>

kafka_lag_exporter: <kafka_lag_exporter_config>

# Automatically collect metrics from enabled integrations. If disabled,
# integrations will be run but not scraped and thus not remote_written. Metrics
# for integrations will be exposed at /integrations/<integration_key>/metrics
@@ -3729,3 +3731,99 @@ Full reference of options:
# Maps to collector.logical_disk.volume-blacklist in windows_exporter
[blacklist: <string> | default=".+"]
```

### kafka_lag_exporter_config

The `kafka_lag_exporter_config` block configures the `kafka_lag_exporter`
integration, an embedded version of [`kafka_exporter`](https://github.com/davidmparrott/kafka_exporter).
The integration collects Kafka consumer lag metrics and exposes them as Prometheus metrics.
A short example configuration follows the full reference below.

Full reference of options:

```yaml
# Address array (host:port) of Kafka servers.
[kafka_uris: [- <string>] | default="localhost:11091"]

# Connect using SASL/PLAIN.
[use_sasl: <bool> | default=false]

# Only set this to false if using a non-Kafka SASL proxy.
[use_sasl_handshake: <bool> | default=true]

# SASL username.
[sasl_username: <string> | default=""]

# SASL user password.
[sasl_password: <string> | default=""]

# The SASL SCRAM SHA algorithm to use as the mechanism: sha256 or sha512.
[sasl_mechanism: <string> | default=""]

# Connect using TLS.
[use_tls: <bool> | default=false]

# The optional certificate authority file for TLS client authentication.
[tls_cafile: <string> | default=""]

# The optional certificate file for TLS client authentication.
[tls_certfile: <string> | default=""]

# The optional key file for TLS client authentication.
[tls_keyfile: <string> | default=""]

# If true, the server's certificate is not checked for validity. This makes
# your TLS connections insecure.
[tls_insecure_skip_tlsverify: <bool> | default=false]

# Kafka broker version.
[kafka_version: <string> | default="2.0.0"]

# If true, collect consumer group lag from ZooKeeper.
[use_zookeeper_lag: <bool> | default=false]

# Address array (hosts) of ZooKeeper servers.
[zookeeper_uris: [- <string>] | default=""]

# Kafka cluster name.
[kafka_cluster_name: <string> | default="my-cluster"]

# Metadata refresh interval.
[metadata_refresh_interval: <duration> | default="1m"]

# If true, all scrapes trigger Kafka operations; otherwise, scrapes share
# cached results. WARNING: This should be disabled on large clusters.
[allow_concurrency: <bool> | default=true]

# Maximum number of offsets to store in the interpolation table for a partition.
[max_offsets: <int> | default=1000]

# How frequently the interpolation table is pruned, in seconds.
[prune_interval_seconds: <int> | default=30]

# Regex filter for topics to be monitored.
[topics_filter_regex: <string> | default=".*"]

# Regex filter for consumer groups to be monitored.
[groups_filter_regex: <string> | default=".*"]
```
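As a usage illustration (not part of this diff), a minimal Agent configuration enabling the new integration might look like the sketch below. It assumes the integration is switched on through the shared integration settings (`enabled`), as with the other exporters in this reference; the broker addresses and cluster name are placeholders.

```yaml
integrations:
  kafka_lag_exporter:
    # Shared integration setting; turns the embedded exporter on.
    enabled: true
    # Placeholder broker addresses; replace with your own.
    kafka_uris:
      - "kafka-1:9092"
      - "kafka-2:9092"
    kafka_cluster_name: "example-cluster"
    # Optional filters; the defaults monitor all topics and groups.
    topics_filter_regex: ".*"
    groups_filter_regex: ".*"
```

With this in place, the exporter's metrics are served under the `/integrations/<integration_key>/metrics` pattern described earlier, i.e. `/integrations/kafka_lag_exporter/metrics`.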
2 changes: 2 additions & 0 deletions go.mod
@@ -4,7 +4,9 @@ go 1.16

require (
	contrib.go.opencensus.io/exporter/prometheus v0.2.0
	github.com/Shopify/sarama v1.28.0
	github.com/cortexproject/cortex v1.8.2-0.20210428155238-d382e1d80eaf
	github.com/davidmparrott/kafka_exporter/v2 v2.0.1
	github.com/drone/envsubst v1.0.2
	github.com/fatih/structs v0.0.0-20180123065059-ebf56d35bba7
	github.com/go-kit/kit v0.10.0
8 changes: 8 additions & 0 deletions go.sum
@@ -160,6 +160,7 @@ github.com/SermoDigital/jose v0.0.0-20180104203859-803625baeddc/go.mod h1:ARgCUh
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/sarama v1.22.2-0.20190604114437-cd910a683f9f/go.mod h1:XLH1GYJnLVE0XCr6KdJGVJRTwY30moWNJ4sERjXX6fs=
github.com/Shopify/sarama v1.27.1/go.mod h1:g5s5osgELxgM+Md9Qni9rzo7Rbt+vvFQI4bt/Mc93II=
github.com/Shopify/sarama v1.27.2/go.mod h1:g5s5osgELxgM+Md9Qni9rzo7Rbt+vvFQI4bt/Mc93II=
github.com/Shopify/sarama v1.28.0 h1:lOi3SfE6OcFlW9Trgtked2aHNZ2BIG/d6Do+PEUAqqM=
github.com/Shopify/sarama v1.28.0/go.mod h1:j/2xTrU39dlzBmsxF1eQ2/DdWrxyBCl6pzz7a81o/ZY=
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
@@ -377,6 +378,8 @@ github.com/davecgh/go-spew v0.0.0-20161028175848-04cdfd42973b/go.mod h1:J7Y8YcW2
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davidmparrott/kafka_exporter/v2 v2.0.1 h1:nGn+MKV8z08NK4xqcYSa3fBCs/VPVesT/5kboFWJaiE=
github.com/davidmparrott/kafka_exporter/v2 v2.0.1/go.mod h1:n3ho8mZ5tZcmr8NAu/SjQHY61CDTqXtrACcEYwLXv4Y=
github.com/denisenkom/go-mssqldb v0.0.0-20180620032804-94c9c97e8c9f/go.mod h1:xN/JuLBIz4bjkxNmByTiV1IbhfnYb6oo99phBn4Eqhc=
github.com/denisenkom/go-mssqldb v0.0.0-20190515213511-eb9f6a1743f3/go.mod h1:zAg7JM8CkOJ43xKXIj7eRO9kmWm/TW578qo+oDO6tuM=
github.com/denisenkom/go-mssqldb v0.0.0-20190707035753-2be1aa521ff4/go.mod h1:zAg7JM8CkOJ43xKXIj7eRO9kmWm/TW578qo+oDO6tuM=
@@ -1111,6 +1114,8 @@ github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/krallistic/kazoo-go v0.0.0-20170526135507-a15279744f4e h1:IWiVY66Xy9YrDZ28qJMt1UTlh6x9UGW0aDH/o58CSnA=
github.com/krallistic/kazoo-go v0.0.0-20170526135507-a15279744f4e/go.mod h1:Rq6003vCNoJNrT6ol0hMebQ3GWLWXSHrD/QcMlXt0EE=
github.com/kshvakov/clickhouse v1.3.5/go.mod h1:DMzX7FxRymoNkVgizH0DWAL8Cur7wHLgx3MUnGwJqpE=
github.com/kubernetes/apimachinery v0.0.0-20190119020841-d41becfba9ee/go.mod h1:Pe/YBTPc3vqoMkbuIWPH8CF9ehINdvNyS0dP3J6HC0s=
github.com/kylelemons/godebug v0.0.0-20160406211939-eadb3ce320cb/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
@@ -1536,6 +1541,7 @@ github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFo
github.com/ryanuber/go-glob v0.0.0-20170128012129-256dc444b735/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc=
github.com/safchain/ethtool v0.0.0-20200218184317-f459e2d13664/go.mod h1:Z0q5wiBQGYcxhMZ6gUqHn6pYNLypFAvaL3UvgZLR0U4=
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da h1:p3Vo3i64TCLY7gIfzeQaUJ+kppEO5WQG3cL8iE8tGHU=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/sanity-io/litter v1.2.0/go.mod h1:JF6pZUFgu2Q0sBZ+HSV35P8TVPI1TTzEwyu9FXAw2W4=
github.com/santhosh-tekuri/jsonschema v1.2.4/go.mod h1:TEAUOeZSmIxTTuHatJzrvARHiuO9LYd+cIxzgEHCQI4=
@@ -1721,8 +1727,10 @@ github.com/xdg-go/scram v1.0.2 h1:akYIkZ28e6A96dkWNJQu3nmCzH3YfwMPQExUYDaRv7w=
github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs=
github.com/xdg-go/stringprep v1.0.2 h1:6iq84/ryjjeRmMJwxutI51F2GIPlP5BfTvXHeYjyhBc=
github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
1 change: 1 addition & 0 deletions pkg/integrations/install/install.go
@@ -14,4 +14,5 @@ import (
_ "github.com/grafana/agent/pkg/integrations/redis_exporter" // register redis_exporter
_ "github.com/grafana/agent/pkg/integrations/statsd_exporter" // register statsd_exporter
_ "github.com/grafana/agent/pkg/integrations/windows_exporter" // register windows_exporter
_ "github.com/grafana/agent/pkg/integrations/kafka_lag_exporter" // register kafka_lag_exporter
)
173 changes: 173 additions & 0 deletions pkg/integrations/kafka_lag_exporter/kafka_lag_exporter.go
@@ -0,0 +1,173 @@
package kafka_lag_exporter

import (
	"fmt"

	"github.com/go-kit/kit/log"
	"github.com/grafana/agent/pkg/integrations"
	"github.com/grafana/agent/pkg/integrations/config"

	"github.com/Shopify/sarama"
	kafka_exporter "github.com/davidmparrott/kafka_exporter/v2/exporter"
)

// DefaultConfig holds the default settings for the kafka_lag_exporter
// integration. The values match the defaults documented in
// docs/configuration-reference.md.
var DefaultConfig = Config{
	KafkaUri:                []string{"localhost:11091"},
	UseSASLHandshake:        true,
	KafkaVersion:            sarama.V2_0_0_0.String(),
	Labels:                  "my-cluster",
	MetadataRefreshInterval: "1m",
	AllowConcurrent:         true,
	MaxOffsets:              1000,
	PruneIntervalSeconds:    30,
}

type Config struct {
	Common config.Common `yaml:",inline"`

	// Exporter configuration

	// Address array (host:port) of Kafka servers.
	KafkaUri []string `yaml:"kafka_uris,omitempty"`

	// Connect using SASL/PLAIN.
	UseSASL bool `yaml:"use_sasl,omitempty"`

	// Only set this to false if using a non-Kafka SASL proxy.
	UseSASLHandshake bool `yaml:"use_sasl_handshake,omitempty"`

	// SASL username.
	SaslUsername string `yaml:"sasl_username,omitempty"`

	// SASL user password.
	SaslPassword string `yaml:"sasl_password,omitempty"`

	// The SASL SCRAM SHA algorithm to use as the mechanism: sha256 or sha512.
	SaslMechanism string `yaml:"sasl_mechanism,omitempty"`

	// Connect using TLS.
	UseTLS bool `yaml:"use_tls,omitempty"`

	// The optional certificate authority file for TLS client authentication.
	TlsCAFile string `yaml:"tls_cafile,omitempty"`

	// The optional certificate file for TLS client authentication.
	TlsCertFile string `yaml:"tls_certfile,omitempty"`

	// Password for the certificate file for TLS client authentication.
	TlsCertFilePassword string `yaml:"tls_certfile_password,omitempty"`

	// The optional key file for TLS client authentication.
	TlsKeyFile string `yaml:"tls_keyfile,omitempty"`

	// Password for the key file for TLS client authentication.
	TlsKeyFilePassword string `yaml:"tls_keyfile_password,omitempty"`

	// If true, the server's certificate is not checked for validity. This
	// makes your TLS connections insecure.
	TlsInsecureSkipTLSVerify bool `yaml:"tls_insecure_skip_tlsverify,omitempty"`

	// Kafka broker version.
	KafkaVersion string `yaml:"kafka_version,omitempty"`

	// If true, collect consumer group lag from ZooKeeper.
	UseZooKeeperLag bool `yaml:"use_zookeeper_lag,omitempty"`

	// Address array (hosts) of ZooKeeper servers.
	UriZookeeper []string `yaml:"zookeeper_uris,omitempty"`

	// Kafka cluster name.
	Labels string `yaml:"kafka_cluster_name,omitempty"`

	// Metadata refresh interval.
	MetadataRefreshInterval string `yaml:"metadata_refresh_interval,omitempty"`

	// If true, all scrapes trigger Kafka operations; otherwise, scrapes share
	// cached results. WARNING: This should be disabled on large clusters.
	AllowConcurrent bool `yaml:"allow_concurrency,omitempty"`

	// Maximum number of offsets to store in the interpolation table for a partition.
	MaxOffsets int `yaml:"max_offsets,omitempty"`

	// How frequently the interpolation table is pruned, in seconds.
	PruneIntervalSeconds int `yaml:"prune_interval_seconds,omitempty"`

	// Regex filter for topics to be monitored.
	TopicsFilter string `yaml:"topics_filter_regex,omitempty"`

	// Regex filter for consumer groups to be monitored.
	GroupFilter string `yaml:"groups_filter_regex,omitempty"`
}

// UnmarshalYAML implements yaml.Unmarshaler for Config.
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
	*c = DefaultConfig

	type plain Config
	return unmarshal((*plain)(c))
}

// Name returns the name of the integration that this config represents.
func (c *Config) Name() string {
	return "kafka_lag_exporter"
}

// CommonConfig returns the common settings shared across all configs for
// integrations.
func (c *Config) CommonConfig() config.Common {
	return c.Common
}

// NewIntegration creates a new kafka_lag_exporter integration.
func (c *Config) NewIntegration(logger log.Logger) (integrations.Integration, error) {
	return New(logger, c)
}

func init() {
	integrations.RegisterIntegration(&Config{})
}

// New creates a new kafka_lag_exporter integration.
func New(logger log.Logger, c *Config) (integrations.Integration, error) {
	var options kafka_exporter.Options

	options.Uri = c.KafkaUri
	options.UseSASL = c.UseSASL
	options.UseSASLHandshake = c.UseSASLHandshake
	options.SaslUsername = c.SaslUsername
	options.SaslPassword = c.SaslPassword
	options.SaslMechanism = c.SaslMechanism
	options.UseTLS = c.UseTLS
	options.TlsCAFile = c.TlsCAFile
	options.TlsCertFile = c.TlsCertFile
	// options.TlsCertFilePassword = c.TlsCertFilePassword
	options.TlsKeyFile = c.TlsKeyFile
	// options.TlsKeyFilePassword = c.TlsKeyFilePassword
	options.TlsInsecureSkipTLSVerify = c.TlsInsecureSkipTLSVerify
	options.KafkaVersion = c.KafkaVersion
	options.UseZooKeeperLag = c.UseZooKeeperLag
	options.UriZookeeper = c.UriZookeeper
	options.Labels = c.Labels
	options.MetadataRefreshInterval = c.MetadataRefreshInterval
	options.AllowConcurrent = c.AllowConcurrent
	options.MaxOffsets = c.MaxOffsets
	options.PruneIntervalSeconds = c.PruneIntervalSeconds

	newExporter, err := kafka_exporter.New(logger, options, c.TopicsFilter, c.GroupFilter)
	if err != nil {
		return nil, fmt.Errorf("could not instantiate kafka lag exporter: %w", err)
	}

	return integrations.NewCollectorIntegration(
		c.Name(),
		integrations.WithCollectors(
			newExporter,
		),
	), nil
}
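For readers exploring the new package, here is a minimal sketch (not part of this PR) of how the config type and constructor above could be exercised in isolation, for example in a quick smoke test. It assumes `gopkg.in/yaml.v2` (the library the custom `UnmarshalYAML` signature targets) and a go-kit no-op logger; without a reachable broker at `kafka_uris`, the embedded exporter's constructor is expected to return an error.

```go
package main

import (
	"fmt"

	"github.com/go-kit/kit/log"
	"gopkg.in/yaml.v2"

	"github.com/grafana/agent/pkg/integrations/kafka_lag_exporter"
)

func main() {
	// Raw YAML as it would appear under the integration's key in the Agent
	// config. Unmarshal goes through Config.UnmarshalYAML, so unset fields
	// fall back to DefaultConfig.
	raw := []byte(`
kafka_uris:
  - "localhost:9092"
kafka_cluster_name: "example-cluster"
`)

	var cfg kafka_lag_exporter.Config
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}

	// New builds the embedded exporter; it needs a reachable broker at the
	// configured kafka_uris to succeed.
	integration, err := kafka_lag_exporter.New(log.NewNopLogger(), &cfg)
	if err != nil {
		fmt.Println("could not create integration:", err)
		return
	}
	fmt.Printf("created integration %T\n", integration)
}
```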