From 4b42c557f6e652a27d7954e3c02437f91adc54aa Mon Sep 17 00:00:00 2001 From: zheolong Date: Tue, 13 Oct 2015 12:06:17 +0800 Subject: [PATCH 1/4] logkafka: check if hostname is localhost --- app/kafka/manager/utils/Logkafka.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/app/kafka/manager/utils/Logkafka.scala b/app/kafka/manager/utils/Logkafka.scala index 63e43c4e0..79279b0aa 100644 --- a/app/kafka/manager/utils/Logkafka.scala +++ b/app/kafka/manager/utils/Logkafka.scala @@ -46,6 +46,7 @@ object Logkafka { case None => checkCondition(false, IllegalCharacterInName(hostname)) } + checkCondition(!hostname.matches("^localhost$"), HostnameIsLocalhost) checkCondition(hostname.matches(validHostnameRegex), InvalidHostname) } @@ -86,6 +87,7 @@ object Logkafka { object LogkafkaErrors { class HostnameEmpty private[LogkafkaErrors] extends UtilError("hostname is illegal, can't be empty") + class HostnameIsLocalhost private[LogkafkaErrors] extends UtilError("hostname is illegal, can't be localhost") class LogPathEmpty private[LogkafkaErrors] extends UtilError("log path is illegal, can't be empty") class LogPathNotAbsolute private[LogkafkaErrors] extends UtilError("log path is illegal, must be absolute") class InvalidHostname private[LogkafkaErrors] extends UtilError(s"hostname is illegal, does not match regex ${Logkafka.validHostnameRegex}") @@ -100,6 +102,7 @@ object LogkafkaErrors { class HostnameNotExists private[LogkafkaErrors] (hostname: String) extends UtilError(s"Hostname not exists : $hostname") val HostnameEmpty = new HostnameEmpty + val HostnameIsLocalhost = new HostnameIsLocalhost val LogPathEmpty = new LogPathEmpty val LogPathNotAbsolute = new LogPathNotAbsolute val InvalidHostname = new InvalidHostname From 8ce8207fb4df6fdc5cda0b6d92fe440f8891e20c Mon Sep 17 00:00:00 2001 From: zheolong Date: Wed, 21 Oct 2015 19:26:55 +0800 Subject: [PATCH 2/4] logkafka: update log collecting state --- .../logkafka/logkafkaListContent.scala.html | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/app/views/logkafka/logkafkaListContent.scala.html b/app/views/logkafka/logkafkaListContent.scala.html index 3ae690ad0..580b7a835 100644 --- a/app/views/logkafka/logkafkaListContent.scala.html +++ b/app/views/logkafka/logkafkaListContent.scala.html @@ -5,14 +5,14 @@ @(cluster: String, logkafkas: IndexedSeq[((String, Option[kafka.manager.ActorModel.LogkafkaIdentity]),Boolean)]) @getFilesizeStatus(filesize: Int) = { -@filesize match { - case i if i < 0 => {warning} - case i => {} -} + @filesize match { + case i if i < 0 => {warning} + case i => {} + } } -@getLogkafkaStatus(active: Boolean) = { - @if(active) {} else {danger} +@getLogkafkaStatus(flag: Boolean, s1: String, s2: String) = { + @if(flag) {@s1} else {@s2} } @@ -24,7 +24,9 @@ @logkafkaIdentity.map{ li => @for( (log_path, im) <- li.identityMap) { - + @im._1.map { c => @@ -33,8 +35,12 @@ @im._2.map { c => @c.get("realpath").map { d => - }.getOrElse{} - }.getOrElse{} + }.getOrElse{} + }.getOrElse{ + + } @im._2.map { c => @c.get("filepos").map { d => From 61b26d28f5baa4ad633789314cb04daeeb663ee4 Mon Sep 17 00:00:00 2001 From: zheolong Date: Fri, 23 Oct 2015 16:47:42 +0800 Subject: [PATCH 3/4] logkafka: little fix about config deleting --- app/kafka/manager/LogkafkaCommandActor.scala | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/app/kafka/manager/LogkafkaCommandActor.scala b/app/kafka/manager/LogkafkaCommandActor.scala index 6dcf367e5..6a8e17ae1 100644 --- a/app/kafka/manager/LogkafkaCommandActor.scala +++ b/app/kafka/manager/LogkafkaCommandActor.scala @@ -66,18 +66,12 @@ class LogkafkaCommandActor(logkafkaCommandActorConfig: LogkafkaCommandActorConfi implicit val ec = longRunningExecutionContext request match { case LKCDeleteLogkafka(hostname, log_path, logkafkaConfig) => - if(logkafkaCommandActorConfig.clusterContext.clusterFeatures.features(KMDeleteTopicFeature)) { - longRunning { - Future { - LKCCommandResult(Try { - logkafkaAdminUtils.deleteLogkafka(logkafkaCommandActorConfig.curator, hostname, log_path, logkafkaConfig) - }) - } + longRunning { + Future { + LKCCommandResult(Try { + logkafkaAdminUtils.deleteLogkafka(logkafkaCommandActorConfig.curator, hostname, log_path, logkafkaConfig) + }) } - } else { - val result : LKCCommandResult = LKCCommandResult(Failure(new UnsupportedOperationException( - s"Delete logkafka not supported for kafka version ${logkafkaCommandActorConfig.clusterContext.config.version}"))) - sender ! result } case LKCCreateLogkafka(hostname, log_path, config, logkafkaConfig) => longRunning { From 4ab38d78fe54497fb08c5c2110d9b46149446254 Mon Sep 17 00:00:00 2001 From: zheolong Date: Sun, 25 Oct 2015 12:34:33 +0800 Subject: [PATCH 4/4] logkafka: add config operations column --- app/controllers/Logkafka.scala | 36 +++++++++++++++++++ app/kafka/manager/ActorModel.scala | 8 +++-- app/kafka/manager/ClusterManagerActor.scala | 4 +-- app/kafka/manager/KafkaManager.scala | 5 +-- app/kafka/manager/LogkafkaCommandActor.scala | 4 +-- .../manager/utils/LogkafkaAdminUtils.scala | 13 ++++--- .../logkafka/logkafkaListContent.scala.html | 29 ++++++++++++++- conf/routes | 2 ++ 8 files changed, 88 insertions(+), 13 deletions(-) diff --git a/app/controllers/Logkafka.scala b/app/controllers/Logkafka.scala index 34c47f075..52cd6b163 100644 --- a/app/controllers/Logkafka.scala +++ b/app/controllers/Logkafka.scala @@ -233,4 +233,40 @@ object Logkafka extends Controller{ ) } } + + def handleEnableConfig(clusterName: String, hostname: String, log_path: String) = Action.async { implicit request => + clusterFeatureGate(clusterName, KMLogKafkaFeature) { clusterContext => + implicit val clusterFeatures = clusterContext.clusterFeatures + val props = new Properties(); + props.put("valid", true.toString); + kafkaManager.updateLogkafkaConfig(clusterName, hostname, log_path, props, false).map { errorOrSuccess => + Ok(views.html.common.resultOfCommand( + views.html.navigation.clusterMenu(clusterName, "Logkafka", "Logkafka View", Menus.clusterMenus(clusterName)), + models.navigation.BreadCrumbs.withNamedViewAndClusterAndLogkafka("Logkafka View", clusterName, hostname, log_path, "Update Config"), + errorOrSuccess, + "Enable Config", + FollowLink("Go to logkafka view.", routes.Logkafka.logkafka(clusterName, hostname, log_path).toString()), + FollowLink("Try again.", routes.Logkafka.updateConfig(clusterName, hostname, log_path).toString()) + )) + } + } + } + + def handleDisableConfig(clusterName: String, hostname: String, log_path: String) = Action.async { implicit request => + clusterFeatureGate(clusterName, KMLogKafkaFeature) { clusterContext => + implicit val clusterFeatures = clusterContext.clusterFeatures + val props = new Properties(); + props.put("valid", false.toString); + kafkaManager.updateLogkafkaConfig(clusterName, hostname, log_path, props, false).map { errorOrSuccess => + Ok(views.html.common.resultOfCommand( + views.html.navigation.clusterMenu(clusterName, "Logkafka", "Logkafka View", Menus.clusterMenus(clusterName)), + models.navigation.BreadCrumbs.withNamedViewAndClusterAndLogkafka("Logkafka View", clusterName, hostname, log_path, "Update Config"), + errorOrSuccess, + "Disable Config", + FollowLink("Go to logkafka view.", routes.Logkafka.logkafka(clusterName, hostname, log_path).toString()), + FollowLink("Try again.", routes.Logkafka.updateConfig(clusterName, hostname, log_path).toString()) + )) + } + } + } } diff --git a/app/kafka/manager/ActorModel.scala b/app/kafka/manager/ActorModel.scala index 273feb27c..bf155674f 100644 --- a/app/kafka/manager/ActorModel.scala +++ b/app/kafka/manager/ActorModel.scala @@ -92,7 +92,9 @@ object ActorModel { ) extends CommandRequest case class CMUpdateLogkafkaConfig(hostname: String, log_path: String, - config: Properties) extends CommandRequest + config: Properties, + checkConfig: Boolean = true + ) extends CommandRequest case class CMDeleteLogkafka(hostname: String, log_path: String) extends CommandRequest //########## @@ -584,7 +586,9 @@ object ActorModel { case class LKCUpdateLogkafkaConfig(hostname: String, log_path: String, config: Properties, - logkafkaConfig: Option[LogkafkaConfig]) extends CommandRequest + logkafkaConfig: Option[LogkafkaConfig], + checkConfig: Boolean = true + ) extends CommandRequest case class LKCCommandResult(result: Try[Unit]) extends CommandResponse diff --git a/app/kafka/manager/ClusterManagerActor.scala b/app/kafka/manager/ClusterManagerActor.scala index f2b2950be..678b1751c 100644 --- a/app/kafka/manager/ClusterManagerActor.scala +++ b/app/kafka/manager/ClusterManagerActor.scala @@ -523,11 +523,11 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig) } } pipeTo sender() - case CMUpdateLogkafkaConfig(hostname, log_path, config) => + case CMUpdateLogkafkaConfig(hostname, log_path, config, checkConfig) => implicit val ec = longRunningExecutionContext val eventualLogkafkaConfig = withLogkafkaStateActor(LKSGetLogkafkaConfig(hostname))(identity[Option[LogkafkaConfig]]) eventualLogkafkaConfig.map { logkafkaConfigOption => - withLogkafkaCommandActor(LKCUpdateLogkafkaConfig(hostname, log_path, config, logkafkaConfigOption)) { + withLogkafkaCommandActor(LKCUpdateLogkafkaConfig(hostname, log_path, config, logkafkaConfigOption, checkConfig)) { lkcResponse: LKCCommandResult => CMCommandResult(lkcResponse.result) } diff --git a/app/kafka/manager/KafkaManager.scala b/app/kafka/manager/KafkaManager.scala index 3b11da7ee..33712d69d 100644 --- a/app/kafka/manager/KafkaManager.scala +++ b/app/kafka/manager/KafkaManager.scala @@ -440,14 +440,15 @@ class KafkaManager(akkaConfig: Config) clusterName: String, hostname: String, log_path: String, - config: Properties + config: Properties, + checkConfig: Boolean = true ): Future[ApiError \/ ClusterContext] = { implicit val ec = apiExecutionContext withKafkaManagerActor( KMClusterCommandRequest( clusterName, - CMUpdateLogkafkaConfig(hostname, log_path, config) + CMUpdateLogkafkaConfig(hostname, log_path, config, checkConfig) ) ) { result: Future[CMCommandResult] => diff --git a/app/kafka/manager/LogkafkaCommandActor.scala b/app/kafka/manager/LogkafkaCommandActor.scala index 6a8e17ae1..8751cf973 100644 --- a/app/kafka/manager/LogkafkaCommandActor.scala +++ b/app/kafka/manager/LogkafkaCommandActor.scala @@ -81,11 +81,11 @@ class LogkafkaCommandActor(logkafkaCommandActorConfig: LogkafkaCommandActorConfi }) } } - case LKCUpdateLogkafkaConfig(hostname, log_path, config, logkafkaConfig) => + case LKCUpdateLogkafkaConfig(hostname, log_path, config, logkafkaConfig, checkConfig) => longRunning { Future { LKCCommandResult(Try { - logkafkaAdminUtils.changeLogkafkaConfig(logkafkaCommandActorConfig.curator, hostname, log_path, config, logkafkaConfig) + logkafkaAdminUtils.changeLogkafkaConfig(logkafkaCommandActorConfig.curator, hostname, log_path, config, logkafkaConfig, checkConfig) }) } } diff --git a/app/kafka/manager/utils/LogkafkaAdminUtils.scala b/app/kafka/manager/utils/LogkafkaAdminUtils.scala index 8dfaa25fb..573576b29 100644 --- a/app/kafka/manager/utils/LogkafkaAdminUtils.scala +++ b/app/kafka/manager/utils/LogkafkaAdminUtils.scala @@ -70,11 +70,15 @@ class LogkafkaAdminUtils(version: KafkaVersion) { config: Properties = new Properties, logkafkaConfigOption: Option[kafka.manager.ActorModel.LogkafkaConfig], update: Boolean = false, - readVersion: Int = -1 + readVersion: Int = -1, + checkConfig: Boolean = true ) { // validate arguments Logkafka.validateHostname(hostname) - LogkafkaNewConfigs.validate(version,config) + + if (checkConfig) { + LogkafkaNewConfigs.validate(version, config) + } val configMap: mutable.Map[String, String] = { import scala.collection.JavaConverters._ @@ -112,9 +116,10 @@ class LogkafkaAdminUtils(version: KafkaVersion) { hostname: String, log_path: String, config: Properties = new Properties, - logkafkaConfigOption: Option[kafka.manager.ActorModel.LogkafkaConfig] + logkafkaConfigOption: Option[kafka.manager.ActorModel.LogkafkaConfig], + checkConfig: Boolean = true ): Unit = { - createOrUpdateLogkafkaConfigPathInZK(curator, hostname, log_path, config, logkafkaConfigOption, true) + createOrUpdateLogkafkaConfigPathInZK(curator, hostname, log_path, config, logkafkaConfigOption, true, -1, checkConfig) } /** diff --git a/app/views/logkafka/logkafkaListContent.scala.html b/app/views/logkafka/logkafkaListContent.scala.html index 580b7a835..274f11b71 100644 --- a/app/views/logkafka/logkafkaListContent.scala.html +++ b/app/views/logkafka/logkafkaListContent.scala.html @@ -4,6 +4,8 @@ *@ @(cluster: String, logkafkas: IndexedSeq[((String, Option[kafka.manager.ActorModel.LogkafkaIdentity]),Boolean)]) +@import b3.vertical.fieldConstructor + @getFilesizeStatus(filesize: Int) = { @filesize match { case i if i < 0 => {warning} @@ -17,7 +19,7 @@
@hostname + @hostname + @log_path@d no corresponding file to collect logkafka in @hostname is inactive no corresponding file + @getLogkafkaStatus(li.active,"scanning for new file", "logkafka is inactive") +
- + @for( ((hostname, logkafkaIdentity), deleted) <- logkafkas) { @@ -60,6 +62,31 @@ }.getOrElse{} }.getOrElse{} + } + }.getOrElse{} + + + } }.getOrElse{} diff --git a/conf/routes b/conf/routes index a3f309504..dd0e2fb89 100644 --- a/conf/routes +++ b/conf/routes @@ -49,6 +49,8 @@ POST /clusters/:c/logkafkas/create controllers.Logkafka.h POST /clusters/:c/logkafkas/delete controllers.Logkafka.handleDeleteLogkafka(c:String, h:String, l:String) GET /clusters/:c/logkafkas/:h/:l/updateConfig controllers.Logkafka.updateConfig(c:String, h:String, l:String) POST /clusters/:c/logkafkas/:h/:l/updateConfig controllers.Logkafka.handleUpdateConfig(c:String, h:String, l:String) +POST /clusters/:c/logkafkas/:h/:l/disableConfig controllers.Logkafka.handleDisableConfig(c:String, h:String, l:String) +POST /clusters/:c/logkafkas/:h/:l/enableConfig controllers.Logkafka.handleEnableConfig(c:String, h:String, l:String) GET /api/status/:c/availableBrokers controllers.api.KafkaHealthCheck.availableBrokers(c:String) GET /api/status/:c/:t/underReplicatedPartitions controllers.api.KafkaHealthCheck.underReplicatedPartitions(c:String,t:String) GET /api/status/:c/:t/unavailablePartitions controllers.api.KafkaHealthCheck.unavailablePartitions(c:String,t:String)
Hostname# Log Path# Real Path# File Pos# File Size# Topic
HostnameLog Path# Real Path# File Pos# File Size# TopicOperations
+
+ @im._1.map { c => + @c.get("valid").map { enabled => + @if(enabled.toBoolean) { + Modify + @b3.form(routes.Logkafka.handleDisableConfig(cluster, hostname, log_path)) { + + @b3.submit('class -> "btn btn-warning ops-button"){ Disable } + } + } else { + @b3.form(routes.Logkafka.handleEnableConfig(cluster, hostname, log_path)) { + + @b3.submit('class -> "btn btn-success ops-button"){ Enable } + } + @b3.form(routes.Logkafka.handleDeleteLogkafka(cluster, hostname, log_path)) { + + @b3.submit('class -> "btn btn-danger ops-button"){ Delete } + } + } + }.getOrElse{