Skip to content

Commit

Permalink
Merge pull request #93 from Mogztter/message-count-history
Browse files Browse the repository at this point in the history
Messages count history with chart
  • Loading branch information
patelh committed Jul 30, 2015
2 parents 41cf533 + b1f6334 commit 9975d31
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 17 deletions.
5 changes: 5 additions & 0 deletions app/kafka/manager/ActorModel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.joda.time.DateTime
import kafka.manager.utils.TopicAndPartition
import org.slf4j.LoggerFactory

import scala.collection.immutable.Queue
import scala.util.Try
import scalaz.{NonEmptyList, Validation}

Expand Down Expand Up @@ -39,6 +40,7 @@ object ActorModel {
case object BVGetBrokerMetrics extends BVRequest
case class BVView(topicPartitions: Map[TopicIdentity, IndexedSeq[Int]], clusterConfig: ClusterConfig,
metrics: Option[BrokerMetrics] = None,
messagesPerSecCountHistory: Option[Queue[BrokerMessagesPerSecCount]] = None,
stats: Option[BrokerClusterStats] = None) extends QueryResponse {
def numTopics : Int = topicPartitions.size
def numPartitions : Int = topicPartitions.values.foldLeft(0)((acc,i) => acc + i.size)
Expand Down Expand Up @@ -311,6 +313,9 @@ object ActorModel {
}
}

case class BrokerMessagesPerSecCount(date: DateTime,
count: Long)

case class BrokerMetrics(bytesInPerSec: MeterMetric,
bytesOutPerSec: MeterMetric,
bytesRejectedPerSec: MeterMetric,
Expand Down
51 changes: 34 additions & 17 deletions app/kafka/manager/BrokerViewCacheActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
package kafka.manager

import akka.actor.{ActorRef, Cancellable, ActorPath}
import kafka.manager.utils.FiniteQueue
import org.joda.time.DateTime

import scala.collection.immutable.Queue
import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.duration._
Expand All @@ -15,12 +19,12 @@ import scala.util.Try
* @author hiral
*/
import ActorModel._
case class BrokerViewCacheActorConfig(kafkaStateActorPath: ActorPath,
clusterConfig: ClusterConfig,
longRunningPoolConfig: LongRunningPoolConfig,
case class BrokerViewCacheActorConfig(kafkaStateActorPath: ActorPath,
clusterConfig: ClusterConfig,
longRunningPoolConfig: LongRunningPoolConfig,
updatePeriod: FiniteDuration = 10 seconds)
class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunningPoolActor {

private[this] val ZERO = BigDecimal(0)

private[this] var cancellable : Option[Cancellable] = None
Expand All @@ -32,16 +36,18 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
private[this] var brokerListOption : Option[BrokerList] = None

private[this] var brokerMetrics : Map[Int, BrokerMetrics] = Map.empty

private[this] val brokerTopicPartitions : mutable.Map[Int, BVView] = new mutable.HashMap[Int, BVView]

private[this] val topicMetrics: mutable.Map[String, mutable.Map[Int, BrokerMetrics]] =
new mutable.HashMap[String, mutable.Map[Int, BrokerMetrics]]()

private[this] var combinedBrokerMetric : Option[BrokerMetrics] = None

private[this] val EMPTY_BVVIEW = BVView(Map.empty, config.clusterConfig, Option(BrokerMetrics.DEFAULT))


private[this] var brokerMessagesPerSecCountHistory : Map[Int, Queue[BrokerMessagesPerSecCount]] = Map.empty

override def preStart() = {
log.info("Started actor %s".format(self.path))
log.info("Scheduling updater for %s".format(config.updatePeriod))
Expand All @@ -67,7 +73,7 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
log.error("Long running pool queue full, skipping!")
}

private def produceBViewWithBrokerClusterState(bv: BVView) : BVView = {
private def produceBViewWithBrokerClusterState(bv: BVView, id: Int) : BVView = {
val bcs = for {
metrics <- bv.metrics
cbm <- combinedBrokerMetric
Expand All @@ -83,17 +89,18 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
} else ZERO
BrokerClusterStats(perMessages, perIncoming, perOutgoing)
}
val messagesPerSecCountHistory = brokerMessagesPerSecCountHistory.get(id)
if(bcs.isDefined) {
bv.copy(stats=bcs)
bv.copy(stats = bcs, messagesPerSecCountHistory = messagesPerSecCountHistory)
} else {
bv
bv.copy(messagesPerSecCountHistory = messagesPerSecCountHistory)
}
}

private def allBrokerViews(): Seq[BVView] = {
var bvs = mutable.MutableList[BVView]()
for (key <- brokerTopicPartitions.keySet.toSeq.sorted) {
val bv = brokerTopicPartitions.get(key).map { bv => produceBViewWithBrokerClusterState(bv) }
val bv = brokerTopicPartitions.get(key).map { bv => produceBViewWithBrokerClusterState(bv, key) }
if (bv.isDefined) {
bvs += bv.get
}
Expand All @@ -116,9 +123,9 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni

case BVGetView(id) =>
sender ! brokerTopicPartitions.get(id).map { bv =>
produceBViewWithBrokerClusterState(bv)
produceBViewWithBrokerClusterState(bv, id)
}

case BVGetBrokerMetrics =>
sender ! brokerMetrics

Expand All @@ -135,13 +142,21 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
tm.put(id, bm)
topicMetrics.put(topic, tm)
}

case BVUpdateBrokerMetrics(id, metrics) =>
brokerMetrics += (id -> metrics)
combinedBrokerMetric = Option(brokerMetrics.values.foldLeft(BrokerMetrics.DEFAULT)((acc, m) => acc + m))

val updatedBVView = brokerTopicPartitions.getOrElse(id, EMPTY_BVVIEW).copy(metrics = Option(metrics))
brokerTopicPartitions.put(id, updatedBVView)
val now = DateTime.now()
val messagesCount = BrokerMessagesPerSecCount(now, metrics.messagesInPerSec.count)
brokerMessagesPerSecCountHistory += (id -> brokerMessagesPerSecCountHistory.get(id).map {
history =>
history.enqueueFinite(messagesCount, 10)
}.getOrElse {
Queue(messagesCount)
})

case any: Any => log.warning("bvca : processActorRequest : Received unknown message: {}", any)
}
Expand All @@ -161,6 +176,8 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
}
}

implicit def queue2finitequeue[A](q: Queue[A]): FiniteQueue[A] = new FiniteQueue[A](q)

private[this] def updateView(): Unit = {
for {
brokerList <- brokerListOption
Expand All @@ -187,7 +204,7 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
mbsc =>
topicPartitions.map {
case (topic, id, partitions) =>
(topic.topic,
(topic.topic,
KafkaMetrics.getBrokerMetrics(config.clusterConfig.version, mbsc, Option(topic.topic)))
}
}
Expand Down Expand Up @@ -231,12 +248,12 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni

topicPartitionByBroker.foreach {
case (brokerId, topicPartitions) =>
val topicPartitionsMap : Map[TopicIdentity, IndexedSeq[Int]] = topicPartitions.map {
val topicPartitionsMap: Map[TopicIdentity, IndexedSeq[Int]] = topicPartitions.map {
case (topic, id, partitions) =>
(topic, partitions)
}.toMap
brokerTopicPartitions.put(
brokerId,BVView(topicPartitionsMap, config.clusterConfig, brokerMetrics.get(brokerId)))
brokerId, BVView(topicPartitionsMap, config.clusterConfig, brokerMetrics.get(brokerId)))
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions app/kafka/manager/utils/FiniteQueue.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package kafka.manager.utils

import scala.collection.immutable.Queue

class FiniteQueue[A](q: Queue[A]) {

def enqueueFinite[B >: A](elem: B, maxSize: Int): Queue[B] = {
var ret = q.enqueue(elem)
while (ret.size > maxSize) {
ret = ret.dequeue._2
}
ret
}
}
25 changes: 25 additions & 0 deletions app/views/broker/brokerViewContent.scala.html
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,31 @@
</div>
</div>
</div>
<div class="row">
<div class="col-md-12">
<div class="panel panel-default">
<div class="panel-heading"><h4>Messages count</h4></div>
<div class="ct-chart"></div>
<script>
var options = {
axisY: {
type: Chartist.AutoScaleAxis,
low: @brokerView.messagesPerSecCountHistory.map(v => v.head.count - 1).getOrElse(0),
high: @brokerView.messagesPerSecCountHistory.map(v => v.last.count + 1).getOrElse(0),
onlyInteger: true
}
};
var data = {
labels: [@Html(brokerView.messagesPerSecCountHistory.map(_.map(v => s"'${v.date.toString("HH:mm:ss")}'").mkString(",")).getOrElse(""))],
series: [
[@brokerView.messagesPerSecCountHistory.map(_.map(_.count).mkString(","))]
]
};
new Chartist.Line('.ct-chart', data, options);
</script>
</div>
</div>
</div>
<div class="panel panel-default">
<div class="panel-heading"><h4>Per Topic Detail</h4></div>
<table class="table">
Expand Down
2 changes: 2 additions & 0 deletions app/views/main.scala.html
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
</script>
<link rel="stylesheet" media="screen" href="@routes.Assets.versioned("stylesheets/index.min.css")">
<link rel="stylesheet" media="screen" href="@routes.Assets.versioned("dataTables/stylesheets/dataTables.bootstrap.css")">
<link rel="stylesheet" media="screen" href="@routes.Assets.versioned("chartist/stylesheets/chartist.min.css")">
<!--
<script data-main="@routes.Assets.versioned("javascripts/index.js")" src="@routes.WebJarAssets.at(WebJarAssets.locate("require.min.js"))"></script>
-->
<script src="@routes.Assets.versioned("chartist/javascripts/chartist.min.js")"></script>
</head>
<body role="document">

Expand Down
9 changes: 9 additions & 0 deletions public/chartist/javascripts/chartist.min.js

Large diffs are not rendered by default.

Loading

0 comments on commit 9975d31

Please sign in to comment.