Skip to content

Commit

Permalink
Merge pull request #140 from zheolong/master
Browse files Browse the repository at this point in the history
logkafka: check if hostname is localhost
  • Loading branch information
patelh committed Oct 27, 2015
2 parents e76c2ee + 4ab38d7 commit 629868d
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 33 deletions.
36 changes: 36 additions & 0 deletions app/controllers/Logkafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -233,4 +233,40 @@ object Logkafka extends Controller{
)
}
}

def handleEnableConfig(clusterName: String, hostname: String, log_path: String) = Action.async { implicit request =>
clusterFeatureGate(clusterName, KMLogKafkaFeature) { clusterContext =>
implicit val clusterFeatures = clusterContext.clusterFeatures
val props = new Properties();
props.put("valid", true.toString);
kafkaManager.updateLogkafkaConfig(clusterName, hostname, log_path, props, false).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.clusterMenu(clusterName, "Logkafka", "Logkafka View", Menus.clusterMenus(clusterName)),
models.navigation.BreadCrumbs.withNamedViewAndClusterAndLogkafka("Logkafka View", clusterName, hostname, log_path, "Update Config"),
errorOrSuccess,
"Enable Config",
FollowLink("Go to logkafka view.", routes.Logkafka.logkafka(clusterName, hostname, log_path).toString()),
FollowLink("Try again.", routes.Logkafka.updateConfig(clusterName, hostname, log_path).toString())
))
}
}
}

def handleDisableConfig(clusterName: String, hostname: String, log_path: String) = Action.async { implicit request =>
clusterFeatureGate(clusterName, KMLogKafkaFeature) { clusterContext =>
implicit val clusterFeatures = clusterContext.clusterFeatures
val props = new Properties();
props.put("valid", false.toString);
kafkaManager.updateLogkafkaConfig(clusterName, hostname, log_path, props, false).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.clusterMenu(clusterName, "Logkafka", "Logkafka View", Menus.clusterMenus(clusterName)),
models.navigation.BreadCrumbs.withNamedViewAndClusterAndLogkafka("Logkafka View", clusterName, hostname, log_path, "Update Config"),
errorOrSuccess,
"Disable Config",
FollowLink("Go to logkafka view.", routes.Logkafka.logkafka(clusterName, hostname, log_path).toString()),
FollowLink("Try again.", routes.Logkafka.updateConfig(clusterName, hostname, log_path).toString())
))
}
}
}
}
8 changes: 6 additions & 2 deletions app/kafka/manager/ActorModel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ object ActorModel {
) extends CommandRequest
case class CMUpdateLogkafkaConfig(hostname: String,
log_path: String,
config: Properties) extends CommandRequest
config: Properties,
checkConfig: Boolean = true
) extends CommandRequest
case class CMDeleteLogkafka(hostname: String, log_path: String) extends CommandRequest
//##########

Expand Down Expand Up @@ -584,7 +586,9 @@ object ActorModel {
case class LKCUpdateLogkafkaConfig(hostname: String,
log_path: String,
config: Properties,
logkafkaConfig: Option[LogkafkaConfig]) extends CommandRequest
logkafkaConfig: Option[LogkafkaConfig],
checkConfig: Boolean = true
) extends CommandRequest

case class LKCCommandResult(result: Try[Unit]) extends CommandResponse

Expand Down
4 changes: 2 additions & 2 deletions app/kafka/manager/ClusterManagerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -523,11 +523,11 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
}
} pipeTo sender()

case CMUpdateLogkafkaConfig(hostname, log_path, config) =>
case CMUpdateLogkafkaConfig(hostname, log_path, config, checkConfig) =>
implicit val ec = longRunningExecutionContext
val eventualLogkafkaConfig = withLogkafkaStateActor(LKSGetLogkafkaConfig(hostname))(identity[Option[LogkafkaConfig]])
eventualLogkafkaConfig.map { logkafkaConfigOption =>
withLogkafkaCommandActor(LKCUpdateLogkafkaConfig(hostname, log_path, config, logkafkaConfigOption)) {
withLogkafkaCommandActor(LKCUpdateLogkafkaConfig(hostname, log_path, config, logkafkaConfigOption, checkConfig)) {
lkcResponse: LKCCommandResult =>
CMCommandResult(lkcResponse.result)
}
Expand Down
5 changes: 3 additions & 2 deletions app/kafka/manager/KafkaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -440,14 +440,15 @@ class KafkaManager(akkaConfig: Config)
clusterName: String,
hostname: String,
log_path: String,
config: Properties
config: Properties,
checkConfig: Boolean = true
): Future[ApiError \/ ClusterContext] =
{
implicit val ec = apiExecutionContext
withKafkaManagerActor(
KMClusterCommandRequest(
clusterName,
CMUpdateLogkafkaConfig(hostname, log_path, config)
CMUpdateLogkafkaConfig(hostname, log_path, config, checkConfig)
)
) {
result: Future[CMCommandResult] =>
Expand Down
20 changes: 7 additions & 13 deletions app/kafka/manager/LogkafkaCommandActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,12 @@ class LogkafkaCommandActor(logkafkaCommandActorConfig: LogkafkaCommandActorConfi
implicit val ec = longRunningExecutionContext
request match {
case LKCDeleteLogkafka(hostname, log_path, logkafkaConfig) =>
if(logkafkaCommandActorConfig.clusterContext.clusterFeatures.features(KMDeleteTopicFeature)) {
longRunning {
Future {
LKCCommandResult(Try {
logkafkaAdminUtils.deleteLogkafka(logkafkaCommandActorConfig.curator, hostname, log_path, logkafkaConfig)
})
}
longRunning {
Future {
LKCCommandResult(Try {
logkafkaAdminUtils.deleteLogkafka(logkafkaCommandActorConfig.curator, hostname, log_path, logkafkaConfig)
})
}
} else {
val result : LKCCommandResult = LKCCommandResult(Failure(new UnsupportedOperationException(
s"Delete logkafka not supported for kafka version ${logkafkaCommandActorConfig.clusterContext.config.version}")))
sender ! result
}
case LKCCreateLogkafka(hostname, log_path, config, logkafkaConfig) =>
longRunning {
Expand All @@ -87,11 +81,11 @@ class LogkafkaCommandActor(logkafkaCommandActorConfig: LogkafkaCommandActorConfi
})
}
}
case LKCUpdateLogkafkaConfig(hostname, log_path, config, logkafkaConfig) =>
case LKCUpdateLogkafkaConfig(hostname, log_path, config, logkafkaConfig, checkConfig) =>
longRunning {
Future {
LKCCommandResult(Try {
logkafkaAdminUtils.changeLogkafkaConfig(logkafkaCommandActorConfig.curator, hostname, log_path, config, logkafkaConfig)
logkafkaAdminUtils.changeLogkafkaConfig(logkafkaCommandActorConfig.curator, hostname, log_path, config, logkafkaConfig, checkConfig)
})
}
}
Expand Down
3 changes: 3 additions & 0 deletions app/kafka/manager/utils/Logkafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ object Logkafka {
case None =>
checkCondition(false, IllegalCharacterInName(hostname))
}
checkCondition(!hostname.matches("^localhost$"), HostnameIsLocalhost)
checkCondition(hostname.matches(validHostnameRegex), InvalidHostname)
}

Expand Down Expand Up @@ -86,6 +87,7 @@ object Logkafka {

object LogkafkaErrors {
class HostnameEmpty private[LogkafkaErrors] extends UtilError("hostname is illegal, can't be empty")
class HostnameIsLocalhost private[LogkafkaErrors] extends UtilError("hostname is illegal, can't be localhost")
class LogPathEmpty private[LogkafkaErrors] extends UtilError("log path is illegal, can't be empty")
class LogPathNotAbsolute private[LogkafkaErrors] extends UtilError("log path is illegal, must be absolute")
class InvalidHostname private[LogkafkaErrors] extends UtilError(s"hostname is illegal, does not match regex ${Logkafka.validHostnameRegex}")
Expand All @@ -100,6 +102,7 @@ object LogkafkaErrors {
class HostnameNotExists private[LogkafkaErrors] (hostname: String) extends UtilError(s"Hostname not exists : $hostname")

val HostnameEmpty = new HostnameEmpty
val HostnameIsLocalhost = new HostnameIsLocalhost
val LogPathEmpty = new LogPathEmpty
val LogPathNotAbsolute = new LogPathNotAbsolute
val InvalidHostname = new InvalidHostname
Expand Down
13 changes: 9 additions & 4 deletions app/kafka/manager/utils/LogkafkaAdminUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,15 @@ class LogkafkaAdminUtils(version: KafkaVersion) {
config: Properties = new Properties,
logkafkaConfigOption: Option[kafka.manager.ActorModel.LogkafkaConfig],
update: Boolean = false,
readVersion: Int = -1
readVersion: Int = -1,
checkConfig: Boolean = true
) {
// validate arguments
Logkafka.validateHostname(hostname)
LogkafkaNewConfigs.validate(version,config)

if (checkConfig) {
LogkafkaNewConfigs.validate(version, config)
}

val configMap: mutable.Map[String, String] = {
import scala.collection.JavaConverters._
Expand Down Expand Up @@ -112,9 +116,10 @@ class LogkafkaAdminUtils(version: KafkaVersion) {
hostname: String,
log_path: String,
config: Properties = new Properties,
logkafkaConfigOption: Option[kafka.manager.ActorModel.LogkafkaConfig]
logkafkaConfigOption: Option[kafka.manager.ActorModel.LogkafkaConfig],
checkConfig: Boolean = true
): Unit = {
createOrUpdateLogkafkaConfigPathInZK(curator, hostname, log_path, config, logkafkaConfigOption, true)
createOrUpdateLogkafkaConfigPathInZK(curator, hostname, log_path, config, logkafkaConfigOption, true, -1, checkConfig)
}

/**
Expand Down
53 changes: 43 additions & 10 deletions app/views/logkafka/logkafkaListContent.scala.html
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,31 @@
*@
@(cluster: String, logkafkas: IndexedSeq[((String, Option[kafka.manager.ActorModel.LogkafkaIdentity]),Boolean)])

@import b3.vertical.fieldConstructor

@getFilesizeStatus(filesize: Int) = {
@filesize match {
case i if i < 0 => {warning}
case i => {}
}
@filesize match {
case i if i < 0 => {warning}
case i => {}
}
}

@getLogkafkaStatus(active: Boolean) = {
@if(active) {} else {danger}
@getLogkafkaStatus(flag: Boolean, s1: String, s2: String) = {
@if(flag) {@s1} else {@s2}
}

<table class="table" id="logkafkas-table">
<thead>
<tr><th>Hostname</th><th># Log Path</th><th># Real Path</th><th># File Pos</th><th># File Size</th><th># Topic</th></tr>
<tr><th>Hostname</th><th>Log Path</th><th># Real Path</th><th># File Pos</th><th># File Size</th><th># Topic</th><th>Operations</th></tr>
</thead>
<tbody>
@for( ((hostname, logkafkaIdentity), deleted) <- logkafkas) {
@logkafkaIdentity.map{ li =>
@for( (log_path, im) <- li.identityMap) {
<tr>
<td class="@getLogkafkaStatus(li.active)"><a href="@routes.Logkafka.logkafka(cluster, hostname, log_path)">@hostname</a></td>
<td class=@getLogkafkaStatus(li.active, "", "danger")>
<a href="@routes.Logkafka.logkafka(cluster, hostname, log_path)">@hostname</a>
</td>

@im._1.map { c =>
<td>@log_path</td>
Expand All @@ -33,8 +37,12 @@
@im._2.map { c =>
@c.get("realpath").map { d =>
<td>@d</td>
}.getOrElse{<td class = "danger"> no corresponding file to collect </td>}
}.getOrElse{<td class = "danger"> logkafka in @hostname is inactive </td>}
}.getOrElse{<td class = "danger"> no corresponding file </td>}
}.getOrElse{
<td class = @getLogkafkaStatus(li.active, "warning", "danger")>
@getLogkafkaStatus(li.active,"scanning for new file", "logkafka is inactive")
</td>
}

@im._2.map { c =>
@c.get("filepos").map { d =>
Expand All @@ -54,6 +62,31 @@
}.getOrElse{<td> </td>}
}.getOrElse{<td> </td>}

<td>
<div class="btn-group-horizontal" role="group" aria-label="...">
@im._1.map { c =>
@c.get("valid").map { enabled =>
@if(enabled.toBoolean) {
<a href="@routes.Logkafka.updateConfig(cluster, hostname, log_path)" class="btn btn-default ops-button" role="button">Modify</a>
@b3.form(routes.Logkafka.handleDisableConfig(cluster, hostname, log_path)) {
<input type="hidden" name="name" value="@cluster">
@b3.submit('class -> "btn btn-warning ops-button"){ Disable }
}
} else {
@b3.form(routes.Logkafka.handleEnableConfig(cluster, hostname, log_path)) {
<input type="hidden" name="name" value="@cluster">
@b3.submit('class -> "btn btn-success ops-button"){ Enable }
}
@b3.form(routes.Logkafka.handleDeleteLogkafka(cluster, hostname, log_path)) {
<input type="hidden" name="name" value="@cluster">
@b3.submit('class -> "btn btn-danger ops-button"){ Delete }
}
}
}.getOrElse{<td> </td>}
}.getOrElse{<td> </td>}
</div>
</td>

</tr>
}
}.getOrElse{}
Expand Down
2 changes: 2 additions & 0 deletions conf/routes
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ POST /clusters/:c/logkafkas/create controllers.Logkafka.h
POST /clusters/:c/logkafkas/delete controllers.Logkafka.handleDeleteLogkafka(c:String, h:String, l:String)
GET /clusters/:c/logkafkas/:h/:l/updateConfig controllers.Logkafka.updateConfig(c:String, h:String, l:String)
POST /clusters/:c/logkafkas/:h/:l/updateConfig controllers.Logkafka.handleUpdateConfig(c:String, h:String, l:String)
POST /clusters/:c/logkafkas/:h/:l/disableConfig controllers.Logkafka.handleDisableConfig(c:String, h:String, l:String)
POST /clusters/:c/logkafkas/:h/:l/enableConfig controllers.Logkafka.handleEnableConfig(c:String, h:String, l:String)
GET /api/status/:c/availableBrokers controllers.api.KafkaHealthCheck.availableBrokers(c:String)
GET /api/status/:c/:t/underReplicatedPartitions controllers.api.KafkaHealthCheck.underReplicatedPartitions(c:String,t:String)
GET /api/status/:c/:t/unavailablePartitions controllers.api.KafkaHealthCheck.unavailablePartitions(c:String,t:String)
Expand Down

0 comments on commit 629868d

Please sign in to comment.