
Merge with upstream and fixing build #8

Merged (45 commits) on Nov 18, 2015
Commits
883f54f
fix warning level of broker skew percentage
jisookim0513 Aug 7, 2015
a243276
fix warning level of under-replicated percentage
jisookim0513 Aug 7, 2015
eebd59e
Add consumer-level information, along with the latest-produced offset…
cvcal Aug 19, 2015
e9008d2
Refactoring
patelh Sep 4, 2015
6f843c5
Add passive/active offset cache and test
patelh Sep 20, 2015
d8858d6
Increment version
patelh Sep 20, 2015
52bf7c2
Fix cluster config
patelh Sep 20, 2015
6a99447
Increment version
patelh Sep 20, 2015
09d4215
Fix update of cluster config
patelh Sep 20, 2015
c8524cf
Increment version
patelh Sep 20, 2015
e6bf39c
Another bug fix for cluster config
patelh Sep 20, 2015
d8f9b12
Fix consumer list in active mode
patelh Sep 20, 2015
a98cc74
Merge pull request #131 from cvcal/cvcal
patelh Sep 23, 2015
1a9cb19
Merge pull request #110 from metamx/fix-warning-display
patelh Sep 23, 2015
b30eeb8
Move check before getting topic descriptions
patelh Aug 17, 2015
8e06123
Update to check before and after
patelh Aug 23, 2015
0daafd0
Merge pull request #116 from patelh/check-before
patelh Sep 23, 2015
9d09807
Increment version
patelh Sep 23, 2015
baec2a6
Parallel offset requests in bulk
patelh Sep 24, 2015
2f7710f
Cleanup, add configs
patelh Sep 24, 2015
0f5b85e
Update more often
patelh Sep 24, 2015
bf56d01
Add producer message rate to topic list
patelh Sep 24, 2015
ea146de
Merge pull request #132 from patelh/par-prod-offset
patelh Sep 24, 2015
7e13c00
Broker Down bugfix
Aug 26, 2015
ee720e3
Fix owners list
patelh Sep 25, 2015
4492659
Bump version
patelh Sep 25, 2015
b0fb7f2
Merge pull request #133 from patelh/broker-list-interface-bugfix
patelh Sep 25, 2015
bed7558
adding colors to manual assignments
yazgoo Sep 30, 2015
4b42c55
logkafka: check if hostname is localhost
zheolong Oct 13, 2015
8ce8207
logkafka: update log collecting state
zheolong Oct 21, 2015
61b26d2
logkafka: little fix about config deleting
zheolong Oct 23, 2015
b6bc8b9
Add default logging for akka
bjoernhaeuser Oct 24, 2015
4ab38d7
logkafka: add config operations column
zheolong Oct 25, 2015
e76c2ee
Merge pull request #144 from bjoernhaeuser/logging
patelh Oct 27, 2015
629868d
Merge pull request #140 from zheolong/master
patelh Oct 27, 2015
b6db5c0
Merge pull request #136 from yazgoo/master
patelh Oct 27, 2015
56c05a4
Add hints to addCluster and updateCluster
zheolong Nov 6, 2015
3728c11
Merge pull request #151 from zheolong/master
patelh Nov 6, 2015
1f4a03a
logkafka: add config item regex_filter_pattern
zheolong Nov 12, 2015
8a858a4
logkafka: check if regex_filter_pattern is legal
zheolong Nov 13, 2015
ace56fe
logkafka: add logkafka creation test
zheolong Nov 14, 2015
b4ba8d7
logkafka: change duplicated case name
zheolong Nov 14, 2015
a84296e
Merge pull request #153 from zheolong/master
patelh Nov 14, 2015
1b46d3e
Merge remote-tracking branch 'upstream/master'
sjoerdmulder Nov 18, 2015
39e2d0d
Fixing build by excluding oauth (that contains logging)
sjoerdmulder Nov 18, 2015
21 changes: 17 additions & 4 deletions README.md
@@ -6,7 +6,7 @@ A tool for managing [Apache Kafka](http://kafka.apache.org).
It supports the following :

- Manage multiple clusters
- - Easy inspection of cluster state (topics, brokers, replica distribution, partition distribution)
+ - Easy inspection of cluster state (topics, consumers, offsets, brokers, replica distribution, partition distribution)
- Run preferred replica election
- Generate partition assignments with option to select brokers to use
- Run reassignment of partition (based on generated assignments)
@@ -18,6 +18,7 @@ It supports the following :
- Add partitions to existing topic
- Update config for existing topic
- Optionally enable JMX polling for broker level and topic level metrics.
+ - Optionally filter out consumers that do not have ids/ owners/ & offsets/ directories in zookeeper.

Cluster Management

@@ -37,9 +38,21 @@ Topic View

***

+ Consumer List View
+
+ ![consumer](/img/consumer-list.png)
+
+ ***
+
+ Consumed Topic View
+
+ ![consumer](/img/consumed-topic.png)
+
+ ***

Broker List

- ![topic](/img/broker-list.png)
+ ![broker](/img/broker-list.png)

***

@@ -52,9 +65,9 @@ Broker View
Requirements
------------

- 1. [Kafka 0.8.1.1 or 0.8.2.0](http://kafka.apache.org/downloads.html)
+ 1. [Kafka 0.8.1.1 or 0.8.2.1](http://kafka.apache.org/downloads.html)
2. [sbt 0.13.x](http://www.scala-sbt.org/download.html)
- 3. Java 7+
+ 3. Java 8+

Configuration
-------------
18 changes: 13 additions & 5 deletions app/controllers/Cluster.scala
@@ -70,7 +70,9 @@ object Cluster extends Controller {
"zkHosts" -> nonEmptyText.verifying(validateZkHosts),
"zkMaxRetry" -> ignored(100 : Int),
"jmxEnabled" -> boolean,
- "logkafkaEnabled" -> boolean
+ "filterConsumers" -> boolean,
+ "logkafkaEnabled" -> boolean,
+ "activeOffsetCacheEnabled" -> boolean
)(ClusterConfig.apply)(ClusterConfig.customUnapply)
)

@@ -93,7 +95,9 @@ object Cluster extends Controller {
"zkHosts" -> nonEmptyText.verifying(validateZkHosts),
"zkMaxRetry" -> ignored(100 : Int),
"jmxEnabled" -> boolean,
- "logkafkaEnabled" -> boolean
+ "filterConsumers" -> boolean,
+ "logkafkaEnabled" -> boolean,
+ "activeOffsetCacheEnabled" -> boolean
)(ClusterOperation.apply)(ClusterOperation.customUnapply)
)

@@ -154,7 +158,9 @@ object Cluster extends Controller {
cc.curatorConfig.zkConnect,
cc.curatorConfig.zkMaxRetry,
cc.jmxEnabled,
- cc.logkafkaEnabled))
+ cc.filterConsumers,
+ cc.logkafkaEnabled,
+ cc.activeOffsetCacheEnabled))
}))
}
}
@@ -166,7 +172,7 @@ object Cluster extends Controller {
clusterConfigForm.bindFromRequest.fold(
formWithErrors => Future.successful(BadRequest(views.html.cluster.addCluster(formWithErrors))),
clusterConfig => {
- kafkaManager.addCluster(clusterConfig.name, clusterConfig.version.toString, clusterConfig.curatorConfig.zkConnect, clusterConfig.jmxEnabled, clusterConfig.logkafkaEnabled).map { errorOrSuccess =>
+ kafkaManager.addCluster(clusterConfig.name, clusterConfig.version.toString, clusterConfig.curatorConfig.zkConnect, clusterConfig.jmxEnabled, clusterConfig.filterConsumers, clusterConfig.logkafkaEnabled, clusterConfig.activeOffsetCacheEnabled).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.defaultMenu(),
models.navigation.BreadCrumbs.withView("Add Cluster"),
@@ -225,7 +231,9 @@ object Cluster extends Controller {
clusterOperation.clusterConfig.version.toString,
clusterOperation.clusterConfig.curatorConfig.zkConnect,
clusterOperation.clusterConfig.jmxEnabled,
- clusterOperation.clusterConfig.logkafkaEnabled
+ clusterOperation.clusterConfig.filterConsumers,
+ clusterOperation.clusterConfig.logkafkaEnabled,
+ clusterOperation.clusterConfig.activeOffsetCacheEnabled
).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.defaultMenu(),
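The form mappings above must stay in lockstep with the `ClusterConfig` case class they bind to. A minimal sketch of how the three new flags slot in; the `CuratorConfig` shape and field order here are assumptions inferred from the diff, not the actual class definition:

```scala
// Hypothetical stand-ins for the real classes; only the new boolean
// fields (filterConsumers, activeOffsetCacheEnabled) come from the diff.
case class CuratorConfig(zkConnect: String, zkMaxRetry: Int = 100)

case class ClusterConfig(
  name: String,
  version: String,
  curatorConfig: CuratorConfig,
  jmxEnabled: Boolean,
  filterConsumers: Boolean,         // new: hide consumers missing ids/owners/offsets dirs
  logkafkaEnabled: Boolean,
  activeOffsetCacheEnabled: Boolean // new: enable the passive/active offset cache
)

val cc = ClusterConfig(
  "dev", "0.8.2.1", CuratorConfig("localhost:2181"),
  jmxEnabled = true, filterConsumers = true,
  logkafkaEnabled = false, activeOffsetCacheEnabled = true)
```

Because the tuple mapping is applied to `ClusterConfig.apply`, adding a field in only one of the two forms (add vs. update) would presumably fail at compile time, which is why the diff touches both mappings.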
37 changes: 37 additions & 0 deletions app/controllers/Consumer.scala
@@ -0,0 +1,37 @@
/**
* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
* See accompanying LICENSE file.
*/

package controllers

import features.ApplicationFeatures
import play.api.mvc._

/**
* @author cvcal
*/
object Consumer extends Controller{
import play.api.libs.concurrent.Execution.Implicits.defaultContext

private[this] val kafkaManager = KafkaManagerContext.getKafkaManager
private[this] implicit val af: ApplicationFeatures = ApplicationFeatures.features

def consumers(cluster: String) = Action.async {
kafkaManager.getConsumerListExtended(cluster).map { errorOrConsumerList =>
Ok(views.html.consumer.consumerList(cluster, errorOrConsumerList))
}
}

def consumer(cluster: String, consumerGroup: String) = Action.async {
kafkaManager.getConsumerIdentity(cluster,consumerGroup).map { errorOrConsumerIdentity =>
Ok(views.html.consumer.consumerView(cluster,consumerGroup,errorOrConsumerIdentity))
}
}

def consumerAndTopic(cluster: String, consumerGroup: String, topic: String) = Action.async {
kafkaManager.getConsumedTopicState(cluster,consumerGroup,topic).map { errorOrConsumedTopicState =>
Ok(views.html.consumer.consumedTopicView(cluster,consumerGroup,topic,errorOrConsumedTopicState))
}
}
}
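For these actions to be reachable, matching entries are needed in `conf/routes`. The diff does not include that file, so the paths below are illustrative assumptions only, written in Play's routes syntax:

```
GET  /clusters/:c/consumers              controllers.Consumer.consumers(c: String)
GET  /clusters/:c/consumers/:g           controllers.Consumer.consumer(c: String, g: String)
GET  /clusters/:c/consumers/:g/topic/:t  controllers.Consumer.consumerAndTopic(c: String, g: String, t: String)
```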
36 changes: 36 additions & 0 deletions app/controllers/Logkafka.scala
@@ -233,4 +233,40 @@ object Logkafka extends Controller{
)
}
}

def handleEnableConfig(clusterName: String, hostname: String, log_path: String) = Action.async { implicit request =>
clusterFeatureGate(clusterName, KMLogKafkaFeature) { clusterContext =>
implicit val clusterFeatures = clusterContext.clusterFeatures
val props = new Properties();
props.put("valid", true.toString);
kafkaManager.updateLogkafkaConfig(clusterName, hostname, log_path, props, false).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.clusterMenu(clusterName, "Logkafka", "Logkafka View", Menus.clusterMenus(clusterName)),
models.navigation.BreadCrumbs.withNamedViewAndClusterAndLogkafka("Logkafka View", clusterName, hostname, log_path, "Update Config"),
errorOrSuccess,
"Enable Config",
FollowLink("Go to logkafka view.", routes.Logkafka.logkafka(clusterName, hostname, log_path).toString()),
FollowLink("Try again.", routes.Logkafka.updateConfig(clusterName, hostname, log_path).toString())
))
}
}
}

def handleDisableConfig(clusterName: String, hostname: String, log_path: String) = Action.async { implicit request =>
clusterFeatureGate(clusterName, KMLogKafkaFeature) { clusterContext =>
implicit val clusterFeatures = clusterContext.clusterFeatures
val props = new Properties();
props.put("valid", false.toString);
kafkaManager.updateLogkafkaConfig(clusterName, hostname, log_path, props, false).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.clusterMenu(clusterName, "Logkafka", "Logkafka View", Menus.clusterMenus(clusterName)),
models.navigation.BreadCrumbs.withNamedViewAndClusterAndLogkafka("Logkafka View", clusterName, hostname, log_path, "Update Config"),
errorOrSuccess,
"Disable Config",
FollowLink("Go to logkafka view.", routes.Logkafka.logkafka(clusterName, hostname, log_path).toString()),
FollowLink("Try again.", routes.Logkafka.updateConfig(clusterName, hostname, log_path).toString())
))
}
}
}
}
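The two handlers above differ only in the value written to the `valid` property before calling `updateLogkafkaConfig`. The shared shape can be sketched as a small helper (hypothetical, not part of the diff):

```scala
import java.util.Properties

// Build the single-entry Properties object both handlers send:
// "valid" -> "true" enables the config, "valid" -> "false" disables it.
def validityProps(enabled: Boolean): Properties = {
  val props = new Properties()
  props.put("valid", enabled.toString)
  props
}
```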
34 changes: 19 additions & 15 deletions app/controllers/ReassignPartitions.scala
@@ -17,6 +17,8 @@ import play.api.data.Forms._
import play.api.data.validation.{Valid, Invalid, Constraint}
import play.api.mvc._

+ import scala.collection.mutable

import scala.concurrent.Future
import scalaz.{\/, \/-, -\/}

@@ -204,7 +206,8 @@ object ReassignPartitions extends Controller{
bl <- blOrError
} yield {
Ok(views.html.topic.manualAssignments(
- c, t, manualReassignmentForm.fill(List(flattenTopicIdentity(ti))), bl, bv, manualReassignmentForm.errors
+ //c, t, manualReassignmentForm.fill(List(flattenTopicIdentity(ti))), bl, bv, manualReassignmentForm.errors
+ c, t, List(flattenTopicIdentity(ti)), bl, bv, manualReassignmentForm.errors
))
}
errorOrResult.fold(err => {
@@ -227,18 +230,19 @@ }, { topics: TopicListExtended =>
}, { topics: TopicListExtended =>
kafkaManager.getBrokerList(c).flatMap { errOrCV =>
errOrCV.fold(
{ err: ApiError =>
Future.successful(Ok(views.html.topic.confirmMultipleAssignments(c, -\/(err))))
}, { brokers: BrokerListExtended => {
brokersViews.flatMap { errorOrBVs =>
errorOrBVs.fold(
{ err: ApiError => Future.successful(Ok(views.html.topic.confirmMultipleAssignments(c, -\/(err))))}, { bVs: Seq[BVView] => Future {
Ok(views.html.topic.manualMultipleAssignments(
c, manualReassignmentForm.fill(flattenedTopicListExtended(topics)), brokers, bVs, manualReassignmentForm.errors
))
}
}
)
{err: ApiError =>
Future.successful( Ok(views.html.topic.confirmMultipleAssignments( c, -\/(err) )))
},
{ brokers: BrokerListExtended => {
brokersViews.flatMap { errorOrBVs =>
errorOrBVs.fold (
{err: ApiError => Future.successful( Ok(views.html.topic.confirmMultipleAssignments( c, -\/(err) )))},
{bVs => Future {
Ok(views.html.topic.manualMultipleAssignments(
c, flattenedTopicListExtended(topics), brokers , bVs, manualReassignmentForm.errors
))
}}
)
}
}
}
@@ -283,7 +287,7 @@ object ReassignPartitions extends Controller{
errors => kafkaManager.getClusterList.flatMap { errorOrClusterList =>
responseScreen(
"Manual Reassign Partitions Failure",
- -\/(IndexedSeq(ApiError("There is something really wrong with your submitted data!")))
+ -\/(IndexedSeq(ApiError("There is something really wrong with your submitted data!\n\n" + errors.toString)))
)
},
assignment => {
Expand Down Expand Up @@ -397,7 +401,7 @@ object ReassignPartitions extends Controller{
),
cc =>
reassignPartitionsForm.bindFromRequest.fold(
- formWithErrors => Future.successful(BadRequest(views.html.topic.topicView(c, t, -\/(ApiError("Unknown operation!"))))),
+ formWithErrors => Future.successful(BadRequest(views.html.topic.topicView(c, t, -\/(ApiError("Unknown operation!")), None))),
op => op match {
case RunAssignment =>
implicit val clusterFeatures = cc.clusterFeatures
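The nested `fold` calls in the rewritten controller all follow the same error-or-value pattern on scalaz's `\/`: render an error page on the left, the normal page on the right. A minimal sketch of that pattern, using the standard library's `Either` as a stand-in for `\/` and a simplified `ApiError`:

```scala
case class ApiError(msg: String)

// fold picks one of two render paths, mirroring how the controller
// chooses between confirmMultipleAssignments (error) and
// manualMultipleAssignments (success).
def render(errOrBrokers: Either[ApiError, List[String]]): String =
  errOrBrokers.fold(
    err     => s"error page: ${err.msg}",
    brokers => s"assignment page for ${brokers.size} brokers"
  )
```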
10 changes: 7 additions & 3 deletions app/controllers/Topic.scala
@@ -139,8 +139,11 @@ object Topic extends Controller{
}

def topic(c: String, t: String) = Action.async {
- kafkaManager.getTopicIdentity(c,t).map { errorOrTopicIdentity =>
- Ok(views.html.topic.topicView(c,t,errorOrTopicIdentity))
+ val futureErrorOrTopicIdentity = kafkaManager.getTopicIdentity(c,t)
+ val futureErrorOrConsumerList = kafkaManager.getConsumersForTopic(c,t)
+
+ futureErrorOrTopicIdentity.zip(futureErrorOrConsumerList).map {case (errorOrTopicIdentity,errorOrConsumerList) =>
+ Ok(views.html.topic.topicView(c,t,errorOrTopicIdentity,errorOrConsumerList))
}
}

@@ -197,7 +200,8 @@ object Topic extends Controller{
BadRequest(views.html.topic.topicView(
clusterName,
topic,
- -\/(ApiError(formWithErrors.error("topic").map(_.toString).getOrElse("Unknown error deleting topic!")))))
+ -\/(ApiError(formWithErrors.error("topic").map(_.toString).getOrElse("Unknown error deleting topic!"))),
+ None))
),
deleteTopic => {
kafkaManager.deleteTopic(clusterName, deleteTopic.topic).map { errorOrSuccess =>
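The rewritten `topic` action creates both futures before waiting on either, so the two lookups run concurrently; `zip` then pairs their results. The pattern in isolation, with plain `Future`s standing in for the KafkaManager calls:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Start both computations first; zip only pairs their results,
// it does not sequence them.
val futureTopicIdentity: Future[String]      = Future { "topic-identity" }
val futureConsumerList: Future[List[String]] = Future { List("group-a", "group-b") }

val combined: Future[(String, List[String])] =
  futureTopicIdentity.zip(futureConsumerList)

val (identity, consumers) = Await.result(combined, 5.seconds)
```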
15 changes: 13 additions & 2 deletions app/features/ApplicationFeature.scala
@@ -50,16 +50,27 @@ case class ApplicationFeatures(features: Set[ApplicationFeature])

object ApplicationFeatures {
import play.api.Play.current
+ private lazy val log = LoggerFactory.getLogger(classOf[ApplicationFeatures])

+ lazy val default : List[String] = List(
+ KMClusterManagerFeature,
+ KMTopicManagerFeature,
+ KMPreferredReplicaElectionFeature,
+ KMReassignPartitionsFeature).map(_.getClass.getSimpleName)

lazy val features = {
getApplicationFeatures(play.api.Play.configuration.underlying)
}

def getApplicationFeatures(config: Config) : ApplicationFeatures = {
import scala.collection.JavaConverters._
- val configFeatures: List[String] = config.getStringList("application.features").asScala.toList
+ val configFeatures: Option[List[String]] = Try(config.getStringList("application.features").asScala.toList).toOption

+ if(configFeatures.isEmpty) {
+ log.warn(s"application.features not found in conf file, using default values $default")
+ }

- val f = configFeatures.map(ApplicationFeature.from).flatten
+ val f = configFeatures.getOrElse(default).map(ApplicationFeature.from).flatten
ApplicationFeatures(f.toSet)
}
}
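The change above swaps a hard failure on a missing `application.features` key for a logged fallback to the defaults. The core `Try(...).toOption.getOrElse(default)` move, sketched against a plain `Map` instead of Typesafe Config:

```scala
import scala.util.Try

val default = List("KMClusterManagerFeature", "KMTopicManagerFeature",
                   "KMPreferredReplicaElectionFeature", "KMReassignPartitionsFeature")

// Map.apply throws on a missing key, much like Config.getStringList does;
// Try(...).toOption turns the throw into a None we can default away.
def featuresFrom(config: Map[String, List[String]]): List[String] =
  Try(config("application.features")).toOption.getOrElse(default)
```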