From 5281295a5d97af11a2c010b2abfcaaf7b847909a Mon Sep 17 00:00:00 2001 From: Viswa Vutharkar Date: Thu, 12 Mar 2015 10:33:31 -0700 Subject: [PATCH 1/3] Removed add clusters and modify cluster --- .gitignore | 10 ++++++ app/GlobalKafkaManager.scala | 41 ++++++++++++++++++++++-- app/kafka/manager/KafkaManager.scala | 2 ++ app/models/navigation/BreadCrumbs.scala | 3 +- app/models/navigation/Menus.scala | 6 ++-- app/models/navigation/QuickRoutes.scala | 3 +- app/views/cluster/clusterList.scala.html | 8 ----- build.sbt | 3 ++ conf/application.conf | 6 ++++ 9 files changed, 64 insertions(+), 18 deletions(-) diff --git a/.gitignore b/.gitignore index 99e98aeb6..7bdf26605 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,13 @@ dist activator* RUNNING_PID .DS_Store +/bin +.cache +.classpath +.project +.sbtserver/ +.tmpBin +project/.sbtserver +project/.sbtserver.lock +project/play-fork-run.sbt +project/sbt-ui.sbt diff --git a/app/GlobalKafkaManager.scala b/app/GlobalKafkaManager.scala index 90aec67b5..455a11a6c 100644 --- a/app/GlobalKafkaManager.scala +++ b/app/GlobalKafkaManager.scala @@ -3,20 +3,25 @@ * See accompanying LICENSE file. */ +import collection.JavaConversions._ import controllers.KafkaManagerContext import kafka.manager.KafkaManager import play.api._ +import com.typesafe.config.ConfigException +import models.navigation.BreadCrumbs /** * @author hiral */ -object GlobalKafkaManager extends GlobalSettings { +object Global extends GlobalSettings { private[this] var kafkaManager: KafkaManager = null + val ClusterZks = "managed-kafka-clusters" override def beforeStart(app: Application): Unit = { Logger.info("Init kafka manager...") - KafkaManagerContext.getKafkaManger + kafkaManager = KafkaManagerContext.getKafkaManger + initialize_with_clusters_from_conf() Thread.sleep(5000) } @@ -24,6 +29,38 @@ object GlobalKafkaManager extends GlobalSettings { KafkaManagerContext.shutdown() Logger.info("Application shutdown...") } + + + def initialize_with_clusters_from_conf() { + Logger.info("initialize_with_clusters_from_conf(): Going to read kafka list from config and initialize") + + val configWithDefaults = kafkaManager.getConfigWithDefaults + + var member_zkhosts_list = List[String]() + try { + member_zkhosts_list = configWithDefaults.getStringList(ClusterZks).toList + } catch { + case cfge: ConfigException => Logger.error("managed-kafka-clusters param not defined in the conf or is in wrong format") + } + + + member_zkhosts_list.foreach{ item => + + // Each item in the list from config will be a string of format + // "clustername, version, zk1:port1, zk2:port2...." + // + val mylist = item.split(",") + val name = mylist(0) + val version = mylist(1) + val zkhosts = mylist.takeRight(mylist.size - 2).mkString(",") + + + kafkaManager.addCluster(name, version, zkhosts) + + } + } + + } diff --git a/app/kafka/manager/KafkaManager.scala b/app/kafka/manager/KafkaManager.scala index 78d429caf..432a00d0a 100644 --- a/app/kafka/manager/KafkaManager.scala +++ b/app/kafka/manager/KafkaManager.scala @@ -146,6 +146,8 @@ class KafkaManager(akkaConfig: Config) { } } + def getConfigWithDefaults = configWithDefaults + def addCluster(clusterName: String, version: String, zkHosts: String) : Future[ApiError \/ Unit] = { val cc = ClusterConfig(clusterName, version, zkHosts) tryWithKafkaManagerActor(KMAddCluster(cc)) { result: KMCommandResult => diff --git a/app/models/navigation/BreadCrumbs.scala b/app/models/navigation/BreadCrumbs.scala index fa3becb7c..28edc38b2 100644 --- a/app/models/navigation/BreadCrumbs.scala +++ b/app/models/navigation/BreadCrumbs.scala @@ -27,8 +27,7 @@ object BreadCrumbs { import models.navigation.QuickRoutes._ val baseBreadCrumbs: Map[String, IndexedSeq[BreadCrumb]] = Map( - "Clusters" -> IndexedSeq.empty[BreadCrumb], - "Add Cluster" -> IndexedSeq("Clusters".baseRouteBreadCrumb) + "Clusters" -> IndexedSeq.empty[BreadCrumb] ) val clusterBreadCrumbs: Map[String, IndexedSeq[BreadCrumb]] = Map( diff --git a/app/models/navigation/Menus.scala b/app/models/navigation/Menus.scala index 09657e692..77b6e1f37 100644 --- a/app/models/navigation/Menus.scala +++ b/app/models/navigation/Menus.scala @@ -13,8 +13,7 @@ object Menus { def clusterMenus(cluster: String) : IndexedSeq[Menu] = IndexedSeq( Menu("Cluster",IndexedSeq( "Summary".clusterRouteMenuItem(cluster), - "List".baseRouteMenuItem, - "Add Cluster".baseRouteMenuItem), + "List".baseRouteMenuItem), None), "Brokers".clusterMenu(cluster), Menu("Topic",IndexedSeq( @@ -27,8 +26,7 @@ object Menus { def indexMenu : IndexedSeq[Menu] = IndexedSeq( Menu("Cluster",IndexedSeq( - "List".baseRouteMenuItem, - "Add Cluster".baseRouteMenuItem), + "List".baseRouteMenuItem), None) ) } diff --git a/app/models/navigation/QuickRoutes.scala b/app/models/navigation/QuickRoutes.scala index 66a3e5e7d..040ce59c7 100644 --- a/app/models/navigation/QuickRoutes.scala +++ b/app/models/navigation/QuickRoutes.scala @@ -15,8 +15,7 @@ object QuickRoutes { val baseRoutes : Map[String, Call] = Map( "Clusters" -> controllers.routes.Application.index(), - "List" -> controllers.routes.Application.index(), - "Add Cluster" -> controllers.routes.Cluster.addCluster() + "List" -> controllers.routes.Application.index() ) val clusterRoutes : Map[String, String => Call] = Map( "Update Cluster" -> controllers.routes.Cluster.updateCluster, diff --git a/app/views/cluster/clusterList.scala.html b/app/views/cluster/clusterList.scala.html index 16bd750d1..6dcc2df3d 100644 --- a/app/views/cluster/clusterList.scala.html +++ b/app/views/cluster/clusterList.scala.html @@ -23,7 +23,6 @@
@if(cluster.enabled) { - Modify @b3.form(routes.Cluster.handleUpdateCluster(cluster.name)) { @@ -39,13 +38,6 @@ @b3.submit('class -> "btn btn-success ops-button"){ Enable } } - @b3.form(routes.Cluster.handleUpdateCluster(cluster.name)) { - - - - - @b3.submit('class -> "btn btn-danger ops-button"){ Delete } - } }
diff --git a/build.sbt b/build.sbt index 3a5b39725..7c059e185 100644 --- a/build.sbt +++ b/build.sbt @@ -82,3 +82,6 @@ rpmLicense := Some("Apache") + + +fork in run := true diff --git a/conf/application.conf b/conf/application.conf index 8ed2809e4..461293fca 100644 --- a/conf/application.conf +++ b/conf/application.conf @@ -65,3 +65,9 @@ kafka-manager.zkhosts=${?ZK_HOSTS} pinned-dispatcher.type="PinnedDispatcher" pinned-dispatcher.executor="thread-pool-executor" +# These will be added into Kafka Manager at startup +# Value of this param is a list of strings representing info about kafka clusters to be managed +# ["clusterNmae, Version, zk1:port1, zk2:port2,...", ...] +# Supported versions currently are "0.8.1.1" and "0.8.2-beta" only +# (line 38: kafka/manager/KafkaManagerActor.scala) +managed-kafka-clusters=["kmgr1,0.8.2-beta,zk1:2181,zk2:2181"] From 1982bee255cc9a9a67348fb21c089203b0edf529 Mon Sep 17 00:00:00 2001 From: Viswa Vutharkar Date: Wed, 27 May 2015 13:27:11 -0700 Subject: [PATCH 2/3] do not convert cluster name and zookeeper hosts to lowercase --- app/kafka/manager/KafkaManagerActor.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/kafka/manager/KafkaManagerActor.scala b/app/kafka/manager/KafkaManagerActor.scala index 51e50a594..5e98df030 100644 --- a/app/kafka/manager/KafkaManagerActor.scala +++ b/app/kafka/manager/KafkaManagerActor.scala @@ -82,8 +82,8 @@ object ClusterConfig { validateName(name) //validate zk hosts validateZkHosts(zkHosts) - val cleanZkHosts = zkHosts.replaceAll(" ","").toLowerCase - new ClusterConfig(name.toLowerCase, CuratorConfig(cleanZkHosts, zkMaxRetry), true, kafkaVersion) + val cleanZkHosts = zkHosts.replaceAll(" ", "") + new ClusterConfig(name, CuratorConfig(cleanZkHosts, zkMaxRetry), true, kafkaVersion) } def customUnapply(cc: ClusterConfig) : Option[(String, String, String, Int)] = { From 3c3c3496be9c6baf7276578bf521416af0d966b0 Mon Sep 17 00:00:00 2001 From: Viswa Vutharkar Date: Mon, 17 Aug 2015 13:06:11 -0700 Subject: [PATCH 3/3] Adding 0.8.2.1 in supported versions and also bumping .deb version to 1.2 --- app/kafka/manager/KafkaManagerActor.scala | 3 ++- build.sbt | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/app/kafka/manager/KafkaManagerActor.scala b/app/kafka/manager/KafkaManagerActor.scala index 5e98df030..b7f96aa5b 100644 --- a/app/kafka/manager/KafkaManagerActor.scala +++ b/app/kafka/manager/KafkaManagerActor.scala @@ -38,7 +38,8 @@ object KafkaVersion { val supportedVersions: Map[String,KafkaVersion] = Map( "0.8.1.1" -> Kafka_0_8_1_1, "0.8.2-beta" -> Kafka_0_8_2_0, - "0.8.2.0" -> Kafka_0_8_2_0) + "0.8.2.0" -> Kafka_0_8_2_0, + "0.8.2.1" -> Kafka_0_8_2_0) val formSelectList : IndexedSeq[(String,String)] = supportedVersions.toIndexedSeq.filterNot(_._1.contains("beta")).map(t => (t._1,t._2.toString)) diff --git a/build.sbt b/build.sbt index 7c059e185..311cba3f2 100644 --- a/build.sbt +++ b/build.sbt @@ -5,7 +5,7 @@ name := """kafka-manager""" /* For packaging purposes, -SNAPSHOT MUST contain a digit */ -version := "1.1" +version := "1.2" scalaVersion := "2.11.5"