
Commit 46648d3

Adding support to control which Prometheus metrics to expose (#62)
* Adding support to control which Prometheus metrics to expose

* Clean up remove metric
khorshuheng authored and seglo committed Sep 16, 2019
1 parent c509d19 commit 46648d3
Showing 8 changed files with 185 additions and 40 deletions.
19 changes: 10 additions & 9 deletions README.md
@@ -191,15 +191,16 @@ defaults defined in the project itself. See [`reference.conf`](./src/main/resources/reference.conf)

General Configuration (`kafka-lag-exporter{}`)

| Key | Default | Description |
|-------------------------------|--------------------|---------------------------------------------------------------------------------------------------------------------------------------|
| `port` | `8000` | The port to run the Prometheus endpoint on |
| `poll-interval` | `30 seconds` | How often to poll Kafka for latest and group offsets |
| `lookup-table-size` | `60` | The maximum window size of the look up table **per partition** |
| `client-group-id` | `kafkalagexporter` | Consumer group id of kafka-lag-exporter's client connections |
| `kafka-client-timeout` | `10 seconds` | Connection timeout when making API calls to Kafka |
| `clusters` | `[]` | A statically defined list of Kafka connection details. This list is optional if you choose to use the Strimzi auto-discovery feature |
| `watchers` | `{}` | Settings for Kafka cluster "watchers" used for auto-discovery. |
| `metric-whitelist`            | `[".*"]`           | A list of regexes; only metrics whose names fully match at least one entry are exposed via the Prometheus endpoint, e.g. `[".*_max_lag.*", "kafka_partition_latest_offset"]`           |
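
For example, to expose only the max-lag gauges plus the latest partition offset, a minimal `application.conf` override might look like this (a sketch, assuming the standard Typesafe Config loading this project already uses):

```hocon
kafka-lag-exporter {
  # Only metric names that fully match one of these regexes are exposed.
  metric-whitelist = [".*_max_lag.*", "kafka_partition_latest_offset"]
}
```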

Kafka Cluster Connection Details (`kafka-lag-exporter.clusters[]`)

7 changes: 7 additions & 0 deletions charts/kafka-lag-exporter/templates/030-ConfigMap.yaml
@@ -41,6 +41,13 @@ data:
watchers = {
  strimzi = "{{ .Values.watchers.strimzi }}"
}
metric-whitelist = [
{{- $lastIndex := sub (len .Values.metricWhitelist) 1}}
{{- range $i, $whitelist := .Values.metricWhitelist }}
  {{ quote $whitelist }}{{- if ne $i $lastIndex -}}, {{ end }}
{{- end }}
]
}
akka {
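For illustration, with a two-entry `metricWhitelist` the template above renders roughly to the following HOCON (illustrative output, not taken from the chart):

```hocon
metric-whitelist = [
  "kafka_consumergroup_group_max_lag",
  "kafka_consumergroup_group_max_lag_seconds"
]
```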
13 changes: 13 additions & 0 deletions charts/kafka-lag-exporter/values.yaml
@@ -33,6 +33,19 @@ watchers:
## The Strimzi Cluster Watcher automatically watches for `kafka.strimzi.io` group, `Kafka` kind resources and will
## configure the Kafka Lag Exporter appropriately.
strimzi: false
## You can use regexes to control which metrics the Prometheus endpoint exposes.
## Any metric that matches one of the regexes in the whitelist will be exposed.
## For example, if you only wish to expose the max lag metrics, use either:
## metricWhitelist:
## - ^kafka_consumergroup_group_max_lag.*
##
## Or
##
## metricWhitelist:
## - kafka_consumergroup_group_max_lag
## - kafka_consumergroup_group_max_lag_seconds
metricWhitelist:
- .*

## The log level of the ROOT logger
rootLogLevel: INFO
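A minimal override file passed via `helm install -f` could then look like this (a sketch; `my-values.yaml` is a hypothetical file name):

```yaml
# my-values.yaml -- hypothetical override exposing only the max-lag gauges
metricWhitelist:
  - kafka_consumergroup_group_max_lag
  - kafka_consumergroup_group_max_lag_seconds
```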
1 change: 1 addition & 0 deletions src/main/resources/reference.conf
@@ -15,6 +15,7 @@ kafka-lag-exporter {
strimzi = "false"
strimzi = ${?KAFKA_LAG_EXPORTER_STRIMZI}
}
metric-whitelist = [".*"]
}

akka {
15 changes: 9 additions & 6 deletions src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
@@ -6,6 +6,7 @@ package com.lightbend.kafkalagexporter

import java.util

import com.lightbend.kafkalagexporter.PrometheusEndpointSink.ClusterGlobalLabels
import com.typesafe.config.{Config, ConfigObject}

import scala.annotation.tailrec
@@ -51,7 +52,8 @@ object AppConfig {
)
}
val strimziWatcher = c.getString("watchers.strimzi").toBoolean
val metricWhitelist = c.getStringList("metric-whitelist").asScala.toList
AppConfig(pollInterval, lookupTableSize, port, clientGroupId, kafkaClientTimeout, clusters, strimziWatcher, metricWhitelist)
}

// Copied from Alpakka Kafka
@@ -93,7 +95,7 @@ final case class KafkaCluster(name: String, bootstrapBrokers: String,
}
}
final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, port: Int, clientGroupId: String,
clientTimeout: FiniteDuration, clusters: List[KafkaCluster], strimziWatcher: Boolean, metricWhitelist: List[String]) {
override def toString(): String = {
val clusterString =
if (clusters.isEmpty)
@@ -103,6 +105,7 @@ final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, p
|Poll interval: $pollInterval
|Lookup table size: $lookupTableSize
|Prometheus metrics endpoint port: $port
|Prometheus metrics whitelist: [${metricWhitelist.mkString(", ")}]
|Admin client consumer group id: $clientGroupId
|Kafka client timeout: $clientTimeout
|Statically defined Clusters:
@@ -112,10 +115,10 @@ final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, p
""".stripMargin
}

def clustersGlobalLabels(): ClusterGlobalLabels = {
clusters.map { cluster =>
cluster.name -> cluster.labels
}.toMap
}
}
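
As a quick sanity check of the parsing added above, the following standalone sketch exercises the same Typesafe Config calls (the inline config string is hypothetical, mirroring the key added to `reference.conf`):

```scala
import com.typesafe.config.ConfigFactory

import scala.collection.JavaConverters._

object WhitelistParseCheck extends App {
  // Hypothetical config fragment with the new `metric-whitelist` key.
  val c = ConfigFactory.parseString("""metric-whitelist = [".*_max_lag.*", "kafka_partition_latest_offset"]""")
  // Same call chain as AppConfig above: java.util.List[String] -> List[String]
  val metricWhitelist = c.getStringList("metric-whitelist").asScala.toList
  println(metricWhitelist) // List(.*_max_lag.*, kafka_partition_latest_offset)
}
```

Note that the `asScala` call requires the `scala.collection.JavaConverters._` import, which the surrounding file must already provide for the diff above to compile.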

6 changes: 5 additions & 1 deletion src/main/scala/com/lightbend/kafkalagexporter/MainApp.scala
@@ -8,6 +8,8 @@ import java.util.concurrent.Executors

import akka.actor.typed.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}
import io.prometheus.client.CollectorRegistry
import io.prometheus.client.exporter.HTTPServer

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
@@ -29,7 +31,9 @@ object MainApp extends App {

val clientCreator = (cluster: KafkaCluster) =>
KafkaClient(cluster, appConfig.clientGroupId, appConfig.clientTimeout)(kafkaClientEc)
val server = new HTTPServer(appConfig.port)
val endpointCreator = () => PrometheusEndpointSink(Metrics.definitions, appConfig.metricWhitelist,
appConfig.clustersGlobalLabels(), server, CollectorRegistry.defaultRegistry)

ActorSystem(
KafkaClusterManager.init(appConfig, endpointCreator, clientCreator), "kafka-lag-exporter")
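For reference, the `HTTPServer` and registry are now created in `MainApp` and handed to the sink. A minimal wiring sketch (names from this diff; the port and cluster name are illustrative):

```scala
import io.prometheus.client.CollectorRegistry
import io.prometheus.client.exporter.HTTPServer

// Illustrative values; in MainApp these come from AppConfig.
val server   = new HTTPServer(8000)
val registry = CollectorRegistry.defaultRegistry

val endpointCreator = () => PrometheusEndpointSink(
  Metrics.definitions,            // gauge definitions
  List(".*"),                     // whitelist: expose everything
  Map("my-cluster" -> Map.empty), // cluster name -> global labels
  server,
  registry
)
```

Injecting the server and registry, rather than constructing them inside the sink as before, is what lets the new test suite below bind a free port and clear the default registry between tests.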
58 changes: 34 additions & 24 deletions src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala

@@ -5,48 +5,58 @@
package com.lightbend.kafkalagexporter

import com.lightbend.kafkalagexporter.MetricsSink._
import com.lightbend.kafkalagexporter.PrometheusEndpointSink.ClusterGlobalLabels
import io.prometheus.client.exporter.HTTPServer
import io.prometheus.client.hotspot.DefaultExports
import io.prometheus.client.{CollectorRegistry, Gauge}

import scala.util.Try

object PrometheusEndpointSink {
type ClusterName = String
type GlobalLabels = Map[String, String]
type ClusterGlobalLabels = Map[ClusterName, GlobalLabels]

def apply(definitions: MetricDefinitions, metricWhitelist: List[String], clusterGlobalLabels: ClusterGlobalLabels,
server: HTTPServer, registry: CollectorRegistry): MetricsSink = {
Try(new PrometheusEndpointSink(definitions, metricWhitelist, clusterGlobalLabels, server, registry))
.fold(t => throw new Exception("Could not create Prometheus Endpoint", t), sink => sink)
}
}

class PrometheusEndpointSink private(definitions: MetricDefinitions, metricWhitelist: List[String], clusterGlobalLabels: ClusterGlobalLabels,
server: HTTPServer, registry: CollectorRegistry) extends MetricsSink {
DefaultExports.initialize()

private val metrics: Map[PrometheusEndpointSink.ClusterName, Map[GaugeDefinition, Gauge]] = clusterGlobalLabels.map {
case (clusterName, globalLabels) =>
clusterName -> definitions.filter(d => metricWhitelist.exists(d.name.matches)).map { d =>
d -> Gauge.build()
.name(d.name)
.help(d.help)
.labelNames(globalLabels.keys.toSeq ++ d.labels: _*)
.register(registry)
}.toMap
}.toMap

override def report(m: MetricValue): Unit = {
if(metricWhitelist.exists(m.definition.name.matches)) {
val metric = getMetricsForClusterName(m.definition, m.clusterName)
val globalLabelValuesForCluster = clusterGlobalLabels.getOrElse(m.clusterName, Map.empty)
metric.labels(globalLabelValuesForCluster.values.toSeq ++ m.labels: _*).set(m.value)
}
}


override def remove(m: RemoveMetric): Unit = {
if(metricWhitelist.exists(m.definition.name.matches)) {
for(
clusterMetrics <- metrics.get(m.clusterName);
globalLabels <- clusterGlobalLabels.get(m.clusterName);
gauge <- clusterMetrics.get(m.definition)
) {
val metricLabels = globalLabels.values.toList ++ m.labels
gauge.remove(metricLabels: _*)
}
}
}

@@ -60,7 +70,7 @@ class PrometheusEndpointSink
}

private def getMetricsForClusterName(gaugeDefinition: GaugeDefinition, clusterName: String): Gauge = {
val metricsForCluster = metrics.getOrElse(clusterName, throw new IllegalArgumentException(s"No metric for the $clusterName registered"))
metricsForCluster.getOrElse(gaugeDefinition, throw new IllegalArgumentException(s"No metric with definition ${gaugeDefinition.name} registered"))
}
}
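
One subtlety: the whitelist check uses `String.matches`, which anchors the regex against the entire metric name. A minimal sketch of the semantics:

```scala
// `metricWhitelist.exists(name.matches)` requires a FULL match,
// which is why the README example wraps patterns in `.*`.
val whitelist = List(".*_max_lag.*")
val name = "kafka_consumergroup_group_max_lag"

name.matches("max_lag")        // false -- a bare substring never matches
name.matches(".*max_lag.*")    // true
whitelist.exists(name.matches) // true
```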
106 changes: 106 additions & 0 deletions src/test/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSinkTest.scala

@@ -0,0 +1,106 @@
/*
* Copyright (C) 2019 Lightbend Inc. <http://www.lightbend.com>
*/

package com.lightbend.kafkalagexporter

import java.net.ServerSocket

import io.prometheus.client.CollectorRegistry
import io.prometheus.client.exporter.HTTPServer
import org.scalatest._

import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

class PrometheusEndpointSinkTest extends fixture.FreeSpec with Matchers {

case class Fixture(server: HTTPServer, registry: CollectorRegistry)
type FixtureParam = Fixture

override def withFixture(test: OneArgTest): Outcome = {
val httpServer =
Try(new ServerSocket(0)) match {
case Success(socket) =>
val freePort = socket.getLocalPort
socket.close()
new HTTPServer(freePort)
case Failure(exception) => throw exception
}
val registry = CollectorRegistry.defaultRegistry
try test(Fixture(httpServer, registry))
finally {
registry.clear()
httpServer.stop()
}
}

"PrometheusEndpointSinkImpl should" - {

"register only metrics which match the regex" in { fixture =>
PrometheusEndpointSink(Metrics.definitions, List(".*max_lag.*"), Map("cluster" -> Map.empty), fixture.server, fixture.registry)
val metricSamples = fixture.registry.metricFamilySamples().asScala.toSet

metricSamples.map(_.name).intersect(Metrics.definitions.map(_.name).toSet) should contain theSameElementsAs
Set("kafka_consumergroup_group_max_lag", "kafka_consumergroup_group_max_lag_seconds")
}

"append global labels to metric labels" in { fixture =>
val groupLabel = Map(
"cluster" -> Map(
"environment" ->"dev",
"org" -> "organization",
)
)
val sink = PrometheusEndpointSink(Metrics.definitions, List(".*"), groupLabel, fixture.server, fixture.registry)
sink.report(Metrics.GroupValueMessage(Metrics.MaxGroupTimeLagMetric, "cluster", "group", 1))

val metricSamples = fixture.registry.metricFamilySamples().asScala.toList
val maxGroupTimeLagMetricSamples = metricSamples.filter(_.name.equals(Metrics.MaxGroupTimeLagMetric.name)).flatMap(_.samples.asScala)

maxGroupTimeLagMetricSamples should have length 1
val labels = maxGroupTimeLagMetricSamples.flatMap(_.labelNames.asScala)
val labelValues = maxGroupTimeLagMetricSamples.flatMap(_.labelValues.asScala)
(labels zip labelValues).toMap should contain theSameElementsAs
Map(
"environment" ->"dev",
"org" -> "organization",
"cluster_name" -> "cluster",
"group" -> "group",
)

sink.remove(Metrics.GroupRemoveMetricMessage(Metrics.MaxGroupTimeLagMetric, "cluster", "group"))

val metricSamplesAfterRemoval = fixture.registry.metricFamilySamples().asScala.toList
val maxGroupTimeLagMetricSamplesAfterRemoval = metricSamplesAfterRemoval.filter(_.name.equals(Metrics.MaxGroupTimeLagMetric.name)).flatMap(_.samples.asScala)


maxGroupTimeLagMetricSamplesAfterRemoval should have length 0
}

"report only metrics which match the regex" in { fixture =>
val sink = PrometheusEndpointSink(Metrics.definitions, List("kafka_consumergroup_group_max_lag"), Map("cluster" -> Map.empty),
fixture.server, fixture.registry)
sink.report(Metrics.GroupValueMessage(Metrics.MaxGroupOffsetLagMetric, "cluster", "group", 100))
sink.report(Metrics.GroupValueMessage(Metrics.MaxGroupTimeLagMetric, "cluster", "group", 1))
val labels = Array[String]("cluster_name", "group")
val labelVals = Array[String]("cluster", "group")
fixture.registry.getSampleValue("kafka_consumergroup_group_max_lag", labels, labelVals) should be (100)
val metricSamples = fixture.registry.metricFamilySamples().asScala.toSet
metricSamples.map(_.name) should not contain "kafka_consumergroup_group_max_lag_seconds"
}

"remove only metrics which match the regex" in { fixture =>
val sink = PrometheusEndpointSink(Metrics.definitions, List("kafka_consumergroup_group_max_lag"), Map("cluster" -> Map.empty),
fixture.server, fixture.registry)
sink.report(Metrics.GroupValueMessage(Metrics.MaxGroupOffsetLagMetric, "cluster", "group", 100))
sink.remove(Metrics.GroupRemoveMetricMessage(Metrics.MaxGroupOffsetLagMetric, "cluster", "group"))
sink.remove(Metrics.GroupRemoveMetricMessage(Metrics.MaxGroupTimeLagMetric, "cluster", "group"))
val labels = Array[String]("cluster_name", "group")
val labelVals = Array[String]("cluster", "group")
fixture.registry.getSampleValue("kafka_consumergroup_group_max_lag", labels, labelVals) should be (null)
}

}

}
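
To run just this suite locally, something like the following should work (assuming the project's standard sbt setup; the command itself is not part of this commit):

```
sbt "testOnly com.lightbend.kafkalagexporter.PrometheusEndpointSinkTest"
```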
