From 18d3de17436a25f1b278a56188662d77f7f10774 Mon Sep 17 00:00:00 2001 From: Praveen K Palaniswamy Date: Thu, 23 Mar 2017 18:31:37 -0500 Subject: [PATCH 1/8] Kafka_0_10_2_0 as option --- app/controllers/Logkafka.scala | 4 ++++ app/controllers/Topic.scala | 3 +++ app/kafka/manager/utils/LogkafkaNewConfigs.scala | 3 ++- app/kafka/manager/utils/TopicConfigs.scala | 3 ++- 4 files changed, 11 insertions(+), 2 deletions(-) diff --git a/app/controllers/Logkafka.scala b/app/controllers/Logkafka.scala index 39c48e08f..9191f5f36 100644 --- a/app/controllers/Logkafka.scala +++ b/app/controllers/Logkafka.scala @@ -72,6 +72,8 @@ class Logkafka (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaMana LogkafkaNewConfigs.configMaps(Kafka_0_10_0_1).map{case(k,v) => LKConfig(k,Some(v))}.toList) val kafka_0_10_1_0_Default = CreateLogkafka("","", LogkafkaNewConfigs.configMaps(Kafka_0_10_1_0).map{case(k,v) => LKConfig(k,Some(v))}.toList) + val kafka_0_10_2_0_Default = CreateLogkafka("","", + LogkafkaNewConfigs.configMaps(Kafka_0_10_2_0).map{case(k,v) => LKConfig(k,Some(v))}.toList) val defaultCreateForm = Form( mapping( @@ -119,6 +121,7 @@ class Logkafka (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaMana case Kafka_0_10_0_0 => (defaultCreateForm.fill(kafka_0_10_0_0_Default), clusterContext) case Kafka_0_10_0_1 => (defaultCreateForm.fill(kafka_0_10_0_1_Default), clusterContext) case Kafka_0_10_1_0 => (defaultCreateForm.fill(kafka_0_10_1_0_Default), clusterContext) + case Kafka_0_10_2_0 => (defaultCreateForm.fill(kafka_0_10_2_0_Default), clusterContext) } } } @@ -210,6 +213,7 @@ class Logkafka (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaMana case Kafka_0_10_0_0 => LogkafkaNewConfigs.configNames(Kafka_0_10_0_0).map(n => (n,LKConfig(n,None))).toMap case Kafka_0_10_0_1 => LogkafkaNewConfigs.configNames(Kafka_0_10_0_1).map(n => (n,LKConfig(n,None))).toMap case Kafka_0_10_1_0 => LogkafkaNewConfigs.configNames(Kafka_0_10_1_0).map(n => (n,LKConfig(n,None))).toMap + case Kafka_0_10_2_0 => LogkafkaNewConfigs.configNames(Kafka_0_10_2_0).map(n => (n,LKConfig(n,None))).toMap } val identityOption = li.identityMap.get(log_path) if (identityOption.isDefined) { diff --git a/app/controllers/Topic.scala b/app/controllers/Topic.scala index de21b2909..02c7e05a6 100644 --- a/app/controllers/Topic.scala +++ b/app/controllers/Topic.scala @@ -55,6 +55,7 @@ class Topic (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManager val kafka_0_10_0_0_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_10_0_0).map(n => TConfig(n,None)).toList) val kafka_0_10_0_1_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_10_0_1).map(n => TConfig(n,None)).toList) val kafka_0_10_1_0_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_10_1_0).map(n => TConfig(n,None)).toList) + val kafka_0_10_2_0_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_10_2_0).map(n => TConfig(n,None)).toList) val defaultCreateForm = Form( mapping( @@ -142,6 +143,7 @@ class Topic (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManager case Kafka_0_10_0_0 => (defaultCreateForm.fill(kafka_0_10_0_0_Default), clusterContext) case Kafka_0_10_0_1 => (defaultCreateForm.fill(kafka_0_10_0_1_Default), clusterContext) case Kafka_0_10_1_0 => (defaultCreateForm.fill(kafka_0_10_1_0_Default), clusterContext) + case Kafka_0_10_2_0 => (defaultCreateForm.fill(kafka_0_10_2_0_Default), clusterContext) } } } @@ -379,6 +381,7 @@ class Topic (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManager case Kafka_0_10_0_0 => TopicConfigs.configNames(Kafka_0_10_0_0).map(n => (n,TConfig(n,None))).toMap case Kafka_0_10_0_1 => TopicConfigs.configNames(Kafka_0_10_0_1).map(n => (n,TConfig(n,None))).toMap case Kafka_0_10_1_0 => TopicConfigs.configNames(Kafka_0_10_1_0).map(n => (n,TConfig(n,None))).toMap + case Kafka_0_10_2_0 => TopicConfigs.configNames(Kafka_0_10_2_0).map(n => (n,TConfig(n,None))).toMap } val combinedMap = defaultConfigMap ++ ti.config.toMap.map(tpl => tpl._1 -> TConfig(tpl._1,Option(tpl._2))) (defaultUpdateConfigForm.fill(UpdateTopicConfig(ti.topic,combinedMap.toList.map(_._2),ti.configReadVersion)), diff --git a/app/kafka/manager/utils/LogkafkaNewConfigs.scala b/app/kafka/manager/utils/LogkafkaNewConfigs.scala index 1398e7a80..b56ce7665 100644 --- a/app/kafka/manager/utils/LogkafkaNewConfigs.scala +++ b/app/kafka/manager/utils/LogkafkaNewConfigs.scala @@ -26,7 +26,8 @@ object LogkafkaNewConfigs { Kafka_0_9_0_1 -> logkafka82.LogConfig, Kafka_0_10_0_0 -> logkafka82.LogConfig, Kafka_0_10_0_1 -> logkafka82.LogConfig, - Kafka_0_10_1_0 -> logkafka82.LogConfig + Kafka_0_10_1_0 -> logkafka82.LogConfig, + Kafka_0_10_2_0 -> logkafka82.LogConfig ) def configNames(version: KafkaVersion) : Set[String] = { diff --git a/app/kafka/manager/utils/TopicConfigs.scala b/app/kafka/manager/utils/TopicConfigs.scala index 07d110a33..c190bf64f 100644 --- a/app/kafka/manager/utils/TopicConfigs.scala +++ b/app/kafka/manager/utils/TopicConfigs.scala @@ -26,7 +26,8 @@ object TopicConfigs { Kafka_0_9_0_1 -> zero90.LogConfig, Kafka_0_10_0_0 -> zero90.LogConfig, Kafka_0_10_0_1 -> zero90.LogConfig, - Kafka_0_10_1_0 -> zero90.LogConfig + Kafka_0_10_1_0 -> zero90.LogConfig, + Kafka_0_10_2_0 -> zero90.LogConfig ) def configNames(version: KafkaVersion) : Set[String] = { From 52bfc5cbf34de9945a71c7708ad0d186e92a7470 Mon Sep 17 00:00:00 2001 From: Praveen K Palaniswamy Date: Thu, 23 Mar 2017 18:32:11 -0500 Subject: [PATCH 2/8] version upgrades --- build.sbt | 8 +++++--- project/build.properties | 2 +- project/plugins.sbt | 2 +- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/build.sbt b/build.sbt index e934503c6..631cb5afc 100644 --- a/build.sbt +++ b/build.sbt @@ -9,6 +9,8 @@ version := "1.3.3.4" scalaVersion := "2.11.8" +licenses := Seq("Apache-2.0" -> new URL("http://www.apache.org/licenses/LICENSE-2.0.txt")) + scalacOptions ++= Seq("-Xlint:-missing-interpolator","-Xfatal-warnings","-deprecation","-feature","-language:implicitConversions","-language:postfixOps","-Xmax-classfile-name","240") // From https://www.playframework.com/documentation/2.3.x/ProductionDist @@ -28,11 +30,11 @@ libraryDependencies ++= Seq( "org.webjars" % "backbonejs" % "1.2.3", "org.webjars" % "underscorejs" % "1.8.3", "org.webjars" % "dustjs-linkedin" % "2.6.1-1", - "org.apache.curator" % "curator-framework" % "2.10.0" exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(), - "org.apache.curator" % "curator-recipes" % "2.10.0" exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(), + "org.apache.curator" % "curator-framework" % "2.12.0" exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(), + "org.apache.curator" % "curator-recipes" % "2.12.0" exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(), "org.json4s" %% "json4s-jackson" % "3.4.0", "org.json4s" %% "json4s-scalaz" % "3.4.0", - "org.slf4j" % "log4j-over-slf4j" % "1.7.12", + "org.slf4j" % "log4j-over-slf4j" % "1.7.21", "com.adrianhurt" %% "play-bootstrap3" % "0.4.5-P24", "org.clapper" %% "grizzled-slf4j" % "1.0.2", "org.apache.kafka" %% "kafka" % "0.10.0.1" exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(), diff --git a/project/build.properties b/project/build.properties index 817bc38df..27e88aa11 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=0.13.9 +sbt.version=0.13.13 diff --git a/project/plugins.sbt b/project/plugins.sbt index 11b7ec8c0..11ce9ab85 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -28,7 +28,7 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-less" % "1.1.1") addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.2") -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1") +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4") // Support packaging plugins addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.0.5") From ec755a0a1c1d7ff369a470049b1873922bacf8b5 Mon Sep 17 00:00:00 2001 From: Praveen K Palaniswamy Date: Thu, 23 Mar 2017 18:32:51 -0500 Subject: [PATCH 3/8] ui bug fix --- app/views/cluster/addCluster.scala.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/views/cluster/addCluster.scala.html b/app/views/cluster/addCluster.scala.html index 6ebd4089d..c2bb11fe8 100644 --- a/app/views/cluster/addCluster.scala.html +++ b/app/views/cluster/addCluster.scala.html @@ -31,7 +31,7 @@ @b3.checkbox(form("jmxEnabled"), '_text -> "Enable JMX Polling (Set JMX_PORT env variable before starting kafka server)") @b3.text(form("jmxUser"), '_label -> "JMX Auth Username") @b3.text(form("jmxPass"), '_label -> "JMX Auth Password") - @b3.checkbox(form("jmxSsl"), '_label -> "JMX with SSL") + @b3.checkbox(form("jmxSsl"), '_text -> "JMX with SSL") @b3.checkbox(form("logkafkaEnabled"), '_text -> "Enable Logkafka") @b3.checkbox(form("pollConsumers"), '_text -> "Poll consumer information (Not recommended for large # of consumers)") @b3.checkbox(form("filterConsumers"), '_text -> "Filter out inactive consumers") From 9ef25ac5f73c25703de7e59a2b13a2022c941eff Mon Sep 17 00:00:00 2001 From: Praveen K Palaniswamy Date: Thu, 23 Mar 2017 18:36:22 -0500 Subject: [PATCH 4/8] lock cluster in read only mode --- app/controllers/Cluster.scala | 6 + app/kafka/manager/KafkaManager.scala | 4 + .../manager/actor/KafkaManagerActor.scala | 1 + app/kafka/manager/features/KMFeature.scala | 4 + app/kafka/manager/model/model.scala | 10 +- app/models/form/ClusterOperation.scala | 7 +- app/models/navigation/Menus.scala | 10 +- app/views/cluster/addCluster.scala.html | 1 + app/views/cluster/updateCluster.scala.html | 1 + app/views/topic/topicList.scala.html | 43 ++++--- app/views/topic/topicViewContent.scala.html | 110 ++++++++++-------- .../manager/TestClusterManagerActor.scala | 2 +- test/kafka/manager/TestKafkaManager.scala | 8 +- .../manager/utils/TestClusterConfig.scala | 20 ++-- 14 files changed, 136 insertions(+), 91 deletions(-) diff --git a/app/controllers/Cluster.scala b/app/controllers/Cluster.scala index bc975eda1..2bfb10100 100644 --- a/app/controllers/Cluster.scala +++ b/app/controllers/Cluster.scala @@ -77,6 +77,7 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag , "jmxUser" -> optional(text) , "jmxPass" -> optional(text) , "jmxSsl" -> boolean + , "restrictOperations" -> boolean , "pollConsumers" -> boolean , "filterConsumers" -> boolean , "logkafkaEnabled" -> boolean @@ -115,6 +116,7 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag "jmxUser" -> optional(text), "jmxPass" -> optional(text), "jmxSsl" -> boolean, + "restrictOperations" -> boolean, "pollConsumers" -> boolean, "filterConsumers" -> boolean, "logkafkaEnabled" -> boolean, @@ -157,6 +159,7 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag ,false ,false ,false + ,false ,Option(defaultTuning) ) } @@ -199,6 +202,7 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag cc.jmxUser, cc.jmxPass, cc.jmxSsl, + cc.restrictOperations, cc.pollConsumers, cc.filterConsumers, cc.logkafkaEnabled, @@ -224,6 +228,7 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag clusterConfig.jmxUser, clusterConfig.jmxPass, clusterConfig.jmxSsl, + clusterConfig.restrictOperations, clusterConfig.pollConsumers, clusterConfig.filterConsumers, clusterConfig.tuning, @@ -292,6 +297,7 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag clusterOperation.clusterConfig.jmxUser, clusterOperation.clusterConfig.jmxPass, clusterOperation.clusterConfig.jmxSsl, + clusterOperation.clusterConfig.restrictOperations, clusterOperation.clusterConfig.pollConsumers, clusterOperation.clusterConfig.filterConsumers, clusterOperation.clusterConfig.tuning, diff --git a/app/kafka/manager/KafkaManager.scala b/app/kafka/manager/KafkaManager.scala index 38fd8e89b..b4c7d9502 100644 --- a/app/kafka/manager/KafkaManager.scala +++ b/app/kafka/manager/KafkaManager.scala @@ -244,6 +244,7 @@ class KafkaManager(akkaConfig: Config) extends Logging { jmxUser: Option[String], jmxPass: Option[String], jmxSsl: Boolean, + restrictOperations: Boolean, pollConsumers: Boolean, filterConsumers: Boolean, tuning: Option[ClusterTuning], @@ -260,6 +261,7 @@ class KafkaManager(akkaConfig: Config) extends Logging { jmxUser = jmxUser, jmxPass = jmxPass, jmxSsl = jmxSsl, + restrictOperations = restrictOperations, pollConsumers = pollConsumers, filterConsumers = filterConsumers, logkafkaEnabled = logkafkaEnabled, @@ -277,6 +279,7 @@ class KafkaManager(akkaConfig: Config) extends Logging { jmxUser: Option[String], jmxPass: Option[String], jmxSsl: Boolean, + restrictOperations: Boolean, pollConsumers: Boolean, filterConsumers: Boolean, tuning: Option[ClusterTuning], @@ -293,6 +296,7 @@ class KafkaManager(akkaConfig: Config) extends Logging { jmxUser = jmxUser, jmxPass = jmxPass, jmxSsl = jmxSsl, + restrictOperations = restrictOperations, pollConsumers = pollConsumers, filterConsumers = filterConsumers, logkafkaEnabled = logkafkaEnabled, diff --git a/app/kafka/manager/actor/KafkaManagerActor.scala b/app/kafka/manager/actor/KafkaManagerActor.scala index d86439709..b33213be9 100644 --- a/app/kafka/manager/actor/KafkaManagerActor.scala +++ b/app/kafka/manager/actor/KafkaManagerActor.scala @@ -479,6 +479,7 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig) && newConfig.jmxUser == currentConfig.jmxUser && newConfig.jmxPass == currentConfig.jmxPass && newConfig.logkafkaEnabled == currentConfig.logkafkaEnabled + && newConfig.restrictOperations == currentConfig.restrictOperations && newConfig.pollConsumers == currentConfig.pollConsumers && newConfig.filterConsumers == currentConfig.filterConsumers && newConfig.activeOffsetCacheEnabled == currentConfig.activeOffsetCacheEnabled diff --git a/app/kafka/manager/features/KMFeature.scala b/app/kafka/manager/features/KMFeature.scala index a5c2916d8..487608c8a 100644 --- a/app/kafka/manager/features/KMFeature.scala +++ b/app/kafka/manager/features/KMFeature.scala @@ -20,6 +20,7 @@ sealed trait ClusterFeature extends KMFeature case object KMLogKafkaFeature extends ClusterFeature case object KMDeleteTopicFeature extends ClusterFeature +case object KMRestrictedFeature extends ClusterFeature case object KMJMXMetricsFeature extends ClusterFeature case object KMDisplaySizeFeature extends ClusterFeature case object KMPollConsumersFeature extends ClusterFeature @@ -73,6 +74,9 @@ object ClusterFeatures { if(clusterConfig.pollConsumers) buffer+=KMPollConsumersFeature + if(clusterConfig.restrictOperations) + buffer+=KMRestrictedFeature + ClusterFeatures(buffer.toSet) } } diff --git a/app/kafka/manager/model/model.scala b/app/kafka/manager/model/model.scala index 1c7dd5e27..2fd8a701f 100644 --- a/app/kafka/manager/model/model.scala +++ b/app/kafka/manager/model/model.scala @@ -106,6 +106,7 @@ object ClusterConfig { , jmxUser: Option[String] , jmxPass: Option[String] , jmxSsl: Boolean + , restrictOperations:Boolean , pollConsumers: Boolean , filterConsumers: Boolean , logkafkaEnabled: Boolean = false @@ -128,6 +129,7 @@ object ClusterConfig { , jmxUser , jmxPass , jmxSsl + , restrictOperations , pollConsumers , filterConsumers , logkafkaEnabled @@ -138,10 +140,10 @@ object ClusterConfig { } def customUnapply(cc: ClusterConfig) : Option[( - String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning])] = { + String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning])] = { Some(( cc.name, cc.version.toString, cc.curatorConfig.zkConnect, cc.curatorConfig.zkMaxRetry, - cc.jmxEnabled, cc.jmxUser, cc.jmxPass, cc.jmxSsl, cc.pollConsumers, cc.filterConsumers, + cc.jmxEnabled, cc.jmxUser, cc.jmxPass, cc.jmxSsl, cc.restrictOperations, cc.pollConsumers, cc.filterConsumers, cc.logkafkaEnabled, cc.activeOffsetCacheEnabled, cc.displaySizeEnabled, cc.tuning ) ) @@ -178,6 +180,7 @@ object ClusterConfig { :: ("jmxUser" -> toJSON(config.jmxUser)) :: ("jmxPass" -> toJSON(config.jmxPass)) :: ("jmxSsl" -> toJSON(config.jmxSsl)) + :: ("restrictOperations" -> toJSON(config.restrictOperations)) :: ("pollConsumers" -> toJSON(config.pollConsumers)) :: ("filterConsumers" -> toJSON(config.filterConsumers)) :: ("logkafkaEnabled" -> toJSON(config.logkafkaEnabled)) @@ -201,6 +204,7 @@ object ClusterConfig { val jmxUser = fieldExtended[Option[String]]("jmxUser")(json) val jmxPass = fieldExtended[Option[String]]("jmxPass")(json) val jmxSsl = fieldExtended[Boolean]("jmxSsl")(json) + val restrictOperations = fieldExtended[Boolean]("restrictOperations")(json) val pollConsumers = fieldExtended[Boolean]("pollConsumers")(json) val filterConsumers = fieldExtended[Boolean]("filterConsumers")(json) val logkafkaEnabled = fieldExtended[Boolean]("logkafkaEnabled")(json) @@ -216,6 +220,7 @@ object ClusterConfig { jmxUser.getOrElse(None), jmxPass.getOrElse(None), jmxSsl.getOrElse(false), + restrictOperations.getOrElse(false), pollConsumers.getOrElse(false), filterConsumers.getOrElse(true), logkafkaEnabled.getOrElse(false), @@ -335,6 +340,7 @@ case class ClusterConfig (name: String , jmxUser: Option[String] , jmxPass: Option[String] , jmxSsl: Boolean + , restrictOperations:Boolean , pollConsumers: Boolean , filterConsumers: Boolean , logkafkaEnabled: Boolean diff --git a/app/models/form/ClusterOperation.scala b/app/models/form/ClusterOperation.scala index 0fbc7b48f..4f81cd97c 100644 --- a/app/models/form/ClusterOperation.scala +++ b/app/models/form/ClusterOperation.scala @@ -39,6 +39,7 @@ object ClusterOperation { , jmxUser: Option[String] , jmxPass: Option[String] , jmxSsl: Boolean + , restrictOperations: Boolean , pollConsumers: Boolean , filterConsumers: Boolean , logkafkaEnabled: Boolean @@ -47,14 +48,14 @@ object ClusterOperation { , tuning: Option[ClusterTuning] ): ClusterOperation = { ClusterOperation(operation,ClusterConfig(name, version, zkHosts, zkMaxRetry, jmxEnabled, jmxUser, jmxPass, jmxSsl, - pollConsumers, filterConsumers, logkafkaEnabled, activeOffsetCacheEnabled, displaySizeEnabled, tuning)) + restrictOperations, pollConsumers, filterConsumers, logkafkaEnabled, activeOffsetCacheEnabled, displaySizeEnabled, tuning)) } - def customUnapply(co: ClusterOperation) : Option[(String, String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning])] = { + def customUnapply(co: ClusterOperation) : Option[(String, String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning])] = { Option((co.op.toString, co.clusterConfig.name, co.clusterConfig.version.toString, co.clusterConfig.curatorConfig.zkConnect, co.clusterConfig.curatorConfig.zkMaxRetry, co.clusterConfig.jmxEnabled, co.clusterConfig.jmxUser, co.clusterConfig.jmxPass, co.clusterConfig.jmxSsl, - co.clusterConfig.pollConsumers, co.clusterConfig.filterConsumers, co.clusterConfig.logkafkaEnabled, + co.clusterConfig.restrictOperations, co.clusterConfig.pollConsumers, co.clusterConfig.filterConsumers, co.clusterConfig.logkafkaEnabled, co.clusterConfig.activeOffsetCacheEnabled, co.clusterConfig.displaySizeEnabled, co.clusterConfig.tuning)) } } diff --git a/app/models/navigation/Menus.scala b/app/models/navigation/Menus.scala index dc7120bbe..916afeba0 100644 --- a/app/models/navigation/Menus.scala +++ b/app/models/navigation/Menus.scala @@ -5,8 +5,8 @@ package models.navigation -import features.{KMTopicManagerFeature, KMClusterManagerFeature, ApplicationFeatures} -import kafka.manager.features.{KMLogKafkaFeature, ClusterFeatures} +import features.{ApplicationFeatures, KMClusterManagerFeature, KMTopicManagerFeature} +import kafka.manager.features.{ClusterFeatures, KMLogKafkaFeature, KMRestrictedFeature} /** * @author hiral @@ -27,11 +27,11 @@ class Menus(implicit applicationFeatures: ApplicationFeatures) { Option(Menu("Cluster", items, None)) } - private[this] def topicMenu(cluster: String) : Option[Menu] = { + private[this] def topicMenu(cluster: String, clusterFeatures: ClusterFeatures) : Option[Menu] = { val defaultItems = IndexedSeq("List".clusterRouteMenuItem(cluster)) val items = { - if(applicationFeatures.features(KMTopicManagerFeature)) + if(applicationFeatures.features(KMTopicManagerFeature) && clusterFeatures.features(KMRestrictedFeature)) defaultItems.+:("Create".clusterRouteMenuItem(cluster)) else defaultItems @@ -71,7 +71,7 @@ class Menus(implicit applicationFeatures: ApplicationFeatures) { IndexedSeq( clusterMenu(cluster), brokersMenu(cluster), - topicMenu(cluster), + topicMenu(cluster, clusterFeatures), preferredReplicaElectionMenu(cluster), reassignPartitionsMenu(cluster), consumersMenu(cluster), diff --git a/app/views/cluster/addCluster.scala.html b/app/views/cluster/addCluster.scala.html index c2bb11fe8..bd801f0c6 100644 --- a/app/views/cluster/addCluster.scala.html +++ b/app/views/cluster/addCluster.scala.html @@ -32,6 +32,7 @@ @b3.text(form("jmxUser"), '_label -> "JMX Auth Username") @b3.text(form("jmxPass"), '_label -> "JMX Auth Password") @b3.checkbox(form("jmxSsl"), '_text -> "JMX with SSL") + @b3.checkbox(form("restrictOperations"), '_text -> "Restrict the Operations(Non-Disruptive View only mode)") @b3.checkbox(form("logkafkaEnabled"), '_text -> "Enable Logkafka") @b3.checkbox(form("pollConsumers"), '_text -> "Poll consumer information (Not recommended for large # of consumers)") @b3.checkbox(form("filterConsumers"), '_text -> "Filter out inactive consumers") diff --git a/app/views/cluster/updateCluster.scala.html b/app/views/cluster/updateCluster.scala.html index fb4dc18d0..0ef9c73b8 100644 --- a/app/views/cluster/updateCluster.scala.html +++ b/app/views/cluster/updateCluster.scala.html @@ -35,6 +35,7 @@ @b3.text(form("jmxUser"), '_label -> "JMX Auth Username") @b3.text(form("jmxPass"), '_label -> "JMX Auth Password") @b3.checkbox(form("jmxSsl"), '_text -> "JMX with SSL") + @b3.checkbox(form("restrictOperations"), '_text -> "Restrict the Operations(Non-Disruptive View only mode)") @b3.checkbox(form("pollConsumers"), '_text -> "Poll consumer information (Not recommended for large # of consumers)") @b3.checkbox(form("filterConsumers"), '_text -> "Filter out inactive consumers") @b3.checkbox(form("logkafkaEnabled"), '_text -> "Enable Logkafka") diff --git a/app/views/topic/topicList.scala.html b/app/views/topic/topicList.scala.html index 75d04d2f0..1eb7719b9 100644 --- a/app/views/topic/topicList.scala.html +++ b/app/views/topic/topicList.scala.html @@ -32,6 +32,32 @@ } } +@renderOperations = { + + @if(errorOrTopics.fold(err=>false,tl=>tl.list.headOption.map(opt => opt._2.map(ti => ti.clusterContext.config.restrictOperations && ti.clusterContext.config.name.equals(cluster)).getOrElse(false)).getOrElse(false))){ + + } else { + + + @features.app(features.KMReassignPartitionsFeature) { + + + } + + + +
+ Generate Partition Assignments + + Run Partition Assignments + + Add Partitions +
+ } +} + @main( "Topic List", menu = theMenu, @@ -41,22 +67,7 @@ @features.app(features.KMTopicManagerFeature) {

Operations

- - - @features.app(features.KMReassignPartitionsFeature) { - - - } - - - -
- Generate Partition Assignments - - Run Partition Assignments - - Add Partitions -
+ @renderOperations
} diff --git a/app/views/topic/topicViewContent.scala.html b/app/views/topic/topicViewContent.scala.html index 3237924da..f0435194b 100644 --- a/app/views/topic/topicViewContent.scala.html +++ b/app/views/topic/topicViewContent.scala.html @@ -101,6 +101,64 @@ } } +@renderOperations = { + @if(topicIdentity.clusterContext.config.restrictOperations){ + + } else { + + + + @if(topicIdentity.clusterContext.clusterFeatures.features(kafka.manager.features.KMDeleteTopicFeature)) { + + } + @features.app(features.KMReassignPartitionsFeature) { + + + } + + + + + @features.app(features.KMReassignPartitionsFeature) { + + } + + +
+ Delete Topic + + @b3.form(routes.ReassignPartitions.handleOperation(cluster,topic)) { + @reassignPartitionOperation match { + case ForceRunAssignment => { +
+ + + @b3.submit('class -> "btn btn-primary btn-block"){ Force Reassign Partitions } +
+ } + case _ => { +
+ + + @b3.submit('class -> "btn btn-primary btn-block"){ Reassign Partitions } +
+ } + } + } +
+ Generate Partition Assignments +
+ Add Partitions + + Update Config + + Manual Partition Assignments +
+ } +} +
@@ -128,7 +186,7 @@ @topicIdentity.topicBrokers - Preferred Replicas % + Preferred Replicas % @topicIdentity.preferredReplicasPercentage @@ -184,55 +242,7 @@ @features.app(features.KMTopicManagerFeature) {

Operations

- - - - @if(topicIdentity.clusterContext.clusterFeatures.features(kafka.manager.features.KMDeleteTopicFeature)) { - - } - @features.app(features.KMReassignPartitionsFeature) { - - - } - - - - - @features.app(features.KMReassignPartitionsFeature) { - - } - - -
- Delete Topic - - @b3.form(routes.ReassignPartitions.handleOperation(cluster,topic)) { - @reassignPartitionOperation match { - case ForceRunAssignment => { -
- - - @b3.submit('class -> "btn btn-primary btn-block"){ Force Reassign Partitions } -
- } - case _ => { -
- - - @b3.submit('class -> "btn btn-primary btn-block"){ Reassign Partitions } -
- } - } - } -
- Generate Partition Assignments -
- Add Partitions - - Update Config - - Manual Partition Assignments -
+ @renderOperations
}
diff --git a/test/kafka/manager/TestClusterManagerActor.scala b/test/kafka/manager/TestClusterManagerActor.scala index 9d2279830..061113663 100644 --- a/test/kafka/manager/TestClusterManagerActor.scala +++ b/test/kafka/manager/TestClusterManagerActor.scala @@ -46,7 +46,7 @@ class TestClusterManagerActor extends CuratorAwareTest with BaseTest { override protected def beforeAll(): Unit = { super.beforeAll() - val clusterConfig = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning)) + val clusterConfig = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, restrictOperations=false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning)) val curatorConfig = CuratorConfig(testServer.getConnectString) val config = ClusterManagerActorConfig( "pinned-dispatcher" diff --git a/test/kafka/manager/TestKafkaManager.scala b/test/kafka/manager/TestKafkaManager.scala index 5f031108d..bf5c3984f 100644 --- a/test/kafka/manager/TestKafkaManager.scala +++ b/test/kafka/manager/TestKafkaManager.scala @@ -118,7 +118,7 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest { } test("add cluster") { - val future = kafkaManager.addCluster("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManager.defaultTuning)) + val future = kafkaManager.addCluster("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, restrictOperations=false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManager.defaultTuning)) val result = Await.result(future,duration) assert(result.isRight === true) Thread.sleep(2000) @@ -368,7 +368,7 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest { } test("update cluster zkhost") { - val future = kafkaManager.updateCluster("dev","0.8.2.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxSsl = false, jmxPass = None, tuning = Option(defaultTuning)) + val future = kafkaManager.updateCluster("dev","0.8.2.0",testServer.getConnectString, jmxEnabled = false, restrictOperations = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxSsl = false, jmxPass = None, tuning = Option(defaultTuning)) val result = Await.result(future,duration) assert(result.isRight === true) @@ -403,7 +403,7 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest { } test("update cluster version") { - val future = kafkaManager.updateCluster("dev","0.8.1.1",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning)) + val future = kafkaManager.updateCluster("dev","0.8.1.1",testServer.getConnectString, jmxEnabled = false, restrictOperations = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning)) val result = Await.result(future,duration) assert(result.isRight === true) Thread.sleep(2000) @@ -425,7 +425,7 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest { } test("update cluster logkafka enabled and activeOffsetCache enabled") { - val future = kafkaManager.updateCluster("dev","0.8.2.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning)) + val future = kafkaManager.updateCluster("dev","0.8.2.0",testServer.getConnectString, jmxEnabled = false, restrictOperations=false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning)) val result = Await.result(future,duration) assert(result.isRight === true) diff --git a/test/kafka/manager/utils/TestClusterConfig.scala b/test/kafka/manager/utils/TestClusterConfig.scala index 84a6d4174..3e3178c50 100644 --- a/test/kafka/manager/utils/TestClusterConfig.scala +++ b/test/kafka/manager/utils/TestClusterConfig.scala @@ -14,18 +14,18 @@ class TestClusterConfig extends FunSuite with Matchers { test("invalid name") { intercept[IllegalArgumentException] { - ClusterConfig("qa!","0.8.1.1","localhost",jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None) + ClusterConfig("qa!","0.8.1.1","localhost",jmxEnabled = false, restrictOperations=false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None) } } test("invalid kafka version") { intercept[IllegalArgumentException] { - ClusterConfig("qa","0.8.1","localhost:2181",jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None) + ClusterConfig("qa","0.8.1","localhost:2181",jmxEnabled = false, restrictOperations=false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None) } } test("serialize and deserialize 0.8.1.1") { - val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = true, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None) + val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = true, restrictOperations=false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None) val serialize: String = ClusterConfig.serialize(cc) val deserialize = ClusterConfig.deserialize(serialize) assert(deserialize.isSuccess === true) @@ -33,7 +33,7 @@ class TestClusterConfig extends FunSuite with Matchers { } test("serialize and deserialize 0.8.2.0 +jmx credentials") { - val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = true, jmxUser = Some("mario"), jmxPass = Some("rossi"), jmxSsl = false, pollConsumers = true, filterConsumers = true, tuning = None) + val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = true, jmxUser = Some("mario"), jmxPass = Some("rossi"), jmxSsl = false, restrictOperations=false, pollConsumers = true, filterConsumers = true, tuning = None) val serialize: String = ClusterConfig.serialize(cc) val deserialize = ClusterConfig.deserialize(serialize) assert(deserialize.isSuccess === true) @@ -41,7 +41,7 @@ class TestClusterConfig extends FunSuite with Matchers { } test("serialize and deserialize 0.8.2.0") { - val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = true, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None) + val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = true, restrictOperations=false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None) val serialize: String = ClusterConfig.serialize(cc) val deserialize = ClusterConfig.deserialize(serialize) assert(deserialize.isSuccess === true) @@ -49,7 +49,7 @@ class TestClusterConfig extends FunSuite with Matchers { } test("serialize and deserialize 0.8.2.1") { - val cc = ClusterConfig("qa","0.8.2.1","localhost:2181", jmxEnabled = true, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None) + val cc = ClusterConfig("qa","0.8.2.1","localhost:2181", jmxEnabled = true, restrictOperations=false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None) val serialize: String = ClusterConfig.serialize(cc) val deserialize = ClusterConfig.deserialize(serialize) assert(deserialize.isSuccess === true) @@ -57,7 +57,7 @@ class TestClusterConfig extends FunSuite with Matchers { } test("serialize and deserialize 0.8.2.2") { - val cc = ClusterConfig("qa","0.8.2.2","localhost:2181", jmxEnabled = true, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None) + val cc = ClusterConfig("qa","0.8.2.2","localhost:2181", jmxEnabled = true, restrictOperations=false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None) val serialize: String = ClusterConfig.serialize(cc) val deserialize = ClusterConfig.deserialize(serialize) assert(deserialize.isSuccess === true) @@ -65,7 +65,7 @@ class TestClusterConfig extends FunSuite with Matchers { } test("deserialize without version and jmxEnabled") { - val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None) + val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = false, restrictOperations=false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None) val serialize: String = ClusterConfig.serialize(cc) val noverison = serialize.replace(""","kafkaVersion":"0.8.2.0"""","").replace(""","jmxEnabled":false""","").replace(""","jmxSsl":false""","") assert(!noverison.contains("kafkaVersion")) @@ -77,7 +77,7 @@ class TestClusterConfig extends FunSuite with Matchers { } test("deserialize from 0.8.2-beta as 0.8.2.0") { - val cc = ClusterConfig("qa","0.8.2-beta","localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None) + val cc = ClusterConfig("qa","0.8.2-beta","localhost:2181", jmxEnabled = false, restrictOperations=false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None) val serialize: String = ClusterConfig.serialize(cc) val noverison = serialize.replace(""","kafkaVersion":"0.8.2.0"""",""","kafkaVersion":"0.8.2-beta"""") val deserialize = ClusterConfig.deserialize(noverison) @@ -86,7 +86,7 @@ class TestClusterConfig extends FunSuite with Matchers { } test("deserialize from 0.9.0.1") { - val cc = ClusterConfig("qa","0.9.0.1","localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, + val cc = ClusterConfig("qa","0.9.0.1","localhost:2181", jmxEnabled = false, restrictOperations=false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(ClusterTuning(Option(1),Option(2),Option(3), Option(4), Option(5), Option(6), Option(7), Option(8), Option(9), Option(10), Option(11), Option(12), Option(13), Option(14), Option(15))) ) val serialize: String = ClusterConfig.serialize(cc) From f77f59c5c334e215da9d2d358c591754c60061fc Mon Sep 17 00:00:00 2001 From: Praveen K Palaniswamy Date: Thu, 23 Mar 2017 18:37:10 -0500 Subject: [PATCH 5/8] Kafka_0_10_2_0 as option --- app/kafka/manager/model/model.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/app/kafka/manager/model/model.scala b/app/kafka/manager/model/model.scala index 2fd8a701f..19f97165b 100644 --- a/app/kafka/manager/model/model.scala +++ b/app/kafka/manager/model/model.scala @@ -48,6 +48,10 @@ case object Kafka_0_10_1_0 extends KafkaVersion { override def toString = "0.10.1.0" } +case object Kafka_0_10_2_0 extends KafkaVersion { + override def toString = "0.10.2.0" +} + object KafkaVersion { val supportedVersions: Map[String,KafkaVersion] = Map( "0.8.1.1" -> Kafka_0_8_1_1, @@ -59,7 +63,8 @@ object KafkaVersion { "0.9.0.1" -> Kafka_0_9_0_1, "0.10.0.0" -> Kafka_0_10_0_0, "0.10.0.1" -> Kafka_0_10_0_1, - "0.10.1.0" -> Kafka_0_10_1_0 + "0.10.1.0" -> Kafka_0_10_1_0, + "0.10.2.0" -> Kafka_0_10_2_0 ) val formSelectList : IndexedSeq[(String,String)] = supportedVersions.toIndexedSeq.filterNot(_._1.contains("beta")).map(t => (t._1,t._2.toString)) From 69aa2b8d54b5cce8b010efb9778cbe2fd6f75642 Mon Sep 17 00:00:00 2001 From: Praveen K Palaniswamy Date: Thu, 23 Mar 2017 18:37:40 -0500 Subject: [PATCH 6/8] Version in sorted order --- app/kafka/manager/model/model.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/kafka/manager/model/model.scala b/app/kafka/manager/model/model.scala index 19f97165b..7a269f26a 100644 --- a/app/kafka/manager/model/model.scala +++ b/app/kafka/manager/model/model.scala @@ -67,7 +67,7 @@ object KafkaVersion { "0.10.2.0" -> Kafka_0_10_2_0 ) - val formSelectList : IndexedSeq[(String,String)] = supportedVersions.toIndexedSeq.filterNot(_._1.contains("beta")).map(t => (t._1,t._2.toString)) + val formSelectList : IndexedSeq[(String,String)] = supportedVersions.toIndexedSeq.filterNot(_._1.contains("beta")).map(t => (t._1,t._2.toString)).sortBy(_._1) def apply(s: String) : KafkaVersion = { supportedVersions.get(s) match { From 55cc242e7acefeb5ce5610e232eb2e5a4be99477 Mon Sep 17 00:00:00 2001 From: Praveen K Palaniswamy Date: Thu, 23 Mar 2017 18:58:24 -0500 Subject: [PATCH 7/8] lock cluster in read only mode --- app/models/navigation/Menus.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/models/navigation/Menus.scala b/app/models/navigation/Menus.scala index 916afeba0..c5b5fac57 100644 --- a/app/models/navigation/Menus.scala +++ b/app/models/navigation/Menus.scala @@ -31,7 +31,7 @@ class Menus(implicit applicationFeatures: ApplicationFeatures) { val defaultItems = IndexedSeq("List".clusterRouteMenuItem(cluster)) val items = { - if(applicationFeatures.features(KMTopicManagerFeature) && clusterFeatures.features(KMRestrictedFeature)) + if(applicationFeatures.features(KMTopicManagerFeature) && !clusterFeatures.features(KMRestrictedFeature)) defaultItems.+:("Create".clusterRouteMenuItem(cluster)) else defaultItems From 1da7f5bc02376cd915ac456e2259d3de0e05e084 Mon Sep 17 00:00:00 2001 From: Praveen K Palaniswamy Date: Thu, 23 Mar 2017 23:40:51 -0500 Subject: [PATCH 8/8] Kafka_0_10_2_0 as option --- app/kafka/manager/actor/cluster/KafkaStateActor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/kafka/manager/actor/cluster/KafkaStateActor.scala b/app/kafka/manager/actor/cluster/KafkaStateActor.scala index 812100c5d..a543f7007 100644 --- a/app/kafka/manager/actor/cluster/KafkaStateActor.scala +++ b/app/kafka/manager/actor/cluster/KafkaStateActor.scala @@ -143,7 +143,7 @@ class KafkaAdminClient(context: => ActorContext, adminClientActorPath: ActorPath object KafkaManagedOffsetCache { - val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0) + val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0, Kafka_0_10_2_0) val ConsumerOffsetTopic = "__consumer_offsets" def isSupported(version: KafkaVersion) : Boolean = {