Skip to content

Commit

Permalink
Merge pull request yahoo#100 from zheolong/logkafka
Browse files Browse the repository at this point in the history
Add logkafka - Collect logs and send lines to Apache Kafka
  • Loading branch information
patelh committed Aug 26, 2015
2 parents aeb7edf + acacac2 commit b1c2fa4
Show file tree
Hide file tree
Showing 78 changed files with 3,568 additions and 668 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ Alternatively, use the environment variable `ZK_HOSTS` if you don't want to hard

ZK_HOSTS="my.zookeeper.host.com:2181"

You can optionally enable/disable the following functionality by modifying the default list in application.conf :

application.features=["KMClusterManagerFeature","KMTopicManagerFeature","KMPreferredReplicaElectionFeature","KMReassignPartitionsFeature"]

- KMClusterManagerFeature - allows adding, updating, deleting cluster from Kafka Manager
- KMTopicManagerFeature - allows adding, updating, deleting topic from a Kafka cluster
- KMPreferredReplicaElectionFeature - allows running of preferred replica election for a Kafka cluster
- KMReassignPartitionsFeature - allows generating partition assignments and reassigning partitions

Deployment
----------

Expand Down
24 changes: 4 additions & 20 deletions app/controllers/Application.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package controllers

import features.ApplicationFeatures
import kafka.manager.features.ClusterFeatures
import play.api.mvc._

/**
Expand All @@ -16,29 +18,11 @@ object Application extends Controller {

private[this] val kafkaManager = KafkaManagerContext.getKafkaManager

private[this] implicit val af: ApplicationFeatures = ApplicationFeatures.features

def index = Action.async {
kafkaManager.getClusterList.map { errorOrClusterList =>
Ok(views.html.index(errorOrClusterList))
}
}

def cluster(c: String) = Action.async {
kafkaManager.getClusterView(c).map { errorOrClusterView =>
Ok(views.html.cluster.clusterView(c,errorOrClusterView))
}
}

def brokers(c: String) = Action.async {
kafkaManager.getBrokerList(c).map { errorOrBrokerList =>
Ok(views.html.broker.brokerList(c,errorOrBrokerList))
}
}

def broker(c: String, b: Int) = Action.async {
kafkaManager.getBrokerView(c,b).map { errorOrBrokerView =>
Ok(views.html.broker.brokerView(c,b,errorOrBrokerView))
}
}


}
203 changes: 121 additions & 82 deletions app/controllers/Cluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package controllers

import features.{KMClusterManagerFeature, ApplicationFeatures}
import kafka.manager.{KafkaVersion, ApiError, ClusterConfig}
import models.FollowLink
import models.form._
Expand All @@ -25,6 +26,7 @@ object Cluster extends Controller {
import play.api.libs.concurrent.Execution.Implicits.defaultContext

private[this] val kafkaManager = KafkaManagerContext.getKafkaManager
private[this] implicit val af: ApplicationFeatures = ApplicationFeatures.features

val validateName : Constraint[String] = Constraint("validate name") { name =>
Try {
Expand Down Expand Up @@ -67,7 +69,8 @@ object Cluster extends Controller {
"kafkaVersion" -> nonEmptyText.verifying(validateKafkaVersion),
"zkHosts" -> nonEmptyText.verifying(validateZkHosts),
"zkMaxRetry" -> ignored(100 : Int),
"jmxEnabled" -> boolean
"jmxEnabled" -> boolean,
"logkafkaEnabled" -> boolean
)(ClusterConfig.apply)(ClusterConfig.customUnapply)
)

Expand All @@ -78,103 +81,139 @@ object Cluster extends Controller {
"kafkaVersion" -> nonEmptyText.verifying(validateKafkaVersion),
"zkHosts" -> nonEmptyText.verifying(validateZkHosts),
"zkMaxRetry" -> ignored(100 : Int),
"jmxEnabled" -> boolean
"jmxEnabled" -> boolean,
"logkafkaEnabled" -> boolean
)(ClusterOperation.apply)(ClusterOperation.customUnapply)
)

def addCluster = Action.async { implicit request =>
Future.successful(Ok(views.html.cluster.addCluster(clusterConfigForm)))
def cluster(c: String) = Action.async {
kafkaManager.getClusterView(c).map { errorOrClusterView =>
Ok(views.html.cluster.clusterView(c,errorOrClusterView))
}
}

def brokers(c: String) = Action.async {
kafkaManager.getBrokerList(c).map { errorOrBrokerList =>
Ok(views.html.broker.brokerList(c,errorOrBrokerList))
}
}

def updateCluster(c: String) = Action.async { implicit request =>
kafkaManager.getClusterConfig(c).map { errorOrClusterConfig =>
Ok(views.html.cluster.updateCluster(c,errorOrClusterConfig.map { cc =>
updateForm.fill(ClusterOperation.apply(Update.toString,cc.name,cc.version.toString,cc.curatorConfig.zkConnect,cc.curatorConfig.zkMaxRetry,cc.jmxEnabled))
}))
def broker(c: String, b: Int) = Action.async {
kafkaManager.getBrokerView(c,b).map { errorOrBrokerView =>
Ok(views.html.broker.brokerView(c,b,errorOrBrokerView))
}
}

def addCluster = Action.async { implicit request =>
featureGate(KMClusterManagerFeature) {
Future.successful(Ok(views.html.cluster.addCluster(clusterConfigForm)))
}
}

def handleAddCluster = Action.async { implicit request =>
clusterConfigForm.bindFromRequest.fold(
formWithErrors => Future.successful(BadRequest(views.html.cluster.addCluster(formWithErrors))),
clusterConfig => {
kafkaManager.addCluster(clusterConfig.name, clusterConfig.version.toString, clusterConfig.curatorConfig.zkConnect, clusterConfig.jmxEnabled).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.defaultMenu(),
models.navigation.BreadCrumbs.withView("Add Cluster"),
errorOrSuccess,
"Add Cluster",
FollowLink("Go to cluster view.",routes.Application.cluster(clusterConfig.name).toString()),
FollowLink("Try again.",routes.Cluster.addCluster().toString())
))
}
def updateCluster(c: String) = Action.async { implicit request =>
featureGate(KMClusterManagerFeature) {
kafkaManager.getClusterConfig(c).map { errorOrClusterConfig =>
Ok(views.html.cluster.updateCluster(c,errorOrClusterConfig.map { cc =>
updateForm.fill(ClusterOperation.apply(
Update.toString,
cc.name,
cc.version.toString,
cc.curatorConfig.zkConnect,
cc.curatorConfig.zkMaxRetry,
cc.jmxEnabled,
cc.logkafkaEnabled))
}))
}
)
}

}

def handleUpdateCluster(c: String) = Action.async { implicit request =>
updateForm.bindFromRequest.fold(
formWithErrors => Future.successful(BadRequest(views.html.cluster.updateCluster(c,\/-(formWithErrors)))),
clusterOperation => clusterOperation.op match {
case Enable =>
kafkaManager.enableCluster(c).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.defaultMenu(),
models.navigation.BreadCrumbs.withViewAndCluster("Enable Cluster",c),
errorOrSuccess,
"Enable Cluster",
FollowLink("Go to cluster list.",routes.Application.index().toString()),
FollowLink("Back to cluster list.",routes.Application.index().toString())
))
}
case Disable =>
kafkaManager.disableCluster(c).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.defaultMenu(),
models.navigation.BreadCrumbs.withViewAndCluster("Disable Cluster",c),
errorOrSuccess,
"Disable Cluster",
FollowLink("Back to cluster list.",routes.Application.index().toString()),
FollowLink("Back to cluster list.",routes.Application.index().toString())
))
}
case Delete =>
kafkaManager.deleteCluster(c).map { errorOrSuccess =>
def handleAddCluster = Action.async { implicit request =>
featureGate(KMClusterManagerFeature) {
clusterConfigForm.bindFromRequest.fold(
formWithErrors => Future.successful(BadRequest(views.html.cluster.addCluster(formWithErrors))),
clusterConfig => {
kafkaManager.addCluster(clusterConfig.name, clusterConfig.version.toString, clusterConfig.curatorConfig.zkConnect, clusterConfig.jmxEnabled, clusterConfig.logkafkaEnabled).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.defaultMenu(),
models.navigation.BreadCrumbs.withViewAndCluster("Delete Cluster",c),
models.navigation.BreadCrumbs.withView("Add Cluster"),
errorOrSuccess,
"Delete Cluster",
FollowLink("Back to cluster list.",routes.Application.index().toString()),
FollowLink("Back to cluster list.",routes.Application.index().toString())
"Add Cluster",
FollowLink("Go to cluster view.",routes.Cluster.cluster(clusterConfig.name).toString()),
FollowLink("Try again.",routes.Cluster.addCluster().toString())
))
}
case Update =>
kafkaManager.updateCluster(
clusterOperation.clusterConfig.name,
clusterOperation.clusterConfig.version.toString,
clusterOperation.clusterConfig.curatorConfig.zkConnect,
clusterOperation.clusterConfig.jmxEnabled
).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
}
)
}
}

def handleUpdateCluster(c: String) = Action.async { implicit request =>
featureGate(KMClusterManagerFeature) {
updateForm.bindFromRequest.fold(
formWithErrors => Future.successful(BadRequest(views.html.cluster.updateCluster(c, \/-(formWithErrors)))),
clusterOperation => clusterOperation.op match {
case Enable =>
kafkaManager.enableCluster(c).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.defaultMenu(),
models.navigation.BreadCrumbs.withViewAndCluster("Enable Cluster", c),
errorOrSuccess,
"Enable Cluster",
FollowLink("Go to cluster list.", routes.Application.index().toString()),
FollowLink("Back to cluster list.", routes.Application.index().toString())
))
}
case Disable =>
kafkaManager.disableCluster(c).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.defaultMenu(),
models.navigation.BreadCrumbs.withViewAndCluster("Disable Cluster", c),
errorOrSuccess,
"Disable Cluster",
FollowLink("Back to cluster list.", routes.Application.index().toString()),
FollowLink("Back to cluster list.", routes.Application.index().toString())
))
}
case Delete =>
kafkaManager.deleteCluster(c).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.defaultMenu(),
models.navigation.BreadCrumbs.withViewAndCluster("Delete Cluster", c),
errorOrSuccess,
"Delete Cluster",
FollowLink("Back to cluster list.", routes.Application.index().toString()),
FollowLink("Back to cluster list.", routes.Application.index().toString())
))
}
case Update =>
kafkaManager.updateCluster(
clusterOperation.clusterConfig.name,
clusterOperation.clusterConfig.version.toString,
clusterOperation.clusterConfig.curatorConfig.zkConnect,
clusterOperation.clusterConfig.jmxEnabled,
clusterOperation.clusterConfig.logkafkaEnabled
).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.defaultMenu(),
models.navigation.BreadCrumbs.withViewAndCluster("Update Cluster", c),
errorOrSuccess,
"Update Cluster",
FollowLink("Go to cluster view.", routes.Cluster.cluster(clusterOperation.clusterConfig.name).toString()),
FollowLink("Try again.", routes.Cluster.updateCluster(c).toString())
))
}
case Unknown(opString) =>
Future.successful(Ok(views.html.common.resultOfCommand(
views.html.navigation.defaultMenu(),
models.navigation.BreadCrumbs.withViewAndCluster("Update Cluster",c),
errorOrSuccess,
"Update Cluster",
FollowLink("Go to cluster view.",routes.Application.cluster(clusterOperation.clusterConfig.name).toString()),
FollowLink("Try again.",routes.Cluster.updateCluster(c).toString())
))
}
case Unknown(opString) =>
Future.successful(Ok(views.html.common.resultOfCommand(
views.html.navigation.defaultMenu(),
models.navigation.BreadCrumbs.withViewAndCluster("Unknown Cluster Operation",c),
-\/(ApiError(s"Unknown operation $opString")),
"Unknown Cluster Operation",
FollowLink("Back to cluster list.",routes.Application.index().toString()),
FollowLink("Back to cluster list.",routes.Application.index().toString())
)))
}
)
models.navigation.BreadCrumbs.withViewAndCluster("Unknown Cluster Operation", c),
-\/(ApiError(s"Unknown operation $opString")),
"Unknown Cluster Operation",
FollowLink("Back to cluster list.", routes.Application.index().toString()),
FollowLink("Back to cluster list.", routes.Application.index().toString())
)))
}
)
}
}
}
Loading

0 comments on commit b1c2fa4

Please sign in to comment.