Commit ac71562

Merge pull request #83 from patelh/jmx-cleanup

Add user notification for enabling jmx

patelh committed Jun 24, 2015
2 parents a1ba206 + 7aab951 commit ac71562
Showing 27 changed files with 229 additions and 86 deletions.
8 changes: 6 additions & 2 deletions .travis.yml
@@ -1,5 +1,9 @@
language: scala
sudo: false
+jdk: oraclejdk8
+install: true
-script: "./sbt clean test"
+script: "./sbt clean coverage assembly"
scala:
-- 2.11.5
+- 2.11.5
+#after_success:
+# - sbt coverageReport coveralls
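
Note: the new script runs the sbt coverage task before assembly, which presupposes a coverage plugin on the build. A minimal project/plugins.sbt sketch, assuming sbt-scoverage and (for the commented-out after_success step) sbt-coveralls; the plugin versions below are illustrative assumptions, not taken from this repository:

// project/plugins.sbt -- hedged sketch, plugin versions are assumptions
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.0.4")
// only needed if the commented-out coveralls upload is re-enabled
addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.0.0")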
9 changes: 6 additions & 3 deletions app/controllers/Topic.scala
@@ -9,7 +9,7 @@ import java.util.Properties

import kafka.manager.ActorModel.TopicIdentity
import kafka.manager.utils.TopicConfigs
-import kafka.manager.{ApiError, Kafka_0_8_2_0, Kafka_0_8_1_1}
+import kafka.manager.{Kafka_0_8_2_1, ApiError, Kafka_0_8_2_0, Kafka_0_8_1_1}
import models.FollowLink
import models.form._
import models.navigation.Menus
@@ -42,7 +42,8 @@ object Topic extends Controller{

val kafka_0_8_1_1_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_8_1_1).map(n => TConfig(n,None)).toList)
val kafka_0_8_2_0_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_8_2_0).map(n => TConfig(n,None)).toList)

+val kafka_0_8_2_1_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_8_2_1).map(n => TConfig(n,None)).toList)

val defaultCreateForm = Form(
mapping(
"topic" -> nonEmptyText.verifying(maxLength(250), validateName),
@@ -97,13 +98,14 @@ object Topic extends Controller{
clusterConfig.version match {
case Kafka_0_8_1_1 => defaultCreateForm.fill(kafka_0_8_1_1_Default)
case Kafka_0_8_2_0 => defaultCreateForm.fill(kafka_0_8_2_0_Default)
+case Kafka_0_8_2_1 => defaultCreateForm.fill(kafka_0_8_2_1_Default)
}
}
}
}

def topics(c: String) = Action.async {
-kafkaManager.getTopicListWithMoreInfo(c).map { errorOrTopicList =>
+kafkaManager.getTopicListExtended(c).map { errorOrTopicList =>
Ok(views.html.topic.topicList(c,errorOrTopicList))
}
}
@@ -201,6 +203,7 @@ object Topic extends Controller{
val defaultConfigMap = clusterConfig.version match {
case Kafka_0_8_1_1 => TopicConfigs.configNames(Kafka_0_8_1_1).map(n => (n,TConfig(n,None))).toMap
case Kafka_0_8_2_0 => TopicConfigs.configNames(Kafka_0_8_2_0).map(n => (n,TConfig(n,None))).toMap
+case Kafka_0_8_2_1 => TopicConfigs.configNames(Kafka_0_8_2_1).map(n => (n,TConfig(n,None))).toMap
}
val combinedMap = defaultConfigMap ++ ti.config.toMap.map(tpl => tpl._1 -> TConfig(tpl._1,Option(tpl._2)))
defaultUpdateConfigForm.fill(UpdateTopicConfig(ti.topic,combinedMap.toList.map(_._2),ti.configReadVersion))
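
The three new Kafka_0_8_2_1 cases in this file lean on the version ADT pattern. A hedged sketch of the assumed shape (this diff confirms the case objects, not the sealed trait itself):

object KafkaVersionSketch {
  // Assumed: a sealed trait lets the compiler warn on any match that
  // forgets the new version, which is how these match sites stay in sync.
  sealed trait KafkaVersion
  case object Kafka_0_8_1_1 extends KafkaVersion
  case object Kafka_0_8_2_0 extends KafkaVersion
  case object Kafka_0_8_2_1 extends KafkaVersion // added by this commit

  // Illustrative consumer, mirroring the matches above and in KafkaCommandActor:
  def deleteSupported(v: KafkaVersion): Boolean = v match {
    case Kafka_0_8_1_1 => false
    case Kafka_0_8_2_0 | Kafka_0_8_2_1 => true
  }
}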
5 changes: 5 additions & 0 deletions app/controllers/api/KafkaHealthCheck.scala
@@ -1,3 +1,8 @@
+/**
+ * Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+ * See accompanying LICENSE file.
+ */
+
package controllers.api

import controllers.KafkaManagerContext
18 changes: 10 additions & 8 deletions app/kafka/manager/ActorModel.scala
@@ -36,8 +36,8 @@ object ActorModel {
case class BVGetView(id: Int) extends BVRequest
case class BVGetTopicMetrics(topic: String) extends BVRequest
case object BVGetBrokerMetrics extends BVRequest
-case class BVView(topicPartitions: Map[TopicIdentity, IndexedSeq[Int]],
-metrics: Option[BrokerMetrics] = None,
+case class BVView(topicPartitions: Map[TopicIdentity, IndexedSeq[Int]], clusterConfig: ClusterConfig,
+metrics: Option[BrokerMetrics] = None,
stats: Option[BrokerClusterStats] = None) extends QueryResponse {
def numTopics : Int = topicPartitions.size
def numPartitions : Int = topicPartitions.values.foldLeft(0)((acc,i) => acc + i.size)
@@ -134,7 +134,7 @@ object ActorModel {
deleteSupported: Boolean) extends QueryResponse
case class TopicDescriptions(descriptions: IndexedSeq[TopicDescription], lastUpdateMillis: Long) extends QueryResponse

-case class BrokerList(list: IndexedSeq[BrokerIdentity]) extends QueryResponse
+case class BrokerList(list: IndexedSeq[BrokerIdentity], clusterConfig: ClusterConfig) extends QueryResponse

case class PreferredReplicaElection(startTime: DateTime, topicAndPartition: Set[TopicAndPartition], endTime: Option[DateTime]) extends QueryResponse
case class ReassignPartitions(startTime: DateTime, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]], endTime: Option[DateTime]) extends QueryResponse
@@ -200,7 +200,8 @@ object ActorModel {
numBrokers: Int,
configReadVersion: Int,
config: List[(String,String)],
-deleteSupported: Boolean,
+deleteSupported: Boolean,
+clusterConfig: ClusterConfig,
metrics: Option[BrokerMetrics] = None) {

val replicationFactor : Int = partitionsIdentity.head._2.replicas.size
@@ -248,7 +249,7 @@ object ActorModel {
import org.json4s.scalaz.JsonScalaz._
import scala.language.reflectiveCalls

-implicit def from(brokers: Int,td: TopicDescription, tm: Option[BrokerMetrics]) : TopicIdentity = {
+implicit def from(brokers: Int,td: TopicDescription, tm: Option[BrokerMetrics], clusterConfig: ClusterConfig) : TopicIdentity = {
val descJson = parse(td.description._2)
//val partMap = (descJson \ "partitions").as[Map[String,Seq[Int]]]
val partMap = field[Map[String,List[Int]]]("partitions")(descJson).fold({ e =>
@@ -276,11 +277,11 @@ object ActorModel {
(-1,Map.empty[String, String])
}
}
-TopicIdentity(td.topic,td.description._1,partMap.size,tpi,brokers,config._1,config._2.toList,td.deleteSupported, tm)
+TopicIdentity(td.topic,td.description._1,partMap.size,tpi,brokers,config._1,config._2.toList,td.deleteSupported, clusterConfig, tm)
}

-implicit def from(bl: BrokerList,td: TopicDescription, tm: Option[BrokerMetrics]) : TopicIdentity = {
-from(bl.list.size, td, tm)
+implicit def from(bl: BrokerList,td: TopicDescription, tm: Option[BrokerMetrics], clusterConfig: ClusterConfig) : TopicIdentity = {
+from(bl.list.size, td, tm, clusterConfig)
}

implicit def reassignReplicas(currentTopicIdentity: TopicIdentity,
@@ -302,6 +303,7 @@
currentTopicIdentity.configReadVersion,
currentTopicIdentity.config,
currentTopicIdentity.deleteSupported,
+currentTopicIdentity.clusterConfig,
currentTopicIdentity.metrics)
}
}
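
Threading ClusterConfig through BrokerList, BVView and TopicIdentity is what lets views check clusterConfig.jmxEnabled and show the "enable JMX" notice this PR is about. A hedged call-site sketch matching the new signatures:

import kafka.manager.ActorModel.{BrokerList, TopicDescriptions, TopicIdentity}

// Sketch only: BrokerList now carries the cluster config, so call sites
// can pass it straight through to TopicIdentity.from.
def identities(bl: BrokerList, tds: TopicDescriptions): IndexedSeq[TopicIdentity] =
  tds.descriptions.map(td => TopicIdentity.from(bl, td, None, bl.clusterConfig))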
20 changes: 14 additions & 6 deletions app/kafka/manager/BrokerViewCacheActor.scala
@@ -56,6 +56,7 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
log.info("Stopped actor %s".format(self.path))
log.info("Cancelling updater...")
Try(cancellable.map(_.cancel()))
+super.postStop()
}

override protected def longRunningPoolConfig: LongRunningPoolConfig = config.longRunningPoolConfig
@@ -146,11 +147,14 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
brokerList <- brokerListOption
topicDescriptions <- topicDescriptionsOption
} {
-val topicIdentity : IndexedSeq[TopicIdentity] = topicDescriptions.descriptions.map(TopicIdentity.from(brokerList.list.size,_,None))
+val topicIdentity : IndexedSeq[TopicIdentity] = topicDescriptions.descriptions.map(
+TopicIdentity.from(brokerList.list.size,_,None, config.clusterConfig))
topicIdentities = topicIdentity.map(ti => (ti.topic, ti)).toMap
-val topicPartitionByBroker = topicIdentity.flatMap(ti => ti.partitionsByBroker.map(btp => (ti,btp.id,btp.partitions))).groupBy(_._2)
+val topicPartitionByBroker = topicIdentity.flatMap(
+ti => ti.partitionsByBroker.map(btp => (ti,btp.id,btp.partitions))).groupBy(_._2)

-if (config.clusterConfig.jmxEnabled) {
+//check for 2*broker list size since we schedule 2 jmx calls for each broker
+if (config.clusterConfig.jmxEnabled && hasCapacityFor(2*brokerListOption.size)) {
implicit val ec = longRunningExecutionContext
val brokerLookup = brokerList.list.map(bi => bi.id -> bi).toMap
topicPartitionByBroker.foreach {
@@ -164,7 +168,8 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
mbsc =>
topicPartitions.map {
case (topic, id, partitions) =>
-(topic.topic, KafkaMetrics.getBrokerMetrics(mbsc, Option(topic.topic)))
+(topic.topic,
+KafkaMetrics.getBrokerMetrics(config.clusterConfig.version, mbsc, Option(topic.topic)))
}
}
val result = tryResult match {
@@ -188,7 +193,7 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
Future {
val tryResult = KafkaJMX.doWithConnection(broker.host, broker.jmxPort) {
mbsc =>
-KafkaMetrics.getBrokerMetrics(mbsc)
+KafkaMetrics.getBrokerMetrics(config.clusterConfig.version, mbsc)
}

val result = tryResult match {
@@ -201,6 +206,8 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
}
}
}
+} else if(config.clusterConfig.jmxEnabled) {
+log.warning("Not scheduling update of JMX for all brokers, not enough capacity!")
}

topicPartitionByBroker.foreach {
@@ -209,7 +216,8 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
case (topic, id, partitions) =>
(topic, partitions)
}.toMap
-brokerTopicPartitions.put(brokerId,BVView(topicPartitionsMap, brokerMetrics.get(brokerId)))
+brokerTopicPartitions.put(
+brokerId,BVView(topicPartitionsMap, config.clusterConfig, brokerMetrics.get(brokerId)))
}
}
}
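
The new guard calls hasCapacityFor, whose definition is not part of this diff. A hypothetical sketch of the intent, with assumed names and queue type:

import java.util.concurrent.LinkedBlockingQueue

trait LongRunningPoolSketch {
  protected[this] val longRunningQueue = new LinkedBlockingQueue[Runnable](1000)

  // Only schedule JMX work when the queue can absorb every future about to
  // be created -- two JMX calls per broker, hence the 2*N check above.
  protected def hasCapacityFor(taskCount: Int): Boolean =
    longRunningQueue.remainingCapacity() >= taskCount
}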
10 changes: 5 additions & 5 deletions app/kafka/manager/ClusterManagerActor.scala
@@ -86,7 +86,7 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)

private[this] val adminUtils = new AdminUtils(cmConfig.clusterConfig.version)

-private[this] val ksProps = Props(classOf[KafkaStateActor],sharedClusterCurator, adminUtils.isDeleteSupported)
+private[this] val ksProps = Props(classOf[KafkaStateActor],sharedClusterCurator, adminUtils.isDeleteSupported, cmConfig.clusterConfig)
private[this] val kafkaStateActor : ActorPath = context.actorOf(ksProps.withDispatcher(cmConfig.pinnedDispatcherName),"kafka-state").path

private[this] val bvConfig = BrokerViewCacheActorConfig(
@@ -176,7 +176,7 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
bl <- eventualBrokerList
tm <- eventualTopicMetrics
tdO <- eventualTopicDescription
-} yield tdO.map( td => CMTopicIdentity(Try(TopicIdentity.from(bl,td,tm))))
+} yield tdO.map( td => CMTopicIdentity(Try(TopicIdentity.from(bl,td,tm,cmConfig.clusterConfig))))
result pipeTo sender

case any: Any => log.warning("cma : processQueryResponse : Received unknown message: {}", any)
@@ -263,7 +263,7 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
val generated: Future[IndexedSeq[(String, Map[Int, Seq[Int]])]] = for {
bl <- eventualBrokerList
tds <- eventualDescriptions
-tis = tds.descriptions.map(TopicIdentity.from(bl, _, None))
+tis = tds.descriptions.map(TopicIdentity.from(bl, _, None,cmConfig.clusterConfig))
} yield {
bl.list.map(_.id.toInt)
// check if any nonexistent broker got selected for reassignment
@@ -301,7 +301,7 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
val preferredLeaderElections = for {
bl <- eventualBrokerList
tds <- eventualDescriptions
-tis = tds.descriptions.map(TopicIdentity.from(bl, _, None))
+tis = tds.descriptions.map(TopicIdentity.from(bl, _, None, cmConfig.clusterConfig))
toElect = tis.map(ti => ti.partitionsIdentity.values.filter(!_.isPreferredLeader).map(tpi => TopicAndPartition(ti.topic, tpi.partNum))).flatten.toSet
} yield toElect
preferredLeaderElections.map { toElect =>
@@ -317,7 +317,7 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
val topicsAndReassignments = for {
bl <- eventualBrokerList
tds <- eventualDescriptions
-tis = tds.descriptions.map(TopicIdentity.from(bl, _, None))
+tis = tds.descriptions.map(TopicIdentity.from(bl, _, None, cmConfig.clusterConfig))
} yield {
val reassignments = tis.map { ti =>
val topicZkPath = zkPathFrom(baseTopicsZkPath, ti.topic)
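
Props(classOf[KafkaStateActor], ...) constructs the actor reflectively, so the argument list must line up with KafkaStateActor's constructor. A hypothetical sketch of the constructor shape this call now implies (the real class is not shown in this diff):

import akka.actor.Actor
import kafka.manager.ClusterConfig
import org.apache.curator.framework.CuratorFramework

class KafkaStateActorSketch(curator: CuratorFramework,
                            deleteSupported: Boolean,
                            clusterConfig: ClusterConfig) extends Actor {
  override def receive: Receive = {
    case _ => // message handling elided in this sketch
  }
}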
4 changes: 1 addition & 3 deletions app/kafka/manager/KafkaCommandActor.scala
@@ -49,8 +49,6 @@ class KafkaCommandActor(kafkaCommandActorConfig: KafkaCommandActorConfig) extend

@scala.throws[Exception](classOf[Exception])
override def postStop(): Unit = {
log.info("Shutting down long running executor...")
Try(longRunningExecutor.shutdown())
super.postStop()
}

@@ -75,7 +73,7 @@ class KafkaCommandActor(kafkaCommandActorConfig: KafkaCommandActorConfig) extend
val result : KCCommandResult = KCCommandResult(Failure(new UnsupportedOperationException(
s"Delete topic not supported for kafka version ${kafkaCommandActorConfig.version}")))
sender ! result
-case Kafka_0_8_2_0 =>
+case Kafka_0_8_2_0 | Kafka_0_8_2_1 =>
longRunning {
Future {
KCCommandResult(Try {
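
The executor shutdown removed here pairs with the super.postStop() added in BrokerViewCacheActor, suggesting pool cleanup moved into the shared long-running-pool base. A hedged sketch of that assumed base (names and pool sizing are assumptions, not shown in this diff):

import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import scala.util.Try
import akka.actor.Actor

trait LongRunningPoolBase extends Actor {
  protected[this] val longRunningQueue = new LinkedBlockingQueue[Runnable](1000)
  protected[this] val longRunningExecutor =
    new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, longRunningQueue)

  override def postStop(): Unit = {
    Try(longRunningExecutor.shutdown()) // idempotent; safe even if unused
    super.postStop()
  }
}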
69 changes: 44 additions & 25 deletions app/kafka/manager/KafkaJMX.scala
@@ -1,3 +1,8 @@
+/**
+ * Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+ * See accompanying LICENSE file.
+ */
+
package kafka.manager

import javax.management._
@@ -46,37 +51,51 @@ object KafkaJMX {

object KafkaMetrics {

-def getBytesInPerSec(mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
-getBrokerTopicMeterMetrics(mbsc, "BytesInPerSec", topicOption)
+def getBytesInPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
+getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "BytesInPerSec", topicOption)
}

-def getBytesOutPerSec(mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
-getBrokerTopicMeterMetrics(mbsc, "BytesOutPerSec", topicOption)
+def getBytesOutPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
+getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "BytesOutPerSec", topicOption)
}

-def getBytesRejectedPerSec(mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
-getBrokerTopicMeterMetrics(mbsc, "BytesRejectedPerSec", topicOption)
+def getBytesRejectedPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
+getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "BytesRejectedPerSec", topicOption)
}

-def getFailedFetchRequestsPerSec(mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
-getBrokerTopicMeterMetrics(mbsc, "FailedFetchRequestsPerSec", topicOption)
+def getFailedFetchRequestsPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
+getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "FailedFetchRequestsPerSec", topicOption)
}

-def getFailedProduceRequestsPerSec(mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
-getBrokerTopicMeterMetrics(mbsc, "FailedProduceRequestsPerSec", topicOption)
+def getFailedProduceRequestsPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
+getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "FailedProduceRequestsPerSec", topicOption)
}

-def getMessagesInPerSec(mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
-getBrokerTopicMeterMetrics(mbsc, "MessagesInPerSec", topicOption)
+def getMessagesInPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
+getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "MessagesInPerSec", topicOption)
}

-private def getBrokerTopicMeterMetrics(mbsc: MBeanServerConnection, metricName: String, topicOption: Option[String]) = {
-getMeterMetric(mbsc, getObjectName(metricName, topicOption))
+private def getBrokerTopicMeterMetrics(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, metricName: String, topicOption: Option[String]) = {
+getMeterMetric(mbsc, getObjectName(kafkaVersion, metricName, topicOption))
}

+private def getSep(kafkaVersion: KafkaVersion) : String = {
+kafkaVersion match {
+case Kafka_0_8_1_1 => "\""
+case _ => ""
+}
+}

-private def getObjectName(name: String, topicOption: Option[String] = None) = {
-val topicProp = topicOption.map(topic => s",topic=$topic").getOrElse("")
-new ObjectName(s"kafka.server:type=BrokerTopicMetrics,name=$name$topicProp")
+def getObjectName(kafkaVersion: KafkaVersion, name: String, topicOption: Option[String] = None) = {
+val sep = getSep(kafkaVersion)
+val topicAndName = kafkaVersion match {
+case Kafka_0_8_1_1 =>
+topicOption.map( topic => s"${sep}$topic-$name${sep}").getOrElse(s"${sep}AllTopics$name${sep}")
+case _ =>
+val topicProp = topicOption.map(topic => s",topic=$topic").getOrElse("")
+s"$name$topicProp"
+}
+new ObjectName(s"${sep}kafka.server${sep}:type=${sep}BrokerTopicMetrics${sep},name=$topicAndName")
}

/* Gauge, Value : 0 */
Expand Down Expand Up @@ -122,14 +141,14 @@ object KafkaMetrics {
attributes.find(_.getName == name).map(_.getValue.asInstanceOf[Double]).getOrElse(0D)
}

-def getBrokerMetrics(mbsc: MBeanServerConnection, topic: Option[String] = None) : BrokerMetrics = {
+def getBrokerMetrics(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topic: Option[String] = None) : BrokerMetrics = {
BrokerMetrics(
-KafkaMetrics.getBytesInPerSec(mbsc, topic),
-KafkaMetrics.getBytesOutPerSec(mbsc, topic),
-KafkaMetrics.getBytesRejectedPerSec(mbsc, topic),
-KafkaMetrics.getFailedFetchRequestsPerSec(mbsc, topic),
-KafkaMetrics.getFailedProduceRequestsPerSec(mbsc, topic),
-KafkaMetrics.getMessagesInPerSec(mbsc, topic))
+KafkaMetrics.getBytesInPerSec(kafkaVersion, mbsc, topic),
+KafkaMetrics.getBytesOutPerSec(kafkaVersion, mbsc, topic),
+KafkaMetrics.getBytesRejectedPerSec(kafkaVersion, mbsc, topic),
+KafkaMetrics.getFailedFetchRequestsPerSec(kafkaVersion, mbsc, topic),
+KafkaMetrics.getFailedProduceRequestsPerSec(kafkaVersion, mbsc, topic),
+KafkaMetrics.getMessagesInPerSec(kafkaVersion, mbsc, topic))
}
}

@@ -192,4 +211,4 @@ object FormatMetric {
}
}
}
-}
+}
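
The separator logic exists because Kafka 0.8.1.1 (Yammer metrics 2.x naming) quotes each part of its MBean names and fuses the topic into the metric name, while 0.8.2+ uses plain key=value properties. REPL-style worked examples, derived directly from getObjectName above (now public):

import kafka.manager.{Kafka_0_8_1_1, Kafka_0_8_2_0, KafkaMetrics}

KafkaMetrics.getObjectName(Kafka_0_8_2_0, "BytesInPerSec", Some("mytopic"))
// -> kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=mytopic
KafkaMetrics.getObjectName(Kafka_0_8_2_0, "BytesInPerSec")
// -> kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
KafkaMetrics.getObjectName(Kafka_0_8_1_1, "BytesInPerSec", Some("mytopic"))
// -> "kafka.server":type="BrokerTopicMetrics",name="mytopic-BytesInPerSec"
KafkaMetrics.getObjectName(Kafka_0_8_1_1, "BytesInPerSec")
// -> "kafka.server":type="BrokerTopicMetrics",name="AllTopicsBytesInPerSec"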