This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Adding support to control which Prometheus metrics to expose #62

Merged: 2 commits, Sep 16, 2019
19 changes: 10 additions & 9 deletions README.md
@@ -191,15 +191,16 @@ defaults defined in the project itself. See [`reference.conf`](./src/main/resources/reference.conf)

General Configuration (`kafka-lag-exporter{}`)

| Key | Default | Description |
|-------------------------------|--------------------|---------------------------------------------------------------------------------------------------------------------------------------|
| `port` | `8000` | The port to run the Prometheus endpoint on |
| `poll-interval` | `30 seconds` | How often to poll Kafka for latest and group offsets |
| `lookup-table-size` | `60` | The maximum window size of the look up table **per partition** |
| `client-group-id` | `kafkalagexporter` | Consumer group id of kafka-lag-exporter's client connections |
| `kafka-client-timeout` | `10 seconds` | Connection timeout when making API calls to Kafka |
| `clusters` | `[]` | A statically defined list of Kafka connection details. This list is optional if you choose to use the Strimzi auto-discovery feature |
| `watchers` | `{}` | Settings for Kafka cluster "watchers" used for auto-discovery. |
| `metric-whitelist`            | `[".*"]`           | A list of regexes; only metrics whose names match at least one pattern are exposed via the Prometheus endpoint. E.g. `[".*_max_lag.*", "kafka_partition_latest_offset"]` |
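
For instance, here is a minimal sketch of an `application.conf` override that exposes only the consumer group max-lag metrics (metric names taken from the test suite further below):

```hocon
kafka-lag-exporter {
  # Only metric names that fully match one of these regexes are registered and reported.
  metric-whitelist = [
    "kafka_consumergroup_group_max_lag",
    "kafka_consumergroup_group_max_lag_seconds"
  ]
}
```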

Kafka Cluster Connection Details (`kafka-lag-exporter.clusters[]`)

7 changes: 7 additions & 0 deletions charts/kafka-lag-exporter/templates/030-ConfigMap.yaml
@@ -41,6 +41,13 @@ data:
watchers = {
strimzi = "{{ .Values.watchers.strimzi }}"
}
metric-whitelist = [
{{- $lastIndex := sub (len .Values.metricWhitelist) 1}}
{{- range $i, $whitelist := .Values.metricWhitelist }}
{{ quote $whitelist }}{{- if ne $i $lastIndex -}}, {{ end }}
{{- end }}
]

}

akka {
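For illustration, given hypothetical chart values

```yaml
metricWhitelist:
  - kafka_consumergroup_group_max_lag
  - kafka_consumergroup_group_max_lag_seconds
```

the template above would render the ConfigMap entry roughly as (modulo insignificant whitespace):

```hocon
metric-whitelist = [
  "kafka_consumergroup_group_max_lag",
  "kafka_consumergroup_group_max_lag_seconds"
]
```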
13 changes: 13 additions & 0 deletions charts/kafka-lag-exporter/values.yaml
@@ -33,6 +33,19 @@ watchers:
## The Strimzi Cluster Watcher automatically watches for `kafka.strimzi.io` group, `Kafka` kind resources and will
## configure the Kafka Lag Exporter appropriately.
strimzi: false
## You can use regexes to control which metrics the Prometheus endpoint exposes.
## Any metric whose name matches one of the regexes in the whitelist will be exposed.
## For example, if you only wish to expose the max lag metrics, use either:
## metricWhitelist:
## - ^kafka_consumergroup_group_max_lag.*
##
## Or
##
## metricWhitelist:
## - kafka_consumergroup_group_max_lag
## - kafka_consumergroup_group_max_lag_seconds
metricWhitelist:
- .*

## The log level of the ROOT logger
rootLogLevel: INFO
1 change: 1 addition & 0 deletions src/main/resources/reference.conf
@@ -15,6 +15,7 @@ kafka-lag-exporter {
strimzi = "false"
strimzi = ${?KAFKA_LAG_EXPORTER_STRIMZI}
}
metric-whitelist = [".*"]
}

akka {
15 changes: 9 additions & 6 deletions src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
@@ -6,6 +6,7 @@ package com.lightbend.kafkalagexporter

import java.util

import com.lightbend.kafkalagexporter.PrometheusEndpointSink.ClusterGlobalLabels
import com.typesafe.config.{Config, ConfigObject}

import scala.annotation.tailrec
@@ -51,7 +52,8 @@ object AppConfig {
)
}
val strimziWatcher = c.getString("watchers.strimzi").toBoolean
AppConfig(pollInterval, lookupTableSize, port, clientGroupId, kafkaClientTimeout, clusters, strimziWatcher)
val metricWhitelist = c.getStringList("metric-whitelist").asScala.toList
AppConfig(pollInterval, lookupTableSize, port, clientGroupId, kafkaClientTimeout, clusters, strimziWatcher, metricWhitelist)
}

// Copied from Alpakka Kafka
@@ -93,7 +95,7 @@ final case class KafkaCluster(name: String, bootstrapBrokers: String,
}
}
final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, port: Int, clientGroupId: String,
clientTimeout: FiniteDuration, clusters: List[KafkaCluster], strimziWatcher: Boolean) {
clientTimeout: FiniteDuration, clusters: List[KafkaCluster], strimziWatcher: Boolean, metricWhitelist: List[String]) {
override def toString(): String = {
val clusterString =
if (clusters.isEmpty)
@@ -103,6 +105,7 @@ final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, p
|Poll interval: $pollInterval
|Lookup table size: $lookupTableSize
|Prometheus metrics endpoint port: $port
|Prometheus metrics whitelist: [${metricWhitelist.mkString(", ")}]
|Admin client consumer group id: $clientGroupId
|Kafka client timeout: $clientTimeout
|Statically defined Clusters:
@@ -112,10 +115,10 @@ final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, p
""".stripMargin
}

def globalLabelsForCluster(clusterName: String): Map[String, String] = {
clusters.find(_.name == clusterName).map {
cluster => cluster.labels
}.getOrElse(Map.empty[String, String])
def clustersGlobalLabels(): ClusterGlobalLabels = {
clusters.map { cluster =>
cluster.name -> cluster.labels
}.toMap
Owner: Nice cleanup.

}
}

6 changes: 5 additions & 1 deletion src/main/scala/com/lightbend/kafkalagexporter/MainApp.scala
@@ -8,6 +8,8 @@ import java.util.concurrent.Executors

import akka.actor.typed.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}
import io.prometheus.client.CollectorRegistry
import io.prometheus.client.exporter.HTTPServer

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
@@ -29,7 +31,9 @@ object MainApp extends App {

val clientCreator = (cluster: KafkaCluster) =>
KafkaClient(cluster, appConfig.clientGroupId, appConfig.clientTimeout)(kafkaClientEc)
val endpointCreator = () => PrometheusEndpointSink(appConfig, Metrics.definitions)
val server = new HTTPServer(appConfig.port)
val endpointCreator = () => PrometheusEndpointSink(Metrics.definitions, appConfig.metricWhitelist,
appConfig.clustersGlobalLabels(), server, CollectorRegistry.defaultRegistry)

ActorSystem(
KafkaClusterManager.init(appConfig, endpointCreator, clientCreator), "kafka-lag-exporter")
src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala
@@ -5,48 +5,54 @@
package com.lightbend.kafkalagexporter

import com.lightbend.kafkalagexporter.MetricsSink._
import com.lightbend.kafkalagexporter.PrometheusEndpointSink.ClusterGlobalLabels
import io.prometheus.client.exporter.HTTPServer
import io.prometheus.client.hotspot.DefaultExports
import io.prometheus.client.{CollectorRegistry, Gauge}

import scala.util.Try

object PrometheusEndpointSink {
def apply(appConfig: AppConfig, definitions: MetricDefinitions): MetricsSink =
Try(new PrometheusEndpointSink(appConfig, definitions))
type ClusterName = String
type GlobalLabels = Map[String, String]
type ClusterGlobalLabels = Map[ClusterName, GlobalLabels]

def apply(definitions: MetricDefinitions, metricWhitelist: List[String], clusterGlobalLabels: ClusterGlobalLabels,
server: HTTPServer, registry: CollectorRegistry): MetricsSink = {
Try(new PrometheusEndpointSink(definitions, metricWhitelist, clusterGlobalLabels, server, registry))
.fold(t => throw new Exception("Could not create Prometheus Endpoint", t), sink => sink)
}
}

class PrometheusEndpointSink private(appConfig: AppConfig, definitions: MetricDefinitions) extends MetricsSink {

private val server = new HTTPServer(appConfig.port)
private val registry = CollectorRegistry.defaultRegistry

class PrometheusEndpointSink private(definitions: MetricDefinitions, metricWhitelist: List[String], clusterGlobalLabels: ClusterGlobalLabels,
server: HTTPServer, registry: CollectorRegistry) extends MetricsSink {
DefaultExports.initialize()

private val metrics: Map[String, Map[GaugeDefinition, Gauge]] = {
appConfig.clusters.map { cluster =>
val globalLabelNamesForCluster = appConfig.globalLabelsForCluster(cluster.name).keys.toSeq
cluster.name -> definitions.map(definition =>
definition -> Gauge.build()
.name(definition.name)
.help(definition.help)
.labelNames(globalLabelNamesForCluster ++ definition.labels: _*)
private val metrics: Map[PrometheusEndpointSink.ClusterName, Map[GaugeDefinition, Gauge]] = clusterGlobalLabels.map {
case (clusterName, globalLabels) =>
clusterName -> definitions.filter(d => metricWhitelist.exists(d.name.matches)).map { d =>
d -> Gauge.build()
.name(d.name)
.help(d.help)
.labelNames(globalLabels.keys.toSeq ++ d.labels: _*)
.register(registry)
).toMap
}.toMap
}.toMap
}
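
The whitelist check relies on Scala's `String.matches`, which anchors the pattern against the entire metric name; that is why the default `".*"` passes everything while a bare substring does not. A quick sketch of the semantics (metric names taken from the examples above):

```scala
val whitelist = List(".*max_lag.*", "kafka_partition_latest_offset")

// matches() must cover the whole string, not just a fragment of it:
"kafka_consumergroup_group_max_lag_seconds".matches(".*max_lag.*")  // true
"kafka_consumergroup_group_max_lag_seconds".matches("max_lag")      // false: partial match
whitelist.exists("kafka_partition_latest_offset".matches)           // true: exact name
```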

override def report(m: MetricValue): Unit = {
val metric = getMetricsForClusterName(m.definition, m.clusterName)
val globalLabelValuesForCluster = appConfig.globalLabelsForCluster(m.clusterName).values.toSeq
metric.labels(globalLabelValuesForCluster ++ m.labels: _*).set(m.value)
if(metricWhitelist.exists(m.definition.name.matches)) {
val metric = getMetricsForClusterName(m.definition, m.clusterName)
val globalLabelValuesForCluster = clusterGlobalLabels.getOrElse(m.clusterName, Map.empty)
metric.labels(globalLabelValuesForCluster.values.toSeq ++ m.labels: _*).set(m.value)
}
}


override def remove(m: RemoveMetric): Unit = {
metrics.foreach { case (_, gaugeDefinitionsForCluster) =>
gaugeDefinitionsForCluster.get(m.definition).foreach(_.remove(m.labels: _*))
if(metricWhitelist.exists(m.definition.name.matches)) {
metrics.foreach { case (_, gaugeDefinitionsForCluster) =>
val globalLabelValuesForCluster = clusterGlobalLabels.getOrElse(m.clusterName, Map.empty)
gaugeDefinitionsForCluster.get(m.definition).foreach(_.remove(globalLabelValuesForCluster.values.toSeq ++ m.labels: _*))
}
Owner: Thanks for fixing this.

[minor] This could be made more clear with a for expression.

Contributor (author): I have replaced `foreach` with a for expression. Let me know if the change is what you had in mind.
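
For reference, a minimal sketch of what that for expression could look like (an illustration, not necessarily the exact merged code):

```scala
override def remove(m: RemoveMetric): Unit =
  if (metricWhitelist.exists(m.definition.name.matches)) {
    val globalLabelValues = clusterGlobalLabels.getOrElse(m.clusterName, Map.empty).values.toSeq
    for {
      (_, gaugesForCluster) <- metrics               // every cluster's gauge map
      gauge <- gaugesForCluster.get(m.definition)    // Option: skips clusters lacking this gauge
    } gauge.remove(globalLabelValues ++ m.labels: _*)
  }
```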

}
}

@@ -60,7 +66,7 @@ class PrometheusEndpointSink private(appConfig: AppConfig, definitions: MetricDe
}

private def getMetricsForClusterName(gaugeDefinition: GaugeDefinition, clusterName: String): Gauge = {
val metricsForCluster = metrics.getOrElse(clusterName, throw new IllegalArgumentException(s"No metric for the ${clusterName} registered"))
val metricsForCluster = metrics.getOrElse(clusterName, throw new IllegalArgumentException(s"No metric for the $clusterName registered"))
metricsForCluster.getOrElse(gaugeDefinition, throw new IllegalArgumentException(s"No metric with definition ${gaugeDefinition.name} registered"))
}
}
src/test/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSinkTest.scala (new file)
@@ -0,0 +1,106 @@
/*
* Copyright (C) 2019 Lightbend Inc. <http://www.lightbend.com>
*/

package com.lightbend.kafkalagexporter

import java.net.ServerSocket

import io.prometheus.client.CollectorRegistry
import io.prometheus.client.exporter.HTTPServer
import org.scalatest._

import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

class PrometheusEndpointSinkTest extends fixture.FreeSpec with Matchers {

case class Fixture(server: HTTPServer, registry: CollectorRegistry)
type FixtureParam = Fixture

override def withFixture(test: OneArgTest): Outcome = {
val httpServer =
Try(new ServerSocket(0)) match {
case Success(socket) =>
val freePort = socket.getLocalPort
socket.close()
new HTTPServer(freePort)
case Failure(exception) => throw exception
}
val registry = CollectorRegistry.defaultRegistry
try test(Fixture(httpServer, registry))
finally {
registry.clear()
httpServer.stop()
}
}

"PrometheusEndpointSinkImpl should" - {

"register only metrics which match the regex" in { fixture =>
PrometheusEndpointSink(Metrics.definitions, List(".*max_lag.*"), Map("cluster" -> Map.empty), fixture.server, fixture.registry)
val metricSamples = fixture.registry.metricFamilySamples().asScala.toSet

metricSamples.map(_.name).intersect(Metrics.definitions.map(_.name).toSet) should contain theSameElementsAs
Set("kafka_consumergroup_group_max_lag", "kafka_consumergroup_group_max_lag_seconds")
}

"append global labels to metric labels" in { fixture =>
val groupLabel = Map(
"cluster" -> Map(
"environment" ->"dev",
"org" -> "organization",
)
)
val sink = PrometheusEndpointSink(Metrics.definitions, List(".*"), groupLabel, fixture.server, fixture.registry)
sink.report(Metrics.GroupValueMessage(Metrics.MaxGroupTimeLagMetric, "cluster", "group", 1))

val metricSamples = fixture.registry.metricFamilySamples().asScala.toList
val maxGroupTimeLagMetricSamples = metricSamples.filter(_.name.equals(Metrics.MaxGroupTimeLagMetric.name)).flatMap(_.samples.asScala)

maxGroupTimeLagMetricSamples should have length 1
val labels = maxGroupTimeLagMetricSamples.flatMap(_.labelNames.asScala)
val labelValues = maxGroupTimeLagMetricSamples.flatMap(_.labelValues.asScala)
(labels zip labelValues).toMap should contain theSameElementsAs
Map(
"environment" ->"dev",
"org" -> "organization",
"cluster_name" -> "cluster",
"group" -> "group",
)

sink.remove(Metrics.GroupRemoveMetricMessage(Metrics.MaxGroupTimeLagMetric, "cluster", "group"))

val metricSamplesAfterRemoval = fixture.registry.metricFamilySamples().asScala.toList
val maxGroupTimeLagMetricSamplesAfterRemoval = metricSamplesAfterRemoval.filter(_.name.equals(Metrics.MaxGroupTimeLagMetric.name)).flatMap(_.samples.asScala)


maxGroupTimeLagMetricSamplesAfterRemoval should have length 0
}

"report only metrics which match the regex" in { fixture =>
val sink = PrometheusEndpointSink(Metrics.definitions, List("kafka_consumergroup_group_max_lag"), Map("cluster" -> Map.empty),
fixture.server, fixture.registry)
sink.report(Metrics.GroupValueMessage(Metrics.MaxGroupOffsetLagMetric, "cluster", "group", 100))
sink.report(Metrics.GroupValueMessage(Metrics.MaxGroupTimeLagMetric, "cluster", "group", 1))
val labels = Array[String]("cluster_name", "group")
val labelVals = Array[String]("cluster", "group")
fixture.registry.getSampleValue("kafka_consumergroup_group_max_lag", labels, labelVals) should be (100)
val metricSamples = fixture.registry.metricFamilySamples().asScala.toSet
metricSamples.map(_.name) should not contain "kafka_consumergroup_group_max_lag_seconds"
}

"remove only metrics which match the regex" in { fixture =>
val sink = PrometheusEndpointSink(Metrics.definitions, List("kafka_consumergroup_group_max_lag"), Map("cluster" -> Map.empty),
fixture.server, fixture.registry)
sink.report(Metrics.GroupValueMessage(Metrics.MaxGroupOffsetLagMetric, "cluster", "group", 100))
sink.remove(Metrics.GroupRemoveMetricMessage(Metrics.MaxGroupOffsetLagMetric, "cluster", "group"))
sink.remove(Metrics.GroupRemoveMetricMessage(Metrics.MaxGroupTimeLagMetric, "cluster", "group"))
val labels = Array[String]("cluster_name", "group")
val labelVals = Array[String]("cluster", "group")
fixture.registry.getSampleValue("kafka_consumergroup_group_max_lag", labels, labelVals) should be (null)
}

}

}