From d4e349148be19b484405a9a59ec4c8864a8d3592 Mon Sep 17 00:00:00 2001
From: khorshuheng
Date: Sun, 15 Sep 2019 16:01:41 +0800
Subject: [PATCH 1/2] Add support to control which Prometheus metrics to expose

---
 README.md                                          |  19 ++--
 .../templates/030-ConfigMap.yaml                   |   7 ++
 charts/kafka-lag-exporter/values.yaml              |  13 +++
 src/main/resources/reference.conf                  |   1 +
 .../kafkalagexporter/AppConfig.scala               |  15 ++-
 .../lightbend/kafkalagexporter/MainApp.scala       |   6 +-
 .../PrometheusEndpointSink.scala                   |  54 +++++----
 .../PrometheusEndpointSinkTest.scala               | 106 ++++++++++++++++++
 8 files changed, 181 insertions(+), 40 deletions(-)
 create mode 100644 src/test/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSinkTest.scala

diff --git a/README.md b/README.md
index 9af6deef..810063cf 100644
--- a/README.md
+++ b/README.md
@@ -191,15 +191,16 @@ defaults defined in the project itself. See [`reference.conf`](./src/main/resou
 
 General Configuration (`kafka-lag-exporter{}`)
 
-| Key                    | Default            | Description                                                                                                                           |
-|------------------------|--------------------|---------------------------------------------------------------------------------------------------------------------------------------|
-| `port`                 | `8000`             | The port to run the Prometheus endpoint on                                                                                            |
-| `poll-interval`        | `30 seconds`       | How often to poll Kafka for latest and group offsets                                                                                  |
-| `lookup-table-size`    | `60`               | The maximum window size of the look up table **per partition**                                                                        |
-| `client-group-id`      | `kafkalagexporter` | Consumer group id of kafka-lag-exporter's client connections                                                                          |
-| `kafka-client-timeout` | `10 seconds`       | Connection timeout when making API calls to Kafka                                                                                     |
-| `clusters`             | `[]`               | A statically defined list of Kafka connection details. This list is optional if you choose to use the Strimzi auto-discovery feature  |
-| `watchers`             | `{}`               | Settings for Kafka cluster "watchers" used for auto-discovery.                                                                        |
+| Key                           | Default            | Description                                                                                                                            |
+|-------------------------------|--------------------|----------------------------------------------------------------------------------------------------------------------------------------|
+| `port`                        | `8000`             | The port to run the Prometheus endpoint on                                                                                             |
+| `poll-interval`               | `30 seconds`       | How often to poll Kafka for latest and group offsets                                                                                   |
+| `lookup-table-size`           | `60`               | The maximum window size of the lookup table **per partition**                                                                          |
+| `client-group-id`             | `kafkalagexporter` | Consumer group id of kafka-lag-exporter's client connections                                                                           |
+| `kafka-client-timeout`        | `10 seconds`       | Connection timeout when making API calls to Kafka                                                                                      |
+| `clusters`                    | `[]`               | A statically defined list of Kafka connection details. This list is optional if you choose to use the Strimzi auto-discovery feature   |
+| `watchers`                    | `{}`               | Settings for Kafka cluster "watchers" used for auto-discovery.                                                                         |
+| `metric-whitelist`            | `[".*"]`           | A list of regexes; only metrics whose names match at least one entry are exposed on the Prometheus endpoint, e.g. `[".*_max_lag.*", "kafka_partition_latest_offset"]` |
 
 Kafka Cluster Connection Details (`kafka-lag-exporter.clusters[]`)
 
diff --git a/charts/kafka-lag-exporter/templates/030-ConfigMap.yaml b/charts/kafka-lag-exporter/templates/030-ConfigMap.yaml
index 35f49282..57983ae3 100644
--- a/charts/kafka-lag-exporter/templates/030-ConfigMap.yaml
+++ b/charts/kafka-lag-exporter/templates/030-ConfigMap.yaml
@@ -41,6 +41,13 @@ data:
       watchers = {
         strimzi = "{{ .Values.watchers.strimzi }}"
       }
+      metric-whitelist = [
+        {{- $lastIndex := sub (len .Values.metricWhitelist) 1}}
+        {{- range $i, $whitelist := .Values.metricWhitelist }}
+        {{ quote $whitelist }}{{- if ne $i $lastIndex -}}, {{ end }}
+        {{- end }}
+      ]
+
     }
 
     akka {
diff --git a/charts/kafka-lag-exporter/values.yaml b/charts/kafka-lag-exporter/values.yaml
index ad399f12..2382b7f6 100644
--- a/charts/kafka-lag-exporter/values.yaml
+++ b/charts/kafka-lag-exporter/values.yaml
@@ -33,6 +33,19 @@ watchers:
   ## The Strimzi Cluster Watcher automatically watches for `kafka.strimzi.io` group, `Kafka` kind resources and will
   ## configure the Kafka Lag Exporter appropriately.
   strimzi: false
+## You can use regexes to control which metrics are exposed on the Prometheus endpoint.
+## Any metric whose name matches at least one regex in the whitelist will be exposed.
+## For example, if you only wish to expose the max lag metrics, use either:
+## metricWhitelist:
+##   - ^kafka_consumergroup_group_max_lag.*
+##
+## Or
+##
+## metricWhitelist:
+##   - kafka_consumergroup_group_max_lag
+##   - kafka_consumergroup_group_max_lag_seconds
+metricWhitelist:
+  - .*
 
 ## The log level of the ROOT logger
 rootLogLevel: INFO
diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf
index a952fd41..cd3c2a24 100644
--- a/src/main/resources/reference.conf
+++ b/src/main/resources/reference.conf
@@ -15,6 +15,7 @@ kafka-lag-exporter {
     strimzi = "false"
     strimzi = ${?KAFKA_LAG_EXPORTER_STRIMZI}
   }
+  metric-whitelist = [".*"]
 }
 
 akka {
diff --git a/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala b/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
index 851288a0..0b0b0817 100644
--- a/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
+++ b/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
@@ -6,6 +6,7 @@ package com.lightbend.kafkalagexporter
 
 import java.util
 
+import com.lightbend.kafkalagexporter.PrometheusEndpointSink.ClusterGlobalLabels
 import com.typesafe.config.{Config, ConfigObject}
 
 import scala.annotation.tailrec
@@ -51,7 +52,8 @@ object AppConfig {
       )
     }
     val strimziWatcher = c.getString("watchers.strimzi").toBoolean
-    AppConfig(pollInterval, lookupTableSize, port, clientGroupId, kafkaClientTimeout, clusters, strimziWatcher)
+    val metricWhitelist = c.getStringList("metric-whitelist").asScala.toList
+    AppConfig(pollInterval, lookupTableSize, port, clientGroupId, kafkaClientTimeout, clusters, strimziWatcher, metricWhitelist)
   }
 
   // Copied from Alpakka Kafka
@@ -93,7 +95,7 @@ final case class KafkaCluster(name: String, bootstrapBrokers: String,
   }
 }
 final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, port: Int, clientGroupId: String,
-                           clientTimeout: FiniteDuration, clusters: List[KafkaCluster], strimziWatcher: Boolean) {
+                           clientTimeout: FiniteDuration, clusters: List[KafkaCluster], strimziWatcher: Boolean, metricWhitelist: List[String]) {
   override def toString(): String = {
     val clusterString =
       if (clusters.isEmpty)
@@ -103,6 +105,7 @@ final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, p
        |Poll interval: $pollInterval
        |Lookup table size: $lookupTableSize
        |Prometheus metrics endpoint port: $port
+       |Prometheus metrics whitelist: [${metricWhitelist.mkString(", ")}]
        |Admin client consumer group id: $clientGroupId
        |Kafka client timeout: $clientTimeout
        |Statically defined Clusters:
@@ -112,10 +115,10 @@ final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, p
     """.stripMargin
   }
 
-  def globalLabelsForCluster(clusterName: String): Map[String, String] = {
-    clusters.find(_.name == clusterName).map {
-      cluster => cluster.labels
-    }.getOrElse(Map.empty[String, String])
+  def clustersGlobalLabels(): ClusterGlobalLabels = {
+    clusters.map { cluster =>
+      cluster.name -> cluster.labels
+    }.toMap
   }
 }
diff --git a/src/main/scala/com/lightbend/kafkalagexporter/MainApp.scala b/src/main/scala/com/lightbend/kafkalagexporter/MainApp.scala
index 39667e3d..fee3c553 100644
--- a/src/main/scala/com/lightbend/kafkalagexporter/MainApp.scala
+++ b/src/main/scala/com/lightbend/kafkalagexporter/MainApp.scala
@@ -8,6 +8,8 @@ import java.util.concurrent.Executors
 
 import akka.actor.typed.ActorSystem
 import com.typesafe.config.{Config, ConfigFactory}
+import io.prometheus.client.CollectorRegistry
+import io.prometheus.client.exporter.HTTPServer
 
 import scala.concurrent.duration._
 import scala.concurrent.{Await, ExecutionContext}
@@ -29,7 +31,9 @@ object MainApp extends App {
   val clientCreator = (cluster: KafkaCluster) =>
     KafkaClient(cluster, appConfig.clientGroupId, appConfig.clientTimeout)(kafkaClientEc)
 
-  val endpointCreator = () => PrometheusEndpointSink(appConfig, Metrics.definitions)
+  val server = new HTTPServer(appConfig.port)
+  val endpointCreator = () => PrometheusEndpointSink(Metrics.definitions, appConfig.metricWhitelist,
+    appConfig.clustersGlobalLabels(), server, CollectorRegistry.defaultRegistry)
 
   ActorSystem(
     KafkaClusterManager.init(appConfig, endpointCreator, clientCreator),
     "kafka-lag-exporter")
diff --git a/src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala b/src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala
index 538d3fba..f8b1b63a 100644
--- a/src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala
+++ b/src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala
@@ -5,6 +5,7 @@ package com.lightbend.kafkalagexporter
 
 import com.lightbend.kafkalagexporter.MetricsSink._
+import com.lightbend.kafkalagexporter.PrometheusEndpointSink.ClusterGlobalLabels
 import io.prometheus.client.exporter.HTTPServer
 import io.prometheus.client.hotspot.DefaultExports
 import io.prometheus.client.{CollectorRegistry, Gauge}
 
@@ -12,41 +13,46 @@ import io.prometheus.client.{CollectorRegistry, Gauge}
 import scala.util.Try
 
 object PrometheusEndpointSink {
-  def apply(appConfig: AppConfig, definitions: MetricDefinitions): MetricsSink =
-    Try(new PrometheusEndpointSink(appConfig, definitions))
+  type ClusterName = String
+  type GlobalLabels = Map[String, String]
+  type ClusterGlobalLabels = Map[ClusterName, GlobalLabels]
+
+  def apply(definitions: MetricDefinitions, metricWhitelist: List[String], clusterGlobalLabels: ClusterGlobalLabels,
+            server: HTTPServer, registry: CollectorRegistry): MetricsSink = {
+    Try(new PrometheusEndpointSink(definitions, metricWhitelist, clusterGlobalLabels, server, registry))
       .fold(t => throw new Exception("Could not create Prometheus Endpoint", t), sink => sink)
+  }
 }
 
-class PrometheusEndpointSink private(appConfig: AppConfig, definitions: MetricDefinitions) extends MetricsSink {
-
-  private val server = new HTTPServer(appConfig.port)
-  private val registry = CollectorRegistry.defaultRegistry
-
+class PrometheusEndpointSink private(definitions: MetricDefinitions, metricWhitelist: List[String], clusterGlobalLabels: ClusterGlobalLabels,
+                                     server: HTTPServer, registry: CollectorRegistry) extends MetricsSink {
   DefaultExports.initialize()
 
-  private val metrics: Map[String, Map[GaugeDefinition, Gauge]] = {
-    appConfig.clusters.map { cluster =>
-      val globalLabelNamesForCluster = appConfig.globalLabelsForCluster(cluster.name).keys.toSeq
-      cluster.name -> definitions.map(definition =>
-        definition -> Gauge.build()
-          .name(definition.name)
-          .help(definition.help)
-          .labelNames(globalLabelNamesForCluster ++ definition.labels: _*)
+  private val metrics: Map[PrometheusEndpointSink.ClusterName, Map[GaugeDefinition, Gauge]] = clusterGlobalLabels.map {
+    case (clusterName, globalLabels) =>
+      clusterName -> definitions.filter(d => metricWhitelist.exists(d.name.matches)).map { d =>
+        d -> Gauge.build()
+          .name(d.name)
+          .help(d.help)
+          .labelNames(globalLabels.keys.toSeq ++ d.labels: _*)
           .register(registry)
-      ).toMap
-    }.toMap
+      }.toMap
   }
 
   override def report(m: MetricValue): Unit = {
-    val metric = getMetricsForClusterName(m.definition, m.clusterName)
-    val globalLabelValuesForCluster = appConfig.globalLabelsForCluster(m.clusterName).values.toSeq
-    metric.labels(globalLabelValuesForCluster ++ m.labels: _*).set(m.value)
+    if (metricWhitelist.exists(m.definition.name.matches)) {
+      val metric = getMetricsForClusterName(m.definition, m.clusterName)
+      val globalLabelValuesForCluster = clusterGlobalLabels.getOrElse(m.clusterName, Map.empty)
+      metric.labels(globalLabelValuesForCluster.values.toSeq ++ m.labels: _*).set(m.value)
+    }
   }
 
   override def remove(m: RemoveMetric): Unit = {
-    metrics.foreach { case (_, gaugeDefinitionsForCluster) =>
-      gaugeDefinitionsForCluster.get(m.definition).foreach(_.remove(m.labels: _*))
+    if (metricWhitelist.exists(m.definition.name.matches)) {
+      metrics.foreach { case (_, gaugeDefinitionsForCluster) =>
+        val globalLabelValuesForCluster = clusterGlobalLabels.getOrElse(m.clusterName, Map.empty)
+        gaugeDefinitionsForCluster.get(m.definition).foreach(_.remove(globalLabelValuesForCluster.values.toSeq ++ m.labels: _*))
+      }
     }
   }
 
@@ -60,7 +66,7 @@ class PrometheusEndpointSink private(appConfig: AppConfig, definitions: MetricDe
   }
 
   private def getMetricsForClusterName(gaugeDefinition: GaugeDefinition, clusterName: String): Gauge = {
-    val metricsForCluster = metrics.getOrElse(clusterName, throw new IllegalArgumentException(s"No metric for the ${clusterName} registered"))
+    val metricsForCluster = metrics.getOrElse(clusterName, throw new IllegalArgumentException(s"No metric for the $clusterName registered"))
     metricsForCluster.getOrElse(gaugeDefinition, throw new IllegalArgumentException(s"No metric with definition ${gaugeDefinition.name} registered"))
   }
 }
diff --git a/src/test/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSinkTest.scala b/src/test/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSinkTest.scala
new file mode 100644
index 00000000..c268764a
--- /dev/null
+++ b/src/test/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSinkTest.scala
@@ -0,0 +1,106 @@
+/*
+ * Copyright (C) 2019 Lightbend Inc.
+ */ + +package com.lightbend.kafkalagexporter + +import java.net.ServerSocket + +import io.prometheus.client.CollectorRegistry +import io.prometheus.client.exporter.HTTPServer +import org.scalatest._ + +import scala.collection.JavaConverters._ +import scala.util.{Failure, Success, Try} + +class PrometheusEndpointSinkTest extends fixture.FreeSpec with Matchers { + + case class Fixture(server: HTTPServer, registry: CollectorRegistry) + type FixtureParam = Fixture + + override def withFixture(test: OneArgTest): Outcome = { + val httpServer = + Try(new ServerSocket(0)) match { + case Success(socket) => + val freePort = socket.getLocalPort + socket.close() + new HTTPServer(freePort) + case Failure(exception) => throw exception + } + val registry = CollectorRegistry.defaultRegistry + try test(Fixture(httpServer, registry)) + finally { + registry.clear() + httpServer.stop() + } + } + + "PrometheusEndpointSinkImpl should" - { + + "register only metrics which match the regex" in { fixture => + PrometheusEndpointSink(Metrics.definitions, List(".*max_lag.*"), Map("cluster" -> Map.empty), fixture.server, fixture.registry) + val metricSamples = fixture.registry.metricFamilySamples().asScala.toSet + + metricSamples.map(_.name).intersect(Metrics.definitions.map(_.name).toSet) should contain theSameElementsAs + Set("kafka_consumergroup_group_max_lag", "kafka_consumergroup_group_max_lag_seconds") + } + + "append global labels to metric labels" in { fixture => + val groupLabel = Map( + "cluster" -> Map( + "environment" ->"dev", + "org" -> "organization", + ) + ) + val sink = PrometheusEndpointSink(Metrics.definitions, List(".*"), groupLabel, fixture.server, fixture.registry) + sink.report(Metrics.GroupValueMessage(Metrics.MaxGroupTimeLagMetric, "cluster", "group", 1)) + + val metricSamples = fixture.registry.metricFamilySamples().asScala.toList + val maxGroupTimeLagMetricSamples = metricSamples.filter(_.name.equals(Metrics.MaxGroupTimeLagMetric.name)).flatMap(_.samples.asScala) + + maxGroupTimeLagMetricSamples should have length 1 + val labels = maxGroupTimeLagMetricSamples.flatMap(_.labelNames.asScala) + val labelValues = maxGroupTimeLagMetricSamples.flatMap(_.labelValues.asScala) + (labels zip labelValues).toMap should contain theSameElementsAs + Map( + "environment" ->"dev", + "org" -> "organization", + "cluster_name" -> "cluster", + "group" -> "group", + ) + + sink.remove(Metrics.GroupRemoveMetricMessage(Metrics.MaxGroupTimeLagMetric, "cluster", "group")) + + val metricSamplesAfterRemoval = fixture.registry.metricFamilySamples().asScala.toList + val maxGroupTimeLagMetricSamplesAfterRemoval = metricSamplesAfterRemoval.filter(_.name.equals(Metrics.MaxGroupTimeLagMetric.name)).flatMap(_.samples.asScala) + + + maxGroupTimeLagMetricSamplesAfterRemoval should have length 0 + } + + "report only metrics which match the regex" in { fixture => + val sink = PrometheusEndpointSink(Metrics.definitions, List("kafka_consumergroup_group_max_lag"), Map("cluster" -> Map.empty), + fixture.server, fixture.registry) + sink.report(Metrics.GroupValueMessage(Metrics.MaxGroupOffsetLagMetric, "cluster", "group", 100)) + sink.report(Metrics.GroupValueMessage(Metrics.MaxGroupTimeLagMetric, "cluster", "group", 1)) + val labels = Array[String]("cluster_name", "group") + val labelVals = Array[String]("cluster", "group") + fixture.registry.getSampleValue("kafka_consumergroup_group_max_lag", labels, labelVals) should be (100) + val metricSamples = fixture.registry.metricFamilySamples().asScala.toSet + metricSamples.map(_.name) should 
+
+    "remove only metrics which match the regex" in { fixture =>
+      val sink = PrometheusEndpointSink(Metrics.definitions, List("kafka_consumergroup_group_max_lag"), Map("cluster" -> Map.empty),
+        fixture.server, fixture.registry)
+      sink.report(Metrics.GroupValueMessage(Metrics.MaxGroupOffsetLagMetric, "cluster", "group", 100))
+      sink.remove(Metrics.GroupRemoveMetricMessage(Metrics.MaxGroupOffsetLagMetric, "cluster", "group"))
+      sink.remove(Metrics.GroupRemoveMetricMessage(Metrics.MaxGroupTimeLagMetric, "cluster", "group"))
+      val labels = Array[String]("cluster_name", "group")
+      val labelVals = Array[String]("cluster", "group")
+      fixture.registry.getSampleValue("kafka_consumergroup_group_max_lag", labels, labelVals) should be (null)
+    }
+
+  }
+
+}

From 1d69db38c9c50d6a149c7c229e403fc08b40cdcc Mon Sep 17 00:00:00 2001
From: khorshuheng
Date: Mon, 16 Sep 2019 10:39:36 +0800
Subject: [PATCH 2/2] Clean up remove metric

---
 .../kafkalagexporter/PrometheusEndpointSink.scala | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala b/src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala
index f8b1b63a..17e5ca2d 100644
--- a/src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala
+++ b/src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala
@@ -49,9 +49,13 @@ class PrometheusEndpointSink private(definitions: MetricDefinitions, metricWhite
 
   override def remove(m: RemoveMetric): Unit = {
     if (metricWhitelist.exists(m.definition.name.matches)) {
-      metrics.foreach { case (_, gaugeDefinitionsForCluster) =>
-        val globalLabelValuesForCluster = clusterGlobalLabels.getOrElse(m.clusterName, Map.empty)
-        gaugeDefinitionsForCluster.get(m.definition).foreach(_.remove(globalLabelValuesForCluster.values.toSeq ++ m.labels: _*))
+      for (
+        clusterMetrics <- metrics.get(m.clusterName);
+        globalLabels <- clusterGlobalLabels.get(m.clusterName);
+        gauge <- clusterMetrics.get(m.definition)
+      ) {
+        val metricLabels = globalLabels.values.toList ++ m.labels
+        gauge.remove(metricLabels: _*)
       }
     }
   }
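
Note on the whitelist semantics shared by both patches: a metric is registered, reported, and removed only if its name fully matches at least one regex in `metric-whitelist`, via `metricWhitelist.exists(name.matches)`. Java's `String.matches` anchors the pattern, so a bare metric name matches only that exact metric, while a `.*`-affixed pattern matches a family of metrics. The following is a minimal, self-contained sketch of that rule, not code from the patches; the object `WhitelistCheck` and helper `isWhitelisted` are hypothetical names for illustration:

```scala
// Sketch of the whitelist rule used by PrometheusEndpointSink.
object WhitelistCheck extends App {
  // Metric names referenced in this patch series.
  val names = List(
    "kafka_consumergroup_group_max_lag",
    "kafka_consumergroup_group_max_lag_seconds",
    "kafka_partition_latest_offset"
  )

  // A metric is kept if its name matches at least one whitelist regex.
  // String.matches anchors the pattern, so it must cover the whole name.
  def isWhitelisted(whitelist: List[String])(name: String): Boolean =
    whitelist.exists(name.matches)

  // A bare name matches only that exact metric...
  assert(names.filter(isWhitelisted(List("kafka_consumergroup_group_max_lag"))) ==
    List("kafka_consumergroup_group_max_lag"))

  // ...while a wildcard pattern matches the whole max-lag family.
  assert(names.filter(isWhitelisted(List(".*max_lag.*"))) ==
    List("kafka_consumergroup_group_max_lag", "kafka_consumergroup_group_max_lag_seconds"))

  // The default whitelist [".*"] keeps every metric.
  assert(names.forall(isWhitelisted(List(".*"))))
}
```

Under this rule, the `values.yaml` example above can whitelist the two max-lag metrics either with a single wildcard pattern or by listing both full names.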