Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upstream changes #6

Merged
merged 36 commits into from
Sep 14, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
b1f6334
Messages count history with chart
ggrossetie May 7, 2015
4e443ba
add partitions to multiple topics at once
jisookim0513 Jul 17, 2015
3ef8cf5
add a test for addPartitionsToMultipleTopics
jisookim0513 Jul 20, 2015
a6ff559
throw an error when trying to add partitions to topics without read v…
jisookim0513 Jul 20, 2015
848ee6c
fix a bug of not being able to fill in readVersions in addMultipleTop…
jisookim0513 Jul 20, 2015
2e0c0ee
Add logkafka - Collect logs and send lines to Apache Kafka, refer to
zheolong Jul 21, 2015
dc93f76
Logkafka
zheolong Jul 23, 2015
1ccd191
Logkafka
zheolong Jul 27, 2015
e69b35e
Logkafka
zheolong Jul 27, 2015
29812f5
Logkafka
zheolong Jul 28, 2015
657e4fa
change syntax for forloop in addPartitionsToTopics
jisookim0513 Jul 28, 2015
951f035
Logkafka
zheolong Jul 29, 2015
38ef80d
Fix sbt download repo
Jul 29, 2015
fd27962
Follow 3xx responses when downloading launcher
Jul 29, 2015
9f330b1
break word for partitions; add DataTable plugin for broker view table…
jisookim0513 Jul 29, 2015
9975d31
Merge pull request #93 from Mogztter/message-count-history
patelh Jul 30, 2015
4d762db
Merge pull request #98 from metamx/add-partitions-to-multiple-topics
patelh Jul 30, 2015
c4dd481
Merge pull request #104 from patricklucas/fix_sbt_download_repo
patelh Jul 30, 2015
ed3eee1
Merge pull request #105 from metamx/ui-improvements
patelh Jul 30, 2015
07e2494
Increment version
patelh Jul 30, 2015
3ad04a2
Logkafka
zheolong Jul 30, 2015
b40e0c6
Merge remote-tracking branch 'upstream/master' into logkafka
zheolong Jul 30, 2015
9e21e13
Logkafka
zheolong Aug 1, 2015
5469b71
Logkafka
zheolong Aug 1, 2015
18d370d
First pass at adding feature gates
patelh Aug 24, 2015
4688fba
Update sbt, play, scala versions
patelh Aug 26, 2015
b9c1908
workaround for missing interpolator
patelh Aug 26, 2015
34be574
Move button on topic view
patelh Aug 26, 2015
eafb0a4
Update button text on topic view
patelh Aug 26, 2015
1948c93
Add references
patelh Aug 26, 2015
3cdde75
update readme
patelh Aug 26, 2015
aeb7edf
Merge pull request #118 from patelh/update-sbt
patelh Aug 26, 2015
347862c
Add config references
patelh Aug 26, 2015
7add672
Update view
patelh Aug 26, 2015
acacac2
Increment version
patelh Aug 26, 2015
b1c2fa4
Merge pull request #100 from zheolong/logkafka
patelh Aug 26, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ Alternatively, use the environment variable `ZK_HOSTS` if you don't want to hard

ZK_HOSTS="my.zookeeper.host.com:2181"

You can optionally enable/disable the following functionality by modifying the default list in application.conf :

application.features=["KMClusterManagerFeature","KMTopicManagerFeature","KMPreferredReplicaElectionFeature","KMReassignPartitionsFeature"]

- KMClusterManagerFeature - allows adding, updating, deleting cluster from Kafka Manager
- KMTopicManagerFeature - allows adding, updating, deleting topic from a Kafka cluster
- KMPreferredReplicaElectionFeature - allows running of preferred replica election for a Kafka cluster
- KMReassignPartitionsFeature - allows generating partition assignments and reassigning partitions

Deployment
----------

Expand Down
24 changes: 4 additions & 20 deletions app/controllers/Application.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package controllers

import features.ApplicationFeatures
import kafka.manager.features.ClusterFeatures
import play.api.mvc._

/**
Expand All @@ -16,29 +18,11 @@ object Application extends Controller {

private[this] val kafkaManager = KafkaManagerContext.getKafkaManager

private[this] implicit val af: ApplicationFeatures = ApplicationFeatures.features

def index = Action.async {
kafkaManager.getClusterList.map { errorOrClusterList =>
Ok(views.html.index(errorOrClusterList))
}
}

def cluster(c: String) = Action.async {
kafkaManager.getClusterView(c).map { errorOrClusterView =>
Ok(views.html.cluster.clusterView(c,errorOrClusterView))
}
}

def brokers(c: String) = Action.async {
kafkaManager.getBrokerList(c).map { errorOrBrokerList =>
Ok(views.html.broker.brokerList(c,errorOrBrokerList))
}
}

def broker(c: String, b: Int) = Action.async {
kafkaManager.getBrokerView(c,b).map { errorOrBrokerView =>
Ok(views.html.broker.brokerView(c,b,errorOrBrokerView))
}
}


}
203 changes: 121 additions & 82 deletions app/controllers/Cluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package controllers

import features.{KMClusterManagerFeature, ApplicationFeatures}
import kafka.manager.{KafkaVersion, ApiError, ClusterConfig}
import models.FollowLink
import models.form._
Expand All @@ -25,6 +26,7 @@ object Cluster extends Controller {
import play.api.libs.concurrent.Execution.Implicits.defaultContext

private[this] val kafkaManager = KafkaManagerContext.getKafkaManager
private[this] implicit val af: ApplicationFeatures = ApplicationFeatures.features

val validateName : Constraint[String] = Constraint("validate name") { name =>
Try {
Expand Down Expand Up @@ -67,7 +69,8 @@ object Cluster extends Controller {
"kafkaVersion" -> nonEmptyText.verifying(validateKafkaVersion),
"zkHosts" -> nonEmptyText.verifying(validateZkHosts),
"zkMaxRetry" -> ignored(100 : Int),
"jmxEnabled" -> boolean
"jmxEnabled" -> boolean,
"logkafkaEnabled" -> boolean
)(ClusterConfig.apply)(ClusterConfig.customUnapply)
)

Expand All @@ -78,103 +81,139 @@ object Cluster extends Controller {
"kafkaVersion" -> nonEmptyText.verifying(validateKafkaVersion),
"zkHosts" -> nonEmptyText.verifying(validateZkHosts),
"zkMaxRetry" -> ignored(100 : Int),
"jmxEnabled" -> boolean
"jmxEnabled" -> boolean,
"logkafkaEnabled" -> boolean
)(ClusterOperation.apply)(ClusterOperation.customUnapply)
)

def addCluster = Action.async { implicit request =>
Future.successful(Ok(views.html.cluster.addCluster(clusterConfigForm)))
def cluster(c: String) = Action.async {
kafkaManager.getClusterView(c).map { errorOrClusterView =>
Ok(views.html.cluster.clusterView(c,errorOrClusterView))
}
}

def brokers(c: String) = Action.async {
kafkaManager.getBrokerList(c).map { errorOrBrokerList =>
Ok(views.html.broker.brokerList(c,errorOrBrokerList))
}
}

def updateCluster(c: String) = Action.async { implicit request =>
kafkaManager.getClusterConfig(c).map { errorOrClusterConfig =>
Ok(views.html.cluster.updateCluster(c,errorOrClusterConfig.map { cc =>
updateForm.fill(ClusterOperation.apply(Update.toString,cc.name,cc.version.toString,cc.curatorConfig.zkConnect,cc.curatorConfig.zkMaxRetry,cc.jmxEnabled))
}))
def broker(c: String, b: Int) = Action.async {
kafkaManager.getBrokerView(c,b).map { errorOrBrokerView =>
Ok(views.html.broker.brokerView(c,b,errorOrBrokerView))
}
}

def addCluster = Action.async { implicit request =>
featureGate(KMClusterManagerFeature) {
Future.successful(Ok(views.html.cluster.addCluster(clusterConfigForm)))
}
}

def handleAddCluster = Action.async { implicit request =>
clusterConfigForm.bindFromRequest.fold(
formWithErrors => Future.successful(BadRequest(views.html.cluster.addCluster(formWithErrors))),
clusterConfig => {
kafkaManager.addCluster(clusterConfig.name, clusterConfig.version.toString, clusterConfig.curatorConfig.zkConnect, clusterConfig.jmxEnabled).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.defaultMenu(),
models.navigation.BreadCrumbs.withView("Add Cluster"),
errorOrSuccess,
"Add Cluster",
FollowLink("Go to cluster view.",routes.Application.cluster(clusterConfig.name).toString()),
FollowLink("Try again.",routes.Cluster.addCluster().toString())
))
}
def updateCluster(c: String) = Action.async { implicit request =>
featureGate(KMClusterManagerFeature) {
kafkaManager.getClusterConfig(c).map { errorOrClusterConfig =>
Ok(views.html.cluster.updateCluster(c,errorOrClusterConfig.map { cc =>
updateForm.fill(ClusterOperation.apply(
Update.toString,
cc.name,
cc.version.toString,
cc.curatorConfig.zkConnect,
cc.curatorConfig.zkMaxRetry,
cc.jmxEnabled,
cc.logkafkaEnabled))
}))
}
)
}

}

def handleUpdateCluster(c: String) = Action.async { implicit request =>
updateForm.bindFromRequest.fold(
formWithErrors => Future.successful(BadRequest(views.html.cluster.updateCluster(c,\/-(formWithErrors)))),
clusterOperation => clusterOperation.op match {
case Enable =>
kafkaManager.enableCluster(c).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.defaultMenu(),
models.navigation.BreadCrumbs.withViewAndCluster("Enable Cluster",c),
errorOrSuccess,
"Enable Cluster",
FollowLink("Go to cluster list.",routes.Application.index().toString()),
FollowLink("Back to cluster list.",routes.Application.index().toString())
))
}
case Disable =>
kafkaManager.disableCluster(c).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.defaultMenu(),
models.navigation.BreadCrumbs.withViewAndCluster("Disable Cluster",c),
errorOrSuccess,
"Disable Cluster",
FollowLink("Back to cluster list.",routes.Application.index().toString()),
FollowLink("Back to cluster list.",routes.Application.index().toString())
))
}
case Delete =>
kafkaManager.deleteCluster(c).map { errorOrSuccess =>
def handleAddCluster = Action.async { implicit request =>
featureGate(KMClusterManagerFeature) {
clusterConfigForm.bindFromRequest.fold(
formWithErrors => Future.successful(BadRequest(views.html.cluster.addCluster(formWithErrors))),
clusterConfig => {
kafkaManager.addCluster(clusterConfig.name, clusterConfig.version.toString, clusterConfig.curatorConfig.zkConnect, clusterConfig.jmxEnabled, clusterConfig.logkafkaEnabled).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.defaultMenu(),
models.navigation.BreadCrumbs.withViewAndCluster("Delete Cluster",c),
models.navigation.BreadCrumbs.withView("Add Cluster"),
errorOrSuccess,
"Delete Cluster",
FollowLink("Back to cluster list.",routes.Application.index().toString()),
FollowLink("Back to cluster list.",routes.Application.index().toString())
"Add Cluster",
FollowLink("Go to cluster view.",routes.Cluster.cluster(clusterConfig.name).toString()),
FollowLink("Try again.",routes.Cluster.addCluster().toString())
))
}
case Update =>
kafkaManager.updateCluster(
clusterOperation.clusterConfig.name,
clusterOperation.clusterConfig.version.toString,
clusterOperation.clusterConfig.curatorConfig.zkConnect,
clusterOperation.clusterConfig.jmxEnabled
).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
}
)
}
}

def handleUpdateCluster(c: String) = Action.async { implicit request =>
featureGate(KMClusterManagerFeature) {
updateForm.bindFromRequest.fold(
formWithErrors => Future.successful(BadRequest(views.html.cluster.updateCluster(c, \/-(formWithErrors)))),
clusterOperation => clusterOperation.op match {
case Enable =>
kafkaManager.enableCluster(c).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.defaultMenu(),
models.navigation.BreadCrumbs.withViewAndCluster("Enable Cluster", c),
errorOrSuccess,
"Enable Cluster",
FollowLink("Go to cluster list.", routes.Application.index().toString()),
FollowLink("Back to cluster list.", routes.Application.index().toString())
))
}
case Disable =>
kafkaManager.disableCluster(c).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.defaultMenu(),
models.navigation.BreadCrumbs.withViewAndCluster("Disable Cluster", c),
errorOrSuccess,
"Disable Cluster",
FollowLink("Back to cluster list.", routes.Application.index().toString()),
FollowLink("Back to cluster list.", routes.Application.index().toString())
))
}
case Delete =>
kafkaManager.deleteCluster(c).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.defaultMenu(),
models.navigation.BreadCrumbs.withViewAndCluster("Delete Cluster", c),
errorOrSuccess,
"Delete Cluster",
FollowLink("Back to cluster list.", routes.Application.index().toString()),
FollowLink("Back to cluster list.", routes.Application.index().toString())
))
}
case Update =>
kafkaManager.updateCluster(
clusterOperation.clusterConfig.name,
clusterOperation.clusterConfig.version.toString,
clusterOperation.clusterConfig.curatorConfig.zkConnect,
clusterOperation.clusterConfig.jmxEnabled,
clusterOperation.clusterConfig.logkafkaEnabled
).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.defaultMenu(),
models.navigation.BreadCrumbs.withViewAndCluster("Update Cluster", c),
errorOrSuccess,
"Update Cluster",
FollowLink("Go to cluster view.", routes.Cluster.cluster(clusterOperation.clusterConfig.name).toString()),
FollowLink("Try again.", routes.Cluster.updateCluster(c).toString())
))
}
case Unknown(opString) =>
Future.successful(Ok(views.html.common.resultOfCommand(
views.html.navigation.defaultMenu(),
models.navigation.BreadCrumbs.withViewAndCluster("Update Cluster",c),
errorOrSuccess,
"Update Cluster",
FollowLink("Go to cluster view.",routes.Application.cluster(clusterOperation.clusterConfig.name).toString()),
FollowLink("Try again.",routes.Cluster.updateCluster(c).toString())
))
}
case Unknown(opString) =>
Future.successful(Ok(views.html.common.resultOfCommand(
views.html.navigation.defaultMenu(),
models.navigation.BreadCrumbs.withViewAndCluster("Unknown Cluster Operation",c),
-\/(ApiError(s"Unknown operation $opString")),
"Unknown Cluster Operation",
FollowLink("Back to cluster list.",routes.Application.index().toString()),
FollowLink("Back to cluster list.",routes.Application.index().toString())
)))
}
)
models.navigation.BreadCrumbs.withViewAndCluster("Unknown Cluster Operation", c),
-\/(ApiError(s"Unknown operation $opString")),
"Unknown Cluster Operation",
FollowLink("Back to cluster list.", routes.Application.index().toString()),
FollowLink("Back to cluster list.", routes.Application.index().toString())
)))
}
)
}
}
}
Loading