Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Support arbitrary kafka client configuration #48

Merged
merged 1 commit into from
Aug 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,12 @@ General Configuration (`kafka-lag-exporter{}`)

Kafka Cluster Connection Details (`kafka-lag-exporter.clusters[]`)

| Key | Default | Required | Description |
|---------------------|-------------|----------|--------------------------------------------------------------------|
| `name` | `""` | Yes | A unique cluster name to for this Kafka connection detail object |
| `bootstrap-brokers` | `""` | Yes | Kafka bootstrap brokers. Comma delimited list of broker hostnames |
| `security-protocol` | `PLAINTEXT` | No | The Kafka security protocol. `PLAINTEXT` or `TLS`. |
| `sasl-mechanism` | `""` | No | Kafka SASL mechanism |
| `sasl-jaas-config` | `""` | No | Kafka JAAS configuration |
| Key | Default | Required | Description |
|---------------------------|-------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `name` | `""` | Yes | A unique cluster name to for this Kafka connection detail object |
| `bootstrap-brokers` | `""` | Yes | Kafka bootstrap brokers. Comma delimited list of broker hostnames |
| `consumer-properties` | `{}` | No | A map of key value pairs used to configure the `KafkaConsumer`. See the [Consumer Config](https://kafka.apache.org/documentation/#consumerconfigs) section of the Kafka documentation for options. |
| `admin-client-properties` | `{}` | No | A map of key value pairs used to configure the `AdminClient`. See the [Admit Config](https://kafka.apache.org/documentation/#adminclientconfigs) section of the Kafka documentation for options. |

Watchers (`kafka-lag-exporters.watchers{}`)

Expand All @@ -218,16 +217,23 @@ Watchers (`kafka-lag-exporters.watchers{}`)
| `strimzi` | `false` | Toggle for using Strimzi auto-discovery. |


Ex) Expose metrics on port `9999`, double the default lookup table size, and setup a single non-TLS cluster connection object.
Ex) Expose metrics on port `9999`, double the default lookup table size, and define `client.id`'s for the `KafkaConsumer`
and `AdminClient` used by the project.

```
kafka-lag-exporter {
port = 9999
lookup-table-size = 60
lookup-table-size = 120
clusters = [
{
name = "a-cluster"
bootstrap-brokers = "a-1.cluster-a.xyzcorp.com:9092,a-2.cluster-a.xyzcorp.com:9092,a-3.cluster-a.xyzcorp.com:9092"
consumer-properties = {
client.id = "consumer-client-id"
}
admin-client-properties = {
client.id = "admin-client-id"
}
}
]
}
Expand Down
21 changes: 13 additions & 8 deletions charts/kafka-lag-exporter/templates/030-ConfigMap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,23 @@ data:
port = {{ .Values.service.port }}
poll-interval = {{ .Values.pollIntervalSeconds }} seconds
lookup-table-size = {{ .Values.lookupTableSize }}
client-group-id = {{ .Values.clientGroupId }}
client-group-id = "{{ .Values.clientGroupId }}"
kafka-client-timeout = {{ .Values.kafkaClientTimeoutSeconds }} seconds
clusters = [
{{- range $cluster := .Values.clusters }}
{
name = {{ $cluster.name }}
bootstrap-brokers = {{ $cluster.bootstrapBrokers }}
config = [
# {{- range $property := $cluster }}
# $property.key = $property.value
# {{- end }}
]
name = "{{ $cluster.name }}"
bootstrap-brokers = "{{ $cluster.bootstrapBrokers }}"
consumer-properties = {
{{- range $key, $val := $cluster.consumerProperties }}
{{ $key }} = "{{ $val }}"
{{- end }}
}
admin-client-properties = {
{{- range $key, $val := $cluster.adminClientProperties }}
{{ $key }} = "{{ $val }}"
{{- end }}
}
}
{{- end }}
]
Expand Down
16 changes: 11 additions & 5 deletions charts/kafka-lag-exporter/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@ clusters: {}
## Ex)
#clusters:
# - name: "default"
# bootstrapBrokers: "my-cluster-kafka-bootstrap:9092"
# # optional values for TLS/SASL enabled clusters
# securityProtocol: SASL_SSL
# saslMechanism: PLAIN
# saslJaasConfig: org.apache.kafka.common.security.plain.PlainLoginModule required username=\"foo\" password=\"bar\";
# bootstrapBrokers: "simple-strimzi-kafka-bootstrap.strimzi.svc.cluster.local:9092"
# # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
# # can be defined in this configuration section.
# # https://kafka.apache.org/documentation/#consumerconfigs
# consumerProperties: {
# client.id: consumer-client-id
# }
# # https://kafka.apache.org/documentation/#adminclientconfigs
# adminClientProperties: {
# client.id: admin-client-id
# }

## The interval between refreshing metrics
pollIntervalSeconds: 30
Expand Down
71 changes: 55 additions & 16 deletions src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@

package com.lightbend.kafkalagexporter

import com.typesafe.config.Config
import java.util

import com.typesafe.config.{Config, ConfigObject}

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.compat.java8.DurationConverters._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.{Duration, FiniteDuration}

object AppConfig {
def apply(config: Config): AppConfig = {
Expand All @@ -19,36 +22,72 @@ object AppConfig {
val clientGroupId = c.getString("client-group-id")
val kafkaClientTimeout = c.getDuration("kafka-client-timeout").toScala
val clusters = c.getConfigList("clusters").asScala.toList.map { clusterConfig =>
val consumerProperties =
if (clusterConfig.hasPath("consumer-properties"))
parseKafkaClientsProperties(clusterConfig.getConfig("consumer-properties"))
else
Map.empty[String, String]
val adminClientProperties =
if (clusterConfig.hasPath("admin-client-properties"))
parseKafkaClientsProperties(clusterConfig.getConfig("admin-client-properties"))
else
Map.empty[String, String]

KafkaCluster(
clusterConfig.getString("name"),
clusterConfig.getString("bootstrap-brokers"),
if (clusterConfig.hasPath("security-protocol")) clusterConfig.getString("security-protocol") else "PLAINTEXT",
if (clusterConfig.hasPath("sasl-mechanism")) clusterConfig.getString("sasl-mechanism") else "",
if (clusterConfig.hasPath("sasl-jaas-config")) clusterConfig.getString("sasl-jaas-config") else ""
consumerProperties,
adminClientProperties
)
}
val strimziWatcher = c.getString("watchers.strimzi").toBoolean
AppConfig(pollInterval, lookupTableSize, port, clientGroupId, kafkaClientTimeout, clusters, strimziWatcher)
}

// Copied from Alpakka Kafka
// https://github.com/akka/alpakka-kafka/blob/v1.0.5/core/src/main/scala/akka/kafka/internal/ConfigSettings.scala
def parseKafkaClientsProperties(config: Config): Map[String, String] = {
@tailrec
def collectKeys(c: ConfigObject, processedKeys: Set[String], unprocessedKeys: List[String]): Set[String] =
if (unprocessedKeys.isEmpty) processedKeys
else {
c.toConfig.getAnyRef(unprocessedKeys.head) match {
case o: util.Map[_, _] =>
collectKeys(c,
processedKeys,
unprocessedKeys.tail ::: o.keySet().asScala.toList.map(unprocessedKeys.head + "." + _))
case _ =>
collectKeys(c, processedKeys + unprocessedKeys.head, unprocessedKeys.tail)
}
}

val keys = collectKeys(config.root, Set.empty[String], config.root().keySet().asScala.toList)
keys.map(key => key -> config.getString(key)).toMap
}

def getPotentiallyInfiniteDuration(underlying: Config, path: String): Duration = underlying.getString(path) match {
case "infinite" => Duration.Inf
case _ => underlying.getDuration(path).toScala
}
}

final case class KafkaCluster(name: String, bootstrapBrokers: String, securityProtocol: String = "PLAINTEXT",
saslMechanism: String = "", saslJaasConfig: String = "")
final case class KafkaCluster(name: String, bootstrapBrokers: String,
consumerProperties: Map[String, String] = Map.empty,
adminClientProperties: Map[String, String] = Map.empty) {
override def toString(): String = {
s"""
| Cluster name: $name
| Cluster Kafka bootstrap brokers: $bootstrapBrokers
""".stripMargin
}
}
final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, port: Int, clientGroupId: String,
clientTimeout: FiniteDuration, clusters: List[KafkaCluster], strimziWatcher: Boolean) {
override def toString(): String = {
val clusterString =
if (clusters.isEmpty)
" (none)"
else
clusters.map { cluster =>
s"""
| Cluster name: ${cluster.name}
| Cluster Kafka bootstrap brokers: ${cluster.bootstrapBrokers}
| Cluster security protocol: ${cluster.securityProtocol}
| Cluster SASL mechanism: ${cluster.saslMechanism}
""".stripMargin
}.mkString("\n")
else clusters.map(_.toString).mkString("\n")
s"""
|Poll interval: $pollInterval
|Lookup table size: $lookupTableSize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ object ConsumerGroupCollector {
latestOffsets: PartitionOffsets,
lastGroupOffsets: GroupOffsets
) extends Message {
private val TpoFormat = " %-64s%-11s%s"
private val GtpFormat = " %-64s%-64s%-11s%s"

def diff(other: OffsetsSnapshot): (List[TopicPartition], List[String], List[GroupTopicPartition]) = {
val evictedTps = latestOffsets.keySet.diff(other.latestOffsets.keySet).toList
val evictedGroups = groups.diff(other.groups)
Expand All @@ -40,13 +43,13 @@ object ConsumerGroupCollector {
}

override def toString: String = {
val latestOffsetHeader = " Topic Partition Offset"
val latestOffsetHeader = TpoFormat.format("Topic", "Partition", "Offset")
val latestOffsetsStr = latestOffsets.map {
case (TopicPartition(t, p), LookupTable.Point(offset, _)) => f" $t%-64s$p%-11s$offset"
case (TopicPartition(t, p), LookupTable.Point(offset, _)) => TpoFormat.format(t,p,offset)
}
val lastGroupOffsetHeader = " Group Topic Partition Offset"
val lastGroupOffsetHeader = GtpFormat.format("Group", "Topic", "Partition", "Offset")
val lastGroupOffsetsStr = lastGroupOffsets.map {
case (GroupTopicPartition(id, _, _, _, t, p), LookupTable.Point(offset, _)) => f" $id%-64s$t%-64s$p%-11s$offset"
case (GroupTopicPartition(id, _, _, _, t, p), LookupTable.Point(offset, _)) => GtpFormat.format(id, t, p, offset)
}

s"""
Expand Down
13 changes: 4 additions & 9 deletions src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,16 @@ import java.{lang, util}
import com.lightbend.kafkalagexporter.Domain.GroupOffsets
import com.lightbend.kafkalagexporter.KafkaClient.KafkaClientContract
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.{KafkaFuture, TopicPartition => KafkaTopicPartition}

import scala.collection.JavaConverters._
import scala.collection.immutable.Map
import scala.compat.java8.DurationConverters._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try
import scala.collection.immutable.Map

object KafkaClient {
val AdminClientConfigRetries = 0 // fail faster when there are transient connection errors, use supervision strategy for backoff
Expand Down Expand Up @@ -55,10 +53,8 @@ object KafkaClient {
private def createAdminClient(cluster: KafkaCluster, clientTimeout: FiniteDuration): AdminClient = {
val props = new Properties()
// AdminClient config: https://kafka.apache.org/documentation/#adminclientconfigs
props.putAll(cluster.adminClientProperties.asJava)
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapBrokers)
props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, cluster.securityProtocol)
props.put(SaslConfigs.SASL_MECHANISM, cluster.saslMechanism)
props.put(SaslConfigs.SASL_JAAS_CONFIG, cluster.saslJaasConfig)
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout.toMillis.toString)
props.put(AdminClientConfig.RETRIES_CONFIG, AdminClientConfigRetries.toString)
props.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, CommonClientConfigRetryBackoffMs.toString)
Expand All @@ -68,10 +64,9 @@ object KafkaClient {
private def createConsumerClient(cluster: KafkaCluster, groupId: String, clientTimeout: FiniteDuration): KafkaConsumer[Byte, Byte] = {
val props = new Properties()
val deserializer = (new ByteArrayDeserializer).getClass.getName
// https://kafka.apache.org/documentation/#consumerconfigs
props.putAll(cluster.consumerProperties.asJava)
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapBrokers)
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, cluster.securityProtocol)
props.put(SaslConfigs.SASL_MECHANISM, cluster.saslMechanism)
props.put(SaslConfigs.SASL_JAAS_CONFIG, cluster.saslJaasConfig)
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfigAutoCommit.toString)
//props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
Expand Down
36 changes: 10 additions & 26 deletions src/test/scala/com/lightbend/kafkalagexporter/AppConfigSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ class AppConfigSpec extends FreeSpec with Matchers {
| {
| name = "clusterA"
| bootstrap-brokers = "b-1.cluster-a.xyzcorp.com:9092,b-2.cluster-a.xyzcorp.com:9092"
| consumer-properties = {
| client.id = "consumer-client-id"
| }
| admin-client-properties = {
| client.id = "admin-client-id"
| }
| }
| {
| name = "clusterB"
Expand All @@ -29,34 +35,12 @@ class AppConfigSpec extends FreeSpec with Matchers {
appConfig.clusters.length shouldBe 2
appConfig.clusters(0).name shouldBe "clusterA"
appConfig.clusters(0).bootstrapBrokers shouldBe "b-1.cluster-a.xyzcorp.com:9092,b-2.cluster-a.xyzcorp.com:9092"
appConfig.clusters(0).securityProtocol shouldBe "PLAINTEXT"
appConfig.clusters(0).consumerProperties("client.id") shouldBe "consumer-client-id"
appConfig.clusters(0).adminClientProperties("client.id") shouldBe "admin-client-id"
appConfig.clusters(1).name shouldBe "clusterB"
appConfig.clusters(1).bootstrapBrokers shouldBe "b-1.cluster-b.xyzcorp.com:9092,b-2.cluster-b.xyzcorp.com:9092"
appConfig.clusters(1).securityProtocol shouldBe "PLAINTEXT"
}

"should parse static cluster with SSL info" in {
val config: Config = loadConfig(s"""
|kafka-lag-exporter {
| clusters = [
| {
| name = "clusterA"
| bootstrap-brokers = "b-1.cluster-a.xyzcorp.com:9092,b-2.cluster-a.xyzcorp.com:9092"
| security-protocol = "SASL_SSL"
| sasl-mechanism = "GSSAPI"
| sasl-jaas-config = "listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required;"
| }
| ]
|}""".stripMargin)

val appConfig = AppConfig(config)

appConfig.clusters.length shouldBe 1
appConfig.clusters(0).name shouldBe "clusterA"
appConfig.clusters(0).bootstrapBrokers shouldBe "b-1.cluster-a.xyzcorp.com:9092,b-2.cluster-a.xyzcorp.com:9092"
appConfig.clusters(0).securityProtocol shouldBe "SASL_SSL"
appConfig.clusters(0).saslMechanism shouldBe "GSSAPI"
appConfig.clusters(0).saslJaasConfig shouldBe "listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required;"
appConfig.clusters(1).consumerProperties shouldBe Map.empty
appConfig.clusters(1).adminClientProperties shouldBe Map.empty
}
}

Expand Down