diff --git a/app/controllers/Cluster.scala b/app/controllers/Cluster.scala
index 5e002cca3..90377ec2c 100644
--- a/app/controllers/Cluster.scala
+++ b/app/controllers/Cluster.scala
@@ -93,6 +93,7 @@ class Cluster (val cc: ControllerComponents, val kafkaManagerContext: KafkaManag
, "jmxUser" -> optional(text)
, "jmxPass" -> optional(text)
, "jmxSsl" -> boolean
+ , "restrictOperations" -> boolean
, "pollConsumers" -> boolean
, "filterConsumers" -> boolean
, "logkafkaEnabled" -> boolean
@@ -137,6 +138,7 @@ class Cluster (val cc: ControllerComponents, val kafkaManagerContext: KafkaManag
"jmxUser" -> optional(text),
"jmxPass" -> optional(text),
"jmxSsl" -> boolean,
+ "restrictOperations" -> boolean,
"pollConsumers" -> boolean,
"filterConsumers" -> boolean,
"logkafkaEnabled" -> boolean,
@@ -185,6 +187,7 @@ class Cluster (val cc: ControllerComponents, val kafkaManagerContext: KafkaManag
,false
,false
,false
+ ,false
,Option(defaultTuning)
,PLAINTEXT
,None
@@ -230,6 +233,7 @@ class Cluster (val cc: ControllerComponents, val kafkaManagerContext: KafkaManag
cc.jmxUser,
cc.jmxPass,
cc.jmxSsl,
+ cc.restrictOperations,
cc.pollConsumers,
cc.filterConsumers,
cc.logkafkaEnabled,
@@ -258,6 +262,7 @@ class Cluster (val cc: ControllerComponents, val kafkaManagerContext: KafkaManag
clusterConfig.jmxUser,
clusterConfig.jmxPass,
clusterConfig.jmxSsl,
+ clusterConfig.restrictOperations,
clusterConfig.pollConsumers,
clusterConfig.filterConsumers,
clusterConfig.tuning,
@@ -329,6 +334,7 @@ class Cluster (val cc: ControllerComponents, val kafkaManagerContext: KafkaManag
clusterOperation.clusterConfig.jmxUser,
clusterOperation.clusterConfig.jmxPass,
clusterOperation.clusterConfig.jmxSsl,
+ clusterOperation.clusterConfig.restrictOperations,
clusterOperation.clusterConfig.pollConsumers,
clusterOperation.clusterConfig.filterConsumers,
clusterOperation.clusterConfig.tuning,
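
The two form mappings above gain a "restrictOperations" boolean alongside the existing flags. A minimal sketch of how such a boolean field binds in a Play form; ClusterFlags and clusterFlagsForm are illustrative names, not from this codebase. Play's boolean binder treats an absent checkbox value as false, so forms saved before this change still bind cleanly:

```scala
import play.api.data.Form
import play.api.data.Forms._

// Illustrative stand-in for the cluster form's boolean flags.
case class ClusterFlags(jmxSsl: Boolean, restrictOperations: Boolean)

val clusterFlagsForm: Form[ClusterFlags] = Form(
  mapping(
    "jmxSsl"             -> boolean,
    "restrictOperations" -> boolean // an unchecked checkbox binds to false
  )(ClusterFlags.apply)(ClusterFlags.unapply)
)

// Binding data without the new key defaults it to false:
val bound = clusterFlagsForm.bind(Map("jmxSsl" -> "true"))
// bound.get == ClusterFlags(jmxSsl = true, restrictOperations = false)
```
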
diff --git a/app/kafka/manager/KafkaManager.scala b/app/kafka/manager/KafkaManager.scala
index 7e563c3d2..1b3983d5b 100644
--- a/app/kafka/manager/KafkaManager.scala
+++ b/app/kafka/manager/KafkaManager.scala
@@ -259,6 +259,7 @@ class KafkaManager(akkaConfig: Config) extends Logging {
jmxUser: Option[String],
jmxPass: Option[String],
jmxSsl: Boolean,
+ restrictOperations: Boolean,
pollConsumers: Boolean,
filterConsumers: Boolean,
tuning: Option[ClusterTuning],
@@ -281,6 +282,7 @@ class KafkaManager(akkaConfig: Config) extends Logging {
jmxUser = jmxUser,
jmxPass = jmxPass,
jmxSsl = jmxSsl,
+ restrictOperations = restrictOperations,
pollConsumers = pollConsumers,
filterConsumers = filterConsumers,
logkafkaEnabled = logkafkaEnabled,
@@ -298,6 +300,7 @@ class KafkaManager(akkaConfig: Config) extends Logging {
jmxUser: Option[String],
jmxPass: Option[String],
jmxSsl: Boolean,
+ restrictOperations: Boolean,
pollConsumers: Boolean,
filterConsumers: Boolean,
tuning: Option[ClusterTuning],
@@ -320,6 +323,7 @@ class KafkaManager(akkaConfig: Config) extends Logging {
jmxUser = jmxUser,
jmxPass = jmxPass,
jmxSsl = jmxSsl,
+ restrictOperations = restrictOperations,
pollConsumers = pollConsumers,
filterConsumers = filterConsumers,
logkafkaEnabled = logkafkaEnabled,
diff --git a/app/kafka/manager/actor/KafkaManagerActor.scala b/app/kafka/manager/actor/KafkaManagerActor.scala
index 94c8eda82..ee1d4cabb 100644
--- a/app/kafka/manager/actor/KafkaManagerActor.scala
+++ b/app/kafka/manager/actor/KafkaManagerActor.scala
@@ -489,6 +489,7 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig)
&& newConfig.jmxPass == currentConfig.jmxPass
&& newConfig.jmxSsl == currentConfig.jmxSsl
&& newConfig.logkafkaEnabled == currentConfig.logkafkaEnabled
+ && newConfig.restrictOperations == currentConfig.restrictOperations
&& newConfig.pollConsumers == currentConfig.pollConsumers
&& newConfig.filterConsumers == currentConfig.filterConsumers
&& newConfig.activeOffsetCacheEnabled == currentConfig.activeOffsetCacheEnabled
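
The hunk above extends KafkaManagerActor's change-detection chain: an updated cluster config is acted upon only when some compared field differs, so the new flag has to participate or toggling it would be treated as a no-op. A hedged sketch of the pattern, with an illustrative helper name:

```scala
// Illustrative helper, not from the codebase: two configs count as equal
// only if every compared field matches, so restrictOperations must be listed.
def configUnchanged(newConfig: ClusterConfig, currentConfig: ClusterConfig): Boolean =
  newConfig.jmxSsl == currentConfig.jmxSsl &&
    newConfig.restrictOperations == currentConfig.restrictOperations &&
    newConfig.pollConsumers == currentConfig.pollConsumers
```
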
diff --git a/app/kafka/manager/actor/cluster/KafkaStateActor.scala b/app/kafka/manager/actor/cluster/KafkaStateActor.scala
index 4fcd9116b..e25225612 100644
--- a/app/kafka/manager/actor/cluster/KafkaStateActor.scala
+++ b/app/kafka/manager/actor/cluster/KafkaStateActor.scala
@@ -25,15 +25,22 @@ import kafka.manager.features.{ClusterFeatures, KMDeleteTopicFeature, KMPollCons
import kafka.manager.model.ActorModel._
import kafka.manager.model._
import kafka.manager.utils.ZkUtils
-import kafka.manager.utils.zero81.{PreferredReplicaLeaderElectionCommand, ReassignPartitionCommand}
import kafka.manager.utils.one10.{GroupMetadata, GroupMetadataKey, MemberMetadata, OffsetKey}
+import kafka.manager.utils.zero81.{PreferredReplicaLeaderElectionCommand, ReassignPartitionCommand}
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode
import org.apache.curator.framework.recipes.cache._
import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG
+import org.apache.kafka.clients.admin.{AdminClient, DescribeConsumerGroupsOptions}
+import org.apache.kafka.clients.consumer.ConsumerConfig._
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecords, KafkaConsumer}
-import org.apache.kafka.common.{ConsumerGroupState, TopicPartition}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.DescribeGroupsResponse
+import org.apache.kafka.common.utils.Time
import org.joda.time.{DateTime, DateTimeZone}
import scala.collection.concurrent.TrieMap
@@ -41,29 +48,25 @@ import scala.collection.immutable.Map
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
-import org.apache.kafka.clients.consumer.ConsumerConfig._
-import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
-import org.apache.kafka.common.config.SaslConfigs
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG
-import org.apache.kafka.clients.admin.{AdminClient, ConsumerGroupDescription, DescribeConsumerGroupsOptions}
-import org.apache.kafka.common.KafkaFuture.BiConsumer
-import org.apache.kafka.common.utils.Time
/**
* @author hiral
*/
+
import kafka.manager.utils._
import scala.collection.JavaConverters._
case class PartitionOffsetRequestInfo(time: Long, maxNumOffsets: Int)
+
case class KafkaAdminClientActorConfig(clusterContext: ClusterContext, longRunningPoolConfig: LongRunningPoolConfig, kafkaStateActorPath: ActorPath, consumerProperties: Option[Properties])
+
case class KafkaAdminClientActor(config: KafkaAdminClientActorConfig) extends BaseClusterQueryActor with LongRunningPoolActor {
- private[this] var adminClientOption : Option[AdminClient] = None
+ private[this] var adminClientOption: Option[AdminClient] = None
protected implicit val clusterContext: ClusterContext = config.clusterContext
+
override protected def longRunningPoolConfig: LongRunningPoolConfig = config.longRunningPoolConfig
override protected def longRunningQueueFull(): Unit = {
@@ -91,7 +94,7 @@ case class KafkaAdminClientActor(config: KafkaAdminClientActorConfig) extends Ba
}
private def createAdminClient(bl: BrokerList): AdminClient = {
- val targetBrokers : IndexedSeq[BrokerIdentity] = bl.list
+ val targetBrokers: IndexedSeq[BrokerIdentity] = bl.list
val brokerListStr: String = targetBrokers.map {
b =>
val port = b.endpoints(config.clusterContext.config.securityProtocol)
@@ -99,15 +102,18 @@ case class KafkaAdminClientActor(config: KafkaAdminClientActorConfig) extends Ba
}.mkString(",")
val props = new Properties()
config.consumerProperties.foreach {
- cp => props.putAll(cp)
+      //to handle the Scala bug with Java 9+ versions https://github.com/scala/bug/issues/10418
+ cp => {
+ cp.forEach((k, v) => props.put(k, v))
+ }
}
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, config.clusterContext.config.securityProtocol.stringId)
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerListStr)
- if(config.clusterContext.config.saslMechanism.nonEmpty){
+ if (config.clusterContext.config.saslMechanism.nonEmpty) {
props.put(SaslConfigs.SASL_MECHANISM, config.clusterContext.config.saslMechanism.get.stringId)
log.info(s"SASL Mechanism =${config.clusterContext.config.saslMechanism.get}")
}
- if(config.clusterContext.config.jaasConfig.nonEmpty){
+ if (config.clusterContext.config.jaasConfig.nonEmpty) {
props.put(SaslConfigs.SASL_JAAS_CONFIG, config.clusterContext.config.jaasConfig.get)
log.info(s"SASL JAAS config=${config.clusterContext.config.jaasConfig.get}")
}
@@ -119,7 +125,7 @@ case class KafkaAdminClientActor(config: KafkaAdminClientActorConfig) extends Ba
metadata.error == Errors.NONE && (metadata.state == "Dead" || metadata.state == "Empty" || metadata.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
override def processQueryRequest(request: QueryRequest): Unit = {
- if(adminClientOption.isEmpty) {
+ if (adminClientOption.isEmpty) {
context.actorSelection(config.kafkaStateActorPath).tell(KSGetBrokers, self)
log.error(s"AdminClient not initialized yet, cannot process request : $request")
} else {
@@ -133,17 +139,20 @@ case class KafkaAdminClientActor(config: KafkaAdminClientActorConfig) extends Ba
val options = new DescribeConsumerGroupsOptions
options.timeoutMs(1000)
client.describeConsumerGroups(groupList.asJava, options).all().whenComplete {
- (mapGroupDescription, error) => mapGroupDescription.asScala.foreach {
- case (group, desc) =>
- enqueue.offer(group -> desc.members().asScala.map(m => MemberMetadata.from(group, desc, m)).toList)
- }
+ (mapGroupDescription, error) =>
+ mapGroupDescription.asScala.foreach {
+ case (group, desc) =>
+ enqueue.offer(group -> desc.members().asScala.map(m => MemberMetadata.from(group, desc, m)).toList)
+ }
}
}
} catch {
case e: Exception =>
log.error(e, s"Failed to get group summary with admin client : $groupList")
log.error(e, s"Forcing new admin client initialization...")
- Try { adminClientOption.foreach(_.close()) }
+ Try {
+ adminClientOption.foreach(_.close())
+ }
adminClientOption = None
}
}
@@ -156,7 +165,7 @@ case class KafkaAdminClientActor(config: KafkaAdminClientActorConfig) extends Ba
override def processActorResponse(response: ActorResponse): Unit = {
response match {
case bl: BrokerList =>
- if(bl.list.nonEmpty) {
+ if (bl.list.nonEmpty) {
Try {
adminClientOption = Option(createAdminClient(bl))
}.logError(s"Failed to create admin client with brokerlist : $bl")
@@ -167,7 +176,7 @@ case class KafkaAdminClientActor(config: KafkaAdminClientActorConfig) extends Ba
}
class KafkaAdminClient(context: => ActorContext, adminClientActorPath: ActorPath) {
- def enqueueGroupMetadata(groupList: Seq[String], queue: java.util.Queue[(String, List[MemberMetadata])]) : Unit = {
+ def enqueueGroupMetadata(groupList: Seq[String], queue: java.util.Queue[(String, List[MemberMetadata])]): Unit = {
Try {
context.actorSelection(adminClientActorPath).tell(KAGetGroupSummary(groupList, queue), ActorRef.noSender)
}
@@ -179,7 +188,7 @@ object KafkaManagedOffsetCache {
val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0, Kafka_0_10_1_1, Kafka_0_10_2_0, Kafka_0_10_2_1, Kafka_0_11_0_0, Kafka_0_11_0_2, Kafka_1_0_0, Kafka_1_0_1, Kafka_1_1_0, Kafka_1_1_1, Kafka_2_0_0, Kafka_2_1_0, Kafka_2_1_1, Kafka_2_2_0)
val ConsumerOffsetTopic = "__consumer_offsets"
- def isSupported(version: KafkaVersion) : Boolean = {
+ def isSupported(version: KafkaVersion): Boolean = {
supportedVersions(version)
}
@@ -199,6 +208,7 @@ object KafkaManagedOffsetCacheConfig {
case class KafkaManagedOffsetCacheConfig(groupMemberMetadataCheckMillis: Int = KafkaManagedOffsetCacheConfig.defaultGroupMemberMetadataCheckMillis
, groupTopicPartitionOffsetMaxSize: Int = KafkaManagedOffsetCacheConfig.defaultGroupTopicPartitionOffsetMaxSize
, groupTopicPartitionOffsetExpireDays: Int = KafkaManagedOffsetCacheConfig.defaultGroupTopicPartitionOffsetExpireDays)
+
case class KafkaManagedOffsetCache(clusterContext: ClusterContext
, adminClient: KafkaAdminClient
, consumerProperties: Option[Properties]
@@ -206,7 +216,7 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext
, config: KafkaManagedOffsetCacheConfig
) extends Runnable with Closeable with Logging {
val groupTopicPartitionOffsetSet: mutable.Set[(String, String, Int)] = KafkaManagedOffsetCache.createSet()
- val groupTopicPartitionOffsetMap:Cache[(String, String, Int), OffsetAndMetadata] = Caffeine
+ val groupTopicPartitionOffsetMap: Cache[(String, String, Int), OffsetAndMetadata] = Caffeine
.newBuilder()
.maximumSize(config.groupTopicPartitionOffsetMaxSize)
.expireAfterAccess(config.groupTopicPartitionOffsetExpireDays, TimeUnit.DAYS)
@@ -233,9 +243,9 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext
private[this] val queue = new ConcurrentLinkedDeque[(String, List[MemberMetadata])]()
@volatile
- private[this] var lastUpdateTimeMillis : Long = 0
+ private[this] var lastUpdateTimeMillis: Long = 0
- private[this] var lastGroupMemberMetadataCheckMillis : Long = System.currentTimeMillis()
+ private[this] var lastGroupMemberMetadataCheckMillis: Long = System.currentTimeMillis()
import KafkaManagedOffsetCache._
//import kafka.manager.utils.zero90.GroupMetadataManager._
@@ -262,13 +272,16 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put(AUTO_OFFSET_RESET_CONFIG, "latest")
consumerProperties.foreach {
- cp => props.putAll(cp)
+      //to handle the Scala bug with Java 9+ versions https://github.com/scala/bug/issues/10418
+ cp => {
+ cp.forEach((k, v) => props.put(k, v))
+ }
}
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, clusterContext.config.securityProtocol.stringId)
- if(clusterContext.config.saslMechanism.nonEmpty){
+ if (clusterContext.config.saslMechanism.nonEmpty) {
props.put(SaslConfigs.SASL_MECHANISM, clusterContext.config.saslMechanism.get.stringId)
info(s"SASL Mechanism =${clusterContext.config.saslMechanism.get}")
- if(clusterContext.config.jaasConfig.nonEmpty){
+ if (clusterContext.config.jaasConfig.nonEmpty) {
props.put(SaslConfigs.SASL_JAAS_CONFIG, clusterContext.config.jaasConfig.get)
info(s"SASL JAAS config=${clusterContext.config.jaasConfig.get}")
}
@@ -282,11 +295,11 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext
new KafkaConsumer[Array[Byte], Array[Byte]](props)
}
- private[this] def performGroupMetadataCheck() : Unit = {
+ private[this] def performGroupMetadataCheck(): Unit = {
val currentMillis = System.currentTimeMillis()
- if((lastGroupMemberMetadataCheckMillis + config.groupMemberMetadataCheckMillis) < currentMillis) {
+ if ((lastGroupMemberMetadataCheckMillis + config.groupMemberMetadataCheckMillis) < currentMillis) {
val diff = groupTopicPartitionOffsetSet.diff(groupTopicPartitionMemberSet)
- if(diff.nonEmpty) {
+ if (diff.nonEmpty) {
val groupsToBackfill = diff.map(_._1).toSeq
info(s"Backfilling group metadata for $groupsToBackfill")
adminClient.enqueueGroupMetadata(groupsToBackfill, queue)
@@ -297,7 +310,7 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext
}
private[this] def dequeueAndProcessBackFill(): Unit = {
- while(!queue.isEmpty) {
+ while (!queue.isEmpty) {
val (groupId, members) = queue.pop()
members.foreach {
member =>
@@ -306,7 +319,7 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext
case (topic, part) =>
val k = (groupId, topic, part)
//only add it if it hasn't already been added through a new update via the offset topic
- if(groupTopicPartitionMemberMap.getIfPresent(k) == null) {
+ if (groupTopicPartitionMemberMap.getIfPresent(k) == null) {
groupTopicPartitionMemberMap.put(k, member)
groupTopicPartitionMemberSet.add(k)
}
@@ -320,7 +333,7 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext
}
override def run(): Unit = {
- if(!shutdown) {
+ if (!shutdown) {
for {
consumer <- Try {
val consumer = createKafkaConsumer()
@@ -411,17 +424,20 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext
this.shutdown = true
}
- def getOffset(group: String, topic: String, part:Int) : Option[Long] = {
+ def getOffset(group: String, topic: String, part: Int): Option[Long] = {
Option(groupTopicPartitionOffsetMap.getIfPresent((group, topic, part))).map(_.offset)
}
- def getOwner(group: String, topic: String, part:Int) : Option[String] = {
+ def getOwner(group: String, topic: String, part: Int): Option[String] = {
Option(groupTopicPartitionMemberMap.getIfPresent((group, topic, part))).map(mm => s"${mm.memberId}:${mm.clientHost}")
}
- def getConsumerTopics(group: String) : Set[String] = consumerTopicSetMap.get(group).map(_.toSet).getOrElse(Set.empty)
- def getTopicConsumers(topic: String) : Set[String] = topicConsumerSetMap.get(topic).map(_.toSet).getOrElse(Set.empty)
- def getConsumers : IndexedSeq[String] = consumerTopicSetMap.keys.toIndexedSeq
+ def getConsumerTopics(group: String): Set[String] = consumerTopicSetMap.get(group).map(_.toSet).getOrElse(Set.empty)
+
+ def getTopicConsumers(topic: String): Set[String] = topicConsumerSetMap.get(topic).map(_.toSet).getOrElse(Set.empty)
+
+ def getConsumers: IndexedSeq[String] = consumerTopicSetMap.keys.toIndexedSeq
+
def getLastUpdateTimeMillis: Long = lastUpdateTimeMillis
}
@@ -430,12 +446,13 @@ case class ConsumerInstanceSubscriptions private(id: String, subs: Map[String, I
object ConsumerInstanceSubscriptions extends Logging {
//{"version":1,"subscription":{"DXSPreAgg":1},"pattern":"static","timestamp":"1443578242654"}
- def apply(consumer: String, id: String, jsonString: String) : ConsumerInstanceSubscriptions = {
+ def apply(consumer: String, id: String, jsonString: String): ConsumerInstanceSubscriptions = {
import org.json4s.jackson.JsonMethods.parse
import org.json4s.scalaz.JsonScalaz.field
val json = parse(jsonString)
- val subs: Map[String, Int] = field[Map[String,Int]]("subscription")(json).fold({ e =>
- error(s"[consumer=$consumer] Failed to parse consumer instance subscriptions : $id : $jsonString"); Map.empty}, identity)
+ val subs: Map[String, Int] = field[Map[String, Int]]("subscription")(json).fold({ e =>
+ error(s"[consumer=$consumer] Failed to parse consumer instance subscriptions : $id : $jsonString"); Map.empty
+ }, identity)
new ConsumerInstanceSubscriptions(id, subs)
}
}
@@ -464,9 +481,9 @@ trait OffsetCache extends Logging {
// Caches a map of partitions to offsets at a key that is the topic's name.
private[this] lazy val partitionOffsetsCache: LoadingCache[String, Future[PartitionOffsetsCapture]] = CacheBuilder.newBuilder()
- .expireAfterWrite(getCacheTimeoutSecs,TimeUnit.SECONDS) // TODO - update more or less often maybe, or make it configurable
+ .expireAfterWrite(getCacheTimeoutSecs, TimeUnit.SECONDS) // TODO - update more or less often maybe, or make it configurable
.build(
- new CacheLoader[String,Future[PartitionOffsetsCapture]] {
+ new CacheLoader[String, Future[PartitionOffsetsCapture]] {
def load(topic: String): Future[PartitionOffsetsCapture] = {
loadPartitionOffsets(topic)
}
@@ -477,18 +494,19 @@ trait OffsetCache extends Logging {
// Code based off of the GetOffsetShell tool in kafka.tools, kafka 0.8.2.1
private[this] def loadPartitionOffsets(topic: String): Future[PartitionOffsetsCapture] = {
// Get partition leader broker information
- val optPartitionsWithLeaders : Option[List[(Int, Option[BrokerIdentity])]] = getTopicPartitionLeaders(topic)
+ val optPartitionsWithLeaders: Option[List[(Int, Option[BrokerIdentity])]] = getTopicPartitionLeaders(topic)
val clientId = "partitionOffsetGetter"
val time = -1
val nOffsets = 1
val simpleConsumerBufferSize = 256 * 1024
- val currentActiveBrokerSet:Set[String] = getBrokerList().list.map(_.host).toSet
+ val currentActiveBrokerSet: Set[String] = getBrokerList().list.map(_.host).toSet
val partitionsByBroker = optPartitionsWithLeaders.map {
- listOfPartAndBroker => listOfPartAndBroker.collect {
- case (part, broker) if broker.isDefined && currentActiveBrokerSet(broker.get.host) => (broker.get, part)
- }.groupBy(_._1)
+ listOfPartAndBroker =>
+ listOfPartAndBroker.collect {
+ case (part, broker) if broker.isDefined && currentActiveBrokerSet(broker.get.host) => (broker.get, part)
+ }.groupBy(_._1)
}
def getKafkaConsumer() = {
@@ -497,7 +515,7 @@ trait OffsetCache extends Logging {
// Get the latest offset for each partition
val futureMap: Future[PartitionOffsetsCapture] = {
- partitionsByBroker.fold[Future[PartitionOffsetsCapture]]{
+ partitionsByBroker.fold[Future[PartitionOffsetsCapture]] {
Future.failed(new IllegalArgumentException(s"Do not have partitions and their leaders for topic $topic"))
} { partitionsWithLeaders =>
try {
@@ -537,37 +555,37 @@ trait OffsetCache extends Logging {
private[this] def emptyPartitionOffsetsCapture: Future[PartitionOffsetsCapture] = Future.successful(PartitionOffsetsCapture(System.currentTimeMillis(), Map()))
- protected def getTopicPartitionLeaders(topic: String) : Option[List[(Int, Option[BrokerIdentity])]]
+ protected def getTopicPartitionLeaders(topic: String): Option[List[(Int, Option[BrokerIdentity])]]
- protected def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription]
+ protected def getTopicDescription(topic: String, interactive: Boolean): Option[TopicDescription]
- protected def getBrokerList : () => BrokerList
+ protected def getBrokerList: () => BrokerList
- protected def readConsumerOffsetByTopicPartition(consumer: String, topic: String, tpi: Map[Int, TopicPartitionIdentity]) : Map[Int, Long]
+ protected def readConsumerOffsetByTopicPartition(consumer: String, topic: String, tpi: Map[Int, TopicPartitionIdentity]): Map[Int, Long]
- protected def readConsumerOwnerByTopicPartition(consumer: String, topic: String, tpi: Map[Int, TopicPartitionIdentity]) : Map[Int, String]
+ protected def readConsumerOwnerByTopicPartition(consumer: String, topic: String, tpi: Map[Int, TopicPartitionIdentity]): Map[Int, String]
- protected def getConsumerTopicsFromIds(consumer: String) : Set[String]
+ protected def getConsumerTopicsFromIds(consumer: String): Set[String]
- protected def getConsumerTopicsFromOffsets(consumer: String) : Set[String]
+ protected def getConsumerTopicsFromOffsets(consumer: String): Set[String]
- protected def getConsumerTopicsFromOwners(consumer: String) : Set[String]
+ protected def getConsumerTopicsFromOwners(consumer: String): Set[String]
protected def getZKManagedConsumerList: IndexedSeq[ConsumerNameAndType]
- protected def lastUpdateMillisZK : Long
+ protected def lastUpdateMillisZK: Long
- protected def getConsumerTopics(consumer: String) : Set[String] = {
+ protected def getConsumerTopics(consumer: String): Set[String] = {
getConsumerTopicsFromOffsets(consumer) ++ getConsumerTopicsFromOwners(consumer) ++ getConsumerTopicsFromIds(consumer)
}
- private[this] var kafkaManagedOffsetCache : Option[KafkaManagedOffsetCache] = None
+ private[this] var kafkaManagedOffsetCache: Option[KafkaManagedOffsetCache] = None
private[this] lazy val hasNonSecureEndpoint = getBrokerList().list.exists(_.nonSecure)
- def start() : Unit = {
- if(KafkaManagedOffsetCache.isSupported(clusterContext.config.version)) {
- if(kafkaManagedOffsetCache.isEmpty) {
+ def start(): Unit = {
+ if (KafkaManagedOffsetCache.isSupported(clusterContext.config.version)) {
+ if (kafkaManagedOffsetCache.isEmpty) {
info("Starting kafka managed offset cache ...")
Try {
val bl = getBrokerList()
@@ -583,7 +601,7 @@ trait OffsetCache extends Logging {
}
}
- def stop() : Unit = {
+ def stop(): Unit = {
kafkaManagedOffsetCache.foreach { of =>
info("Stopping kafka managed offset cache ...")
Try {
@@ -592,8 +610,8 @@ trait OffsetCache extends Logging {
}
}
- def getTopicPartitionOffsets(topic: String, interactive: Boolean) : Future[PartitionOffsetsCapture] = {
- if((interactive || loadOffsets) && hasNonSecureEndpoint) {
+ def getTopicPartitionOffsets(topic: String, interactive: Boolean): Future[PartitionOffsetsCapture] = {
+ if ((interactive || loadOffsets) && hasNonSecureEndpoint) {
partitionOffsetsCache.get(topic)
} else {
emptyPartitionOffsetsCapture
@@ -602,8 +620,8 @@ trait OffsetCache extends Logging {
protected def readKafkaManagedConsumerOffsetByTopicPartition(consumer: String
, topic: String
- , tpi: Map[Int, TopicPartitionIdentity]) : Map[Int, Long] = {
- kafkaManagedOffsetCache.fold(Map.empty[Int,Long]) {
+ , tpi: Map[Int, TopicPartitionIdentity]): Map[Int, Long] = {
+ kafkaManagedOffsetCache.fold(Map.empty[Int, Long]) {
oc =>
tpi.map {
case (part, _) =>
@@ -614,8 +632,8 @@ trait OffsetCache extends Logging {
protected def readKafkaManagedConsumerOwnerByTopicPartition(consumer: String
, topic: String
- , tpi: Map[Int, TopicPartitionIdentity]) : Map[Int, String] = {
- kafkaManagedOffsetCache.fold(Map.empty[Int,String]) {
+ , tpi: Map[Int, TopicPartitionIdentity]): Map[Int, String] = {
+ kafkaManagedOffsetCache.fold(Map.empty[Int, String]) {
oc =>
tpi.map {
case (part, _) =>
@@ -624,23 +642,23 @@ trait OffsetCache extends Logging {
}
}
- protected def getKafkaManagedConsumerTopics(consumer: String) : Set[String] = {
+ protected def getKafkaManagedConsumerTopics(consumer: String): Set[String] = {
kafkaManagedOffsetCache.fold(Set.empty[String]) {
oc => oc.getConsumerTopics(consumer)
}
}
- protected def getKafkaManagedConsumerList : IndexedSeq[ConsumerNameAndType] = {
+ protected def getKafkaManagedConsumerList: IndexedSeq[ConsumerNameAndType] = {
kafkaManagedOffsetCache.fold(IndexedSeq.empty[ConsumerNameAndType]) {
oc => oc.getConsumers.map(name => ConsumerNameAndType(name, KafkaManagedConsumer))
}
}
- final def lastUpdateMillis : Long = {
+ final def lastUpdateMillis: Long = {
Math.max(lastUpdateMillisZK, kafkaManagedOffsetCache.map(_.getLastUpdateTimeMillis).getOrElse(Long.MinValue))
}
- final def getConsumerDescription(consumer: String, consumerType: ConsumerType) : ConsumerDescription = {
+ final def getConsumerDescription(consumer: String, consumerType: ConsumerType): ConsumerDescription = {
val consumerTopics: Set[String] = getKafkaVersion match {
case Kafka_0_8_1_1 => getConsumerTopicsFromOffsets(consumer)
case _ =>
@@ -659,10 +677,10 @@ trait OffsetCache extends Logging {
ConsumerDescription(consumer, topicDescriptions, consumerType)
}
- final def getConsumedTopicDescription(consumer:String
- , topic:String
+ final def getConsumedTopicDescription(consumer: String
+ , topic: String
, interactive: Boolean
- , consumerType: ConsumerType) : ConsumedTopicDescription = {
+ , consumerType: ConsumerType): ConsumedTopicDescription = {
val optTopic = getTopicDescription(topic, interactive)
val optTpi = optTopic.map(TopicIdentity.getTopicPartitionIdentity(_, None))
val (partitionOffsets, partitionOwners) = consumerType match {
@@ -716,7 +734,7 @@ case class OffsetCacheActive(curator: CuratorFramework
, kafkaVersion: KafkaVersion
, consumerProperties: Option[Properties]
, kafkaManagedOffsetCacheConfig: KafkaManagedOffsetCacheConfig
- , getBrokerList : () => BrokerList
+ , getBrokerList: () => BrokerList
)
(implicit protected[this] val ec: ExecutionContext, val cf: ClusterFeatures) extends OffsetCache {
@@ -743,17 +761,17 @@ case class OffsetCacheActive(curator: CuratorFramework
private[this] val consumersTreeCache = new TreeCache(curator, ZkUtils.ConsumersPath)
@volatile
- private[this] var consumersTreeCacheLastUpdateMillis : Long = System.currentTimeMillis()
+ private[this] var consumersTreeCacheLastUpdateMillis: Long = System.currentTimeMillis()
- private[this] def withConsumersTreeCache[T](fn: TreeCache => T) : Option[T] = {
+ private[this] def withConsumersTreeCache[T](fn: TreeCache => T): Option[T] = {
Option(fn(consumersTreeCache))
}
- protected def getTopicPartitionLeaders(topic: String) : Option[List[(Int, Option[BrokerIdentity])]] = partitionLeaders(topic)
+ protected def getTopicPartitionLeaders(topic: String): Option[List[(Int, Option[BrokerIdentity])]] = partitionLeaders(topic)
- protected def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription] = topicDescriptions(topic, interactive)
+ protected def getTopicDescription(topic: String, interactive: Boolean): Option[TopicDescription] = topicDescriptions(topic, interactive)
- override def start(): Unit = {
+ override def start(): Unit = {
super.start()
info("Starting consumers tree cache...")
consumersTreeCache.start()
@@ -771,9 +789,9 @@ case class OffsetCacheActive(curator: CuratorFramework
Try(consumersTreeCache.close())
}
- protected def lastUpdateMillisZK : Long = consumersTreeCacheLastUpdateMillis
+ protected def lastUpdateMillisZK: Long = consumersTreeCacheLastUpdateMillis
- protected def readConsumerOffsetByTopicPartition(consumer: String, topic: String, tpi: Map[Int, TopicPartitionIdentity]) : Map[Int, Long] = {
+ protected def readConsumerOffsetByTopicPartition(consumer: String, topic: String, tpi: Map[Int, TopicPartitionIdentity]): Map[Int, Long] = {
tpi.map {
case (p, _) =>
val offsetPath = "%s/%s/%s/%s/%s".format(ZkUtils.ConsumersPath, consumer, "offsets", topic, p)
@@ -782,7 +800,7 @@ case class OffsetCacheActive(curator: CuratorFramework
}
- protected def readConsumerOwnerByTopicPartition(consumer: String, topic: String, tpi: Map[Int, TopicPartitionIdentity]) : Map[Int, String] = {
+ protected def readConsumerOwnerByTopicPartition(consumer: String, topic: String, tpi: Map[Int, TopicPartitionIdentity]): Map[Int, String] = {
tpi.map {
case (p, _) =>
val offsetPath = "%s/%s/%s/%s/%s".format(ZkUtils.ConsumersPath, consumer, "owners", topic, p)
@@ -790,20 +808,20 @@ case class OffsetCacheActive(curator: CuratorFramework
}
}
- protected def getConsumerTopicsFromIds(consumer: String) : Set[String] = {
- val zkPath = "%s/%s/%s".format(ZkUtils.ConsumersPath,consumer,"ids")
+ protected def getConsumerTopicsFromIds(consumer: String): Set[String] = {
+ val zkPath = "%s/%s/%s".format(ZkUtils.ConsumersPath, consumer, "ids")
Option(consumersTreeCache.getCurrentChildren(zkPath)).map(_.asScala.toMap.map {
case (id, cd) => ConsumerInstanceSubscriptions.apply(consumer, id, Option(cd).map(_.getData).map(asString).getOrElse("{}"))
}.map(_.subs.keys).flatten.toSet).getOrElse(Set.empty)
}
- protected def getConsumerTopicsFromOffsets(consumer: String) : Set[String] = {
- val zkPath = "%s/%s/%s".format(ZkUtils.ConsumersPath,consumer,"offsets")
+ protected def getConsumerTopicsFromOffsets(consumer: String): Set[String] = {
+ val zkPath = "%s/%s/%s".format(ZkUtils.ConsumersPath, consumer, "offsets")
Option(consumersTreeCache.getCurrentChildren(zkPath)).map(_.asScala.toMap.keySet).getOrElse(Set.empty)
}
- protected def getConsumerTopicsFromOwners(consumer: String) : Set[String] = {
- val zkPath = "%s/%s/%s".format(ZkUtils.ConsumersPath,consumer,"owners")
+ protected def getConsumerTopicsFromOwners(consumer: String): Set[String] = {
+ val zkPath = "%s/%s/%s".format(ZkUtils.ConsumersPath, consumer, "owners")
Option(consumersTreeCache.getCurrentChildren(zkPath)).map(_.asScala.toMap.keySet).getOrElse(Set.empty)
}
@@ -813,7 +831,7 @@ case class OffsetCacheActive(curator: CuratorFramework
}.fold {
IndexedSeq.empty[ConsumerNameAndType]
} { data: java.util.Map[String, ChildData] =>
- data.asScala.filter{
+ data.asScala.filter {
case (consumer, childData) =>
if (clusterContext.config.filterConsumers)
// Defining "inactive consumer" as a consumer that is missing one of three children ids/ offsets/ or owners/
@@ -834,7 +852,7 @@ case class OffsetCachePassive(curator: CuratorFramework
, kafkaVersion: KafkaVersion
, consumerProperties: Option[Properties]
, kafkaManagedOffsetCacheConfig: KafkaManagedOffsetCacheConfig
- , getBrokerList : () => BrokerList
+ , getBrokerList: () => BrokerList
)
(implicit protected[this] val ec: ExecutionContext, val cf: ClusterFeatures) extends OffsetCache {
@@ -861,17 +879,17 @@ case class OffsetCachePassive(curator: CuratorFramework
private[this] val consumersPathChildrenCache = new PathChildrenCache(curator, ZkUtils.ConsumersPath, true)
@volatile
- private[this] var consumersTreeCacheLastUpdateMillis : Long = System.currentTimeMillis()
+ private[this] var consumersTreeCacheLastUpdateMillis: Long = System.currentTimeMillis()
- private[this] def withConsumersPathChildrenCache[T](fn: PathChildrenCache => T) : Option[T] = {
+ private[this] def withConsumersPathChildrenCache[T](fn: PathChildrenCache => T): Option[T] = {
Option(fn(consumersPathChildrenCache))
}
- protected def getTopicPartitionLeaders(topic: String) : Option[List[(Int, Option[BrokerIdentity])]] = partitionLeaders(topic)
+ protected def getTopicPartitionLeaders(topic: String): Option[List[(Int, Option[BrokerIdentity])]] = partitionLeaders(topic)
- protected def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription] = topicDescriptions(topic, interactive)
+ protected def getTopicDescription(topic: String, interactive: Boolean): Option[TopicDescription] = topicDescriptions(topic, interactive)
- override def start(): Unit = {
+ override def start(): Unit = {
super.start()
info("Starting consumers path children cache...")
consumersPathChildrenCache.start(StartMode.BUILD_INITIAL_CACHE)
@@ -889,9 +907,9 @@ case class OffsetCachePassive(curator: CuratorFramework
Try(consumersPathChildrenCache.close())
}
- protected def lastUpdateMillisZK : Long = consumersTreeCacheLastUpdateMillis
+ protected def lastUpdateMillisZK: Long = consumersTreeCacheLastUpdateMillis
- protected def readConsumerOffsetByTopicPartition(consumer: String, topic: String, tpi: Map[Int, TopicPartitionIdentity]) : Map[Int, Long] = {
+ protected def readConsumerOffsetByTopicPartition(consumer: String, topic: String, tpi: Map[Int, TopicPartitionIdentity]): Map[Int, Long] = {
tpi.map {
case (p, _) =>
val offsetPath = "%s/%s/%s/%s/%s".format(ZkUtils.ConsumersPath, consumer, "offsets", topic, p)
@@ -899,7 +917,7 @@ case class OffsetCachePassive(curator: CuratorFramework
}
}
- protected def readConsumerOwnerByTopicPartition(consumer: String, topic: String, tpi: Map[Int, TopicPartitionIdentity]) : Map[Int, String] = {
+ protected def readConsumerOwnerByTopicPartition(consumer: String, topic: String, tpi: Map[Int, TopicPartitionIdentity]): Map[Int, String] = {
tpi.map {
case (p, _) =>
val ownerPath = "%s/%s/%s/%s/%s".format(ZkUtils.ConsumersPath, consumer, "owners", topic, p)
@@ -907,10 +925,10 @@ case class OffsetCachePassive(curator: CuratorFramework
}.filter(_._2 != null)
}
- protected def getConsumerTopicsFromIds(consumer: String) : Set[String] = {
- val zkPath = "%s/%s/%s".format(ZkUtils.ConsumersPath,consumer,"ids")
+ protected def getConsumerTopicsFromIds(consumer: String): Set[String] = {
+ val zkPath = "%s/%s/%s".format(ZkUtils.ConsumersPath, consumer, "ids")
val ids = Try(Option(curator.getChildren.forPath(zkPath)).map(_.asScala.toIterable)).toOption.flatten.getOrElse(Iterable.empty)
- val topicList : Iterable[Iterable[String]] = for {
+ val topicList: Iterable[Iterable[String]] = for {
id <- ids
idPath = "%s/%s".format(zkPath, id)
} yield {
@@ -920,13 +938,13 @@ case class OffsetCachePassive(curator: CuratorFramework
topicList.flatten.toSet
}
- protected def getConsumerTopicsFromOffsets(consumer: String) : Set[String] = {
- val zkPath = "%s/%s/%s".format(ZkUtils.ConsumersPath,consumer,"offsets")
+ protected def getConsumerTopicsFromOffsets(consumer: String): Set[String] = {
+ val zkPath = "%s/%s/%s".format(ZkUtils.ConsumersPath, consumer, "offsets")
Try(Option(curator.getChildren.forPath(zkPath)).map(_.asScala.toSet)).toOption.flatten.getOrElse(Set.empty)
}
- protected def getConsumerTopicsFromOwners(consumer: String) : Set[String] = {
- val zkPath = "%s/%s/%s".format(ZkUtils.ConsumersPath,consumer,"owners")
+ protected def getConsumerTopicsFromOwners(consumer: String): Set[String] = {
+ val zkPath = "%s/%s/%s".format(ZkUtils.ConsumersPath, consumer, "owners")
Try(Option(curator.getChildren.forPath(zkPath)).map(_.asScala.toSet)).toOption.flatten.getOrElse(Set.empty)
}
@@ -952,6 +970,7 @@ case class KafkaStateActorConfig(curator: CuratorFramework
, consumerProperties: Option[Properties]
, kafkaManagedOffsetCacheConfig: KafkaManagedOffsetCacheConfig
)
+
class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCommandActor with LongRunningPoolActor {
protected implicit val clusterContext: ClusterContext = config.clusterContext
@@ -970,23 +989,23 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
self.path,
config.consumerProperties
)
- private[this] val kaProps = Props(classOf[KafkaAdminClientActor],kaConfig)
- private[this] val kafkaAdminClientActor : ActorPath = context.actorOf(kaProps.withDispatcher(config.pinnedDispatcherName),"kafka-admin-client").path
+ private[this] val kaProps = Props(classOf[KafkaAdminClientActor], kaConfig)
+ private[this] val kafkaAdminClientActor: ActorPath = context.actorOf(kaProps.withDispatcher(config.pinnedDispatcherName), "kafka-admin-client").path
private[this] val kafkaAdminClient = new KafkaAdminClient(context, kafkaAdminClientActor)
// e.g. /brokers/topics/analytics_content/partitions/0/state
- private[this] val topicsTreeCache = new TreeCache(config.curator,ZkUtils.BrokerTopicsPath)
+ private[this] val topicsTreeCache = new TreeCache(config.curator, ZkUtils.BrokerTopicsPath)
- private[this] val topicsConfigPathCache = new PathChildrenCache(config.curator,ZkUtils.TopicConfigPath,true)
+ private[this] val topicsConfigPathCache = new PathChildrenCache(config.curator, ZkUtils.TopicConfigPath, true)
- private[this] val brokersPathCache = new PathChildrenCache(config.curator,ZkUtils.BrokerIdsPath,true)
+ private[this] val brokersPathCache = new PathChildrenCache(config.curator, ZkUtils.BrokerIdsPath, true)
- private[this] val adminPathCache = new PathChildrenCache(config.curator,ZkUtils.AdminPath,true)
+ private[this] val adminPathCache = new PathChildrenCache(config.curator, ZkUtils.AdminPath, true)
- private[this] val deleteTopicsPathCache = new PathChildrenCache(config.curator, ZkUtils.DeleteTopicsPath,true)
+ private[this] val deleteTopicsPathCache = new PathChildrenCache(config.curator, ZkUtils.DeleteTopicsPath, true)
@volatile
- private[this] var topicsTreeCacheLastUpdateMillis : Long = System.currentTimeMillis()
+ private[this] var topicsTreeCacheLastUpdateMillis: Long = System.currentTimeMillis()
private[this] val topicsTreeCacheListener = new TreeCacheListener {
override def childEvent(client: CuratorFramework, event: TreeCacheEvent): Unit = {
@@ -1001,10 +1020,10 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
}
@volatile
- private[this] var preferredLeaderElection : Option[PreferredReplicaElection] = None
+ private[this] var preferredLeaderElection: Option[PreferredReplicaElection] = None
@volatile
- private[this] var reassignPartitions : Option[ReassignPartitions] = None
+ private[this] var reassignPartitions: Option[ReassignPartitions] = None
private[this] val adminPathCacheListener = new PathChildrenCacheListener {
override def childEvent(client: CuratorFramework, event: PathChildrenCacheEvent): Unit = {
@@ -1027,7 +1046,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
}
private[this] def updatePreferredLeaderElection(cd: ChildData): Unit = {
- if(cd != null && cd.getPath.endsWith(ZkUtils.PreferredReplicaLeaderElectionPath)) {
+ if (cd != null && cd.getPath.endsWith(ZkUtils.PreferredReplicaLeaderElectionPath)) {
Try {
self ! KSUpdatePreferredLeaderElection(cd.getStat.getMtime, cd.getData)
}
@@ -1035,7 +1054,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
}
private[this] def updateReassignPartition(cd: ChildData): Unit = {
- if(cd != null && cd.getPath.endsWith(ZkUtils.ReassignPartitionsPath)) {
+ if (cd != null && cd.getPath.endsWith(ZkUtils.ReassignPartitionsPath)) {
Try {
self ! KSUpdateReassignPartition(cd.getStat.getMtime, cd.getData)
}
@@ -1043,7 +1062,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
}
private[this] def endPreferredLeaderElection(cd: ChildData): Unit = {
- if(cd != null && cd.getPath.endsWith(ZkUtils.PreferredReplicaLeaderElectionPath)) {
+ if (cd != null && cd.getPath.endsWith(ZkUtils.PreferredReplicaLeaderElectionPath)) {
Try {
self ! KSEndPreferredLeaderElection(cd.getStat.getMtime)
}
@@ -1051,7 +1070,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
}
private[this] def endReassignPartition(cd: ChildData): Unit = {
- if(cd != null && cd.getPath.endsWith(ZkUtils.ReassignPartitionsPath)) {
+ if (cd != null && cd.getPath.endsWith(ZkUtils.ReassignPartitionsPath)) {
Try {
self ! KSEndReassignPartition(cd.getStat.getMtime)
}
@@ -1060,7 +1079,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
}
private[this] lazy val offsetCache: OffsetCache = {
- if(config.clusterContext.config.activeOffsetCacheEnabled)
+ if (config.clusterContext.config.activeOffsetCacheEnabled)
new OffsetCacheActive(config.curator
, kafkaAdminClient
, config.clusterContext
@@ -1074,13 +1093,13 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
, () => getBrokerList
)(longRunningExecutionContext, cf)
else
- new OffsetCachePassive( config.curator
+ new OffsetCachePassive(config.curator
, kafkaAdminClient
, config.clusterContext
, getPartitionLeaders
, getTopicDescription
, config.partitionOffsetCacheTimeoutSecs
- , config .simpleConsumerSocketTimeoutMillis
+ , config.simpleConsumerSocketTimeoutMillis
, config.clusterContext.config.version
, config.consumerProperties
, config.kafkaManagedOffsetCacheConfig
@@ -1151,9 +1170,9 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
super.postStop()
}
- def getTopicZookeeperData(topic: String): Option[(Int,String)] = {
- val topicPath = "%s/%s".format(ZkUtils.BrokerTopicsPath,topic)
- Option(topicsTreeCache.getCurrentData(topicPath)).map( childData => (childData.getStat.getVersion,asString(childData.getData)))
+ def getTopicZookeeperData(topic: String): Option[(Int, String)] = {
+ val topicPath = "%s/%s".format(ZkUtils.BrokerTopicsPath, topic)
+ Option(topicsTreeCache.getCurrentData(topicPath)).map(childData => (childData.getStat.getVersion, asString(childData.getData)))
}
def getTopicPartitionOffsetsNotFuture(topic: String, interactive: Boolean): PartitionOffsetsCapture = {
@@ -1168,12 +1187,12 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
partitionOffsets
}
- def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription] = {
+ def getTopicDescription(topic: String, interactive: Boolean): Option[TopicDescription] = {
for {
description <- getTopicZookeeperData(topic)
partitionsPath = "%s/%s/partitions".format(ZkUtils.BrokerTopicsPath, topic)
partitions: Map[String, ChildData] <- Option(topicsTreeCache.getCurrentChildren(partitionsPath)).map(_.asScala.toMap)
- states : Map[String, String] = partitions flatMap { case (part, _) =>
+ states: Map[String, String] = partitions flatMap { case (part, _) =>
val statePath = s"$partitionsPath/$part/state"
Option(topicsTreeCache.getCurrentData(statePath)).map(cd => (part, asString(cd.getData)))
}
@@ -1182,34 +1201,35 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
} yield TopicDescription(topic, description, Option(states), partitionOffsets, topicConfig)
}
- def getPartitionLeaders(topic: String) : Option[List[(Int, Option[BrokerIdentity])]] = {
+ def getPartitionLeaders(topic: String): Option[List[(Int, Option[BrokerIdentity])]] = {
val partitionsPath = "%s/%s/partitions".format(ZkUtils.BrokerTopicsPath, topic)
val partitions: Option[Map[String, ChildData]] = Option(topicsTreeCache.getCurrentChildren(partitionsPath)).map(_.asScala.toMap)
- val states : Option[Iterable[(String, String)]] =
- partitions.map[Iterable[(String,String)]]{ partMap: Map[String, ChildData] =>
+ val states: Option[Iterable[(String, String)]] =
+ partitions.map[Iterable[(String, String)]] { partMap: Map[String, ChildData] =>
partMap.flatMap { case (part, _) =>
val statePath = s"$partitionsPath/$part/state"
Option(topicsTreeCache.getCurrentData(statePath)).map(cd => (part, asString(cd.getData)))
}
}
- val targetBrokers : IndexedSeq[BrokerIdentity] = getBrokers
+ val targetBrokers: IndexedSeq[BrokerIdentity] = getBrokers
import org.json4s.jackson.JsonMethods.parse
import org.json4s.scalaz.JsonScalaz.field
- states.map(_.map{case (part, state) =>
+ states.map(_.map { case (part, state) =>
val partition = part.toInt
val descJson = parse(state)
val leaderID = field[Int]("leader")(descJson).fold({ e =>
- log.error(s"[topic=$topic] Failed to get partitions from topic json $state"); 0}, identity)
+ log.error(s"[topic=$topic] Failed to get partitions from topic json $state"); 0
+ }, identity)
val leader = targetBrokers.find(_.id == leaderID)
(partition, leader)
}.toList)
}
- private[this] def getTopicConfigString(topic: String) : Option[(Int,String)] = {
+ private[this] def getTopicConfigString(topic: String): Option[(Int, String)] = {
val data: mutable.Buffer[ChildData] = topicsConfigPathCache.getCurrentData.asScala
val result: Option[ChildData] = data.find(p => p.getPath.endsWith("/" + topic))
- result.map(cd => (cd.getStat.getVersion,asString(cd.getData)))
+ result.map(cd => (cd.getStat.getVersion, asString(cd.getData)))
}
override def processActorResponse(response: ActorResponse): Unit = {
@@ -1219,7 +1239,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
}
- private[this] def getBrokers : IndexedSeq[BrokerIdentity] = {
+ private[this] def getBrokers: IndexedSeq[BrokerIdentity] = {
val data: mutable.Buffer[ChildData] = brokersPathCache.getCurrentData.asScala
data.map { cd =>
BrokerIdentity.from(nodeFromPath(cd.getPath).toInt, asString(cd.getData))
@@ -1235,7 +1255,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
}.toIndexedSeq.sortBy(_.id)
}
- private[this] def asyncPipeToSender[T](fn: => T): Unit = {
+ private[this] def asyncPipeToSender[T](fn: => T): Unit = {
implicit val ec = longRunningExecutionContext
val result: Future[T] = Future {
fn
@@ -1292,7 +1312,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
//since we want to update offsets, let's do so if last update plus offset cache timeout is before current time
if (topicsTreeCacheLastUpdateMillis > lastUpdateMillis || ((topicsTreeCacheLastUpdateMillis + (config.partitionOffsetCacheTimeoutSecs * 1000)) < System.currentTimeMillis())) {
//we have option here since there may be no topics at all!
- withTopicsTreeCache { cache: TreeCache =>
+ withTopicsTreeCache { cache: TreeCache =>
cache.getCurrentChildren(ZkUtils.BrokerTopicsPath)
}.fold {
sender ! TopicDescriptions(IndexedSeq.empty, topicsTreeCacheLastUpdateMillis)
@@ -1328,13 +1348,13 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
}
}
- private def getBrokerList : BrokerList = {
+ private def getBrokerList: BrokerList = {
BrokerList(getBrokers, config.clusterContext)
}
override def processCommandRequest(request: CommandRequest): Unit = {
request match {
- case KSUpdatePreferredLeaderElection(millis,json) =>
+ case KSUpdatePreferredLeaderElection(millis, json) =>
safeExecute {
val s: Set[TopicAndPartition] = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(json)
preferredLeaderElection.fold {
@@ -1351,12 +1371,12 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
}
}
}
- case KSUpdateReassignPartition(millis,json) =>
+ case KSUpdateReassignPartition(millis, json) =>
safeExecute {
- val m : Map[TopicAndPartition, Seq[Int]] = ReassignPartitionCommand.parsePartitionReassignmentZkData(json)
+ val m: Map[TopicAndPartition, Seq[Int]] = ReassignPartitionCommand.parsePartitionReassignmentZkData(json)
reassignPartitions.fold {
//nothing there, add as new
- reassignPartitions = Some(ReassignPartitions(getDateTime(millis),m, None, config.clusterContext))
+ reassignPartitions = Some(ReassignPartitions(getDateTime(millis), m, None, config.clusterContext))
} {
existing =>
existing.endTime.fold {
@@ -1364,7 +1384,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
reassignPartitions = Some(existing.copy(partitionsToBeReassigned = existing.partitionsToBeReassigned ++ m))
} { _ =>
//new op started
- reassignPartitions = Some(ReassignPartitions(getDateTime(millis),m, None, config.clusterContext))
+ reassignPartitions = Some(ReassignPartitions(getDateTime(millis), m, None, config.clusterContext))
}
}
}
@@ -1384,33 +1404,34 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
}
}
- private[this] def getDateTime(millis: Long) : DateTime = new DateTime(millis,DateTimeZone.UTC)
+ private[this] def getDateTime(millis: Long): DateTime = new DateTime(millis, DateTimeZone.UTC)
- private[this] def safeExecute(fn: => Any) : Unit = {
+ private[this] def safeExecute(fn: => Any): Unit = {
Try(fn) match {
case Failure(t) =>
- log.error("Failed!",t)
+ log.error("Failed!", t)
case Success(_) =>
//do nothing
}
}
- private[this] def withTopicsTreeCache[T](fn: TreeCache => T) : Option[T] = {
+ private[this] def withTopicsTreeCache[T](fn: TreeCache => T): Option[T] = {
Option(fn(topicsTreeCache))
}
//---------------------------------------------------
- private[this] var kafkaTopicOffsetGetter : Option[KafkaTopicOffsetGetter] = None
+ private[this] var kafkaTopicOffsetGetter: Option[KafkaTopicOffsetGetter] = None
private[this] var kafkaTopicOffsetMap = new TrieMap[String, Map[Int, Long]]
private[this] var kafkaTopicOffsetCaptureMap = new TrieMap[String, PartitionOffsetsCapture]
- def startTopicOffsetGetter() : Unit = {
+
+ def startTopicOffsetGetter(): Unit = {
log.info("Starting kafka managed Topic Offset Getter ...")
kafkaTopicOffsetGetter = Option(new KafkaTopicOffsetGetter())
val topicOffsetGetterThread = new Thread(kafkaTopicOffsetGetter.get, "KafkaTopicOffsetGetter")
topicOffsetGetterThread.start()
}
- def stopTopicOffsetGetter() : Unit = {
+ def stopTopicOffsetGetter(): Unit = {
kafkaTopicOffsetGetter.foreach {
kto =>
Try {
@@ -1429,7 +1450,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
while (!shutdown) {
try {
- withTopicsTreeCache { cache: TreeCache =>
+ withTopicsTreeCache { cache: TreeCache =>
cache.getCurrentChildren(ZkUtils.BrokerTopicsPath)
}.fold {
} { data: java.util.Map[String, ChildData] =>
@@ -1440,13 +1461,13 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
if (shutdown) {
return
}
- var optPartitionsWithLeaders : Option[List[(Int, Option[BrokerIdentity])]] = getPartitionLeaders(topic)
+ var optPartitionsWithLeaders: Option[List[(Int, Option[BrokerIdentity])]] = getPartitionLeaders(topic)
optPartitionsWithLeaders match {
case Some(leaders) =>
leaders.foreach(leader => {
leader._2 match {
case Some(brokerIden) =>
- var tlList : List[(TopicAndPartition, PartitionOffsetRequestInfo)] = null
+ var tlList: List[(TopicAndPartition, PartitionOffsetRequestInfo)] = null
if (broker2TopicPartitionMap.contains(brokerIden)) {
tlList = broker2TopicPartitionMap(brokerIden)
} else {
@@ -1476,11 +1497,11 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
consumerProperties.put(BOOTSTRAP_SERVERS_CONFIG, s"${broker.host}:$port")
consumerProperties.put(SECURITY_PROTOCOL_CONFIG, securityProtocol.stringId)
// Use secure endpoint if available
- if(kaConfig.clusterContext.config.saslMechanism.nonEmpty){
+ if (kaConfig.clusterContext.config.saslMechanism.nonEmpty) {
consumerProperties.put(SaslConfigs.SASL_MECHANISM, kaConfig.clusterContext.config.saslMechanism.get.stringId)
log.info(s"SASL Mechanism =${kaConfig.clusterContext.config.saslMechanism.get}")
}
- if(kaConfig.clusterContext.config.jaasConfig.nonEmpty){
+ if (kaConfig.clusterContext.config.jaasConfig.nonEmpty) {
consumerProperties.put(SaslConfigs.SASL_JAAS_CONFIG, kaConfig.clusterContext.config.jaasConfig.get)
log.info(s"SASL JAAS config=${kaConfig.clusterContext.config.jaasConfig.get}")
}
@@ -1539,5 +1560,6 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
properties
}
}
+
}
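
The recurring replacement of props.putAll(...) with cp.forEach((k, v) => props.put(k, v)) in this file works around scala/bug#10418: on JDK 9+, scalac can reject putAll on java.util.Properties as an ambiguous overload. A self-contained sketch of the same copy-by-iteration pattern; the mergeProperties helper is illustrative, mirroring the fromProps fixes in the LogConfig files below:

```scala
import java.util.Properties

// Illustrative helper: copy entries explicitly instead of calling the
// putAll overload that trips Scala's overload resolution on JDK 9+.
def mergeProperties(defaults: Properties, overrides: Properties): Properties = {
  val props = new Properties()
  // props.putAll(overrides) // may fail to compile with Scala 2.12 on JDK 9+
  defaults.forEach((k, v) => props.put(k, v))
  overrides.forEach((k, v) => props.put(k, v)) // later puts win on key clashes
  props
}
```
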
diff --git a/app/kafka/manager/features/KMFeature.scala b/app/kafka/manager/features/KMFeature.scala
index a5c2916d8..487608c8a 100644
--- a/app/kafka/manager/features/KMFeature.scala
+++ b/app/kafka/manager/features/KMFeature.scala
@@ -20,6 +20,7 @@ sealed trait ClusterFeature extends KMFeature
case object KMLogKafkaFeature extends ClusterFeature
case object KMDeleteTopicFeature extends ClusterFeature
+case object KMRestrictedFeature extends ClusterFeature
case object KMJMXMetricsFeature extends ClusterFeature
case object KMDisplaySizeFeature extends ClusterFeature
case object KMPollConsumersFeature extends ClusterFeature
@@ -73,6 +74,9 @@ object ClusterFeatures {
if(clusterConfig.pollConsumers)
buffer+=KMPollConsumersFeature
+ if(clusterConfig.restrictOperations)
+ buffer+=KMRestrictedFeature
+
ClusterFeatures(buffer.toSet)
}
}
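
ClusterFeatures.from (above) now surfaces the flag as KMRestrictedFeature. A hedged usage sketch, assuming ClusterFeatures exposes its flags as a features: Set[ClusterFeature] field, as the ClusterFeatures(buffer.toSet) construction suggests; the helper is illustrative, not from the codebase:

```scala
import kafka.manager.features.{ClusterFeatures, KMRestrictedFeature}

// Illustrative gate: callers could consult the feature set before exposing
// mutating operations on a cluster marked restrictOperations.
def operationsAllowed(cf: ClusterFeatures): Boolean =
  !cf.features(KMRestrictedFeature) // Set#apply doubles as a membership test
```
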
diff --git a/app/kafka/manager/model/model.scala b/app/kafka/manager/model/model.scala
index bf8e996bb..c58d9d7c0 100644
--- a/app/kafka/manager/model/model.scala
+++ b/app/kafka/manager/model/model.scala
@@ -185,6 +185,7 @@ object ClusterConfig {
, jmxUser: Option[String]
, jmxPass: Option[String]
, jmxSsl: Boolean
+                          , restrictOperations: Boolean
, pollConsumers: Boolean
, filterConsumers: Boolean
, logkafkaEnabled: Boolean = false
@@ -210,6 +211,7 @@ object ClusterConfig {
, jmxUser
, jmxPass
, jmxSsl
+ , restrictOperations
, pollConsumers
, filterConsumers
, logkafkaEnabled
@@ -223,10 +225,10 @@ object ClusterConfig {
}
def customUnapply(cc: ClusterConfig) : Option[(
- String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String, Option[String], Option[String])] = {
+ String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String, Option[String], Option[String])] = {
Some((
cc.name, cc.version.toString, cc.curatorConfig.zkConnect, cc.curatorConfig.zkMaxRetry,
- cc.jmxEnabled, cc.jmxUser, cc.jmxPass, cc.jmxSsl, cc.pollConsumers, cc.filterConsumers,
+ cc.jmxEnabled, cc.jmxUser, cc.jmxPass, cc.jmxSsl, cc.restrictOperations, cc.pollConsumers, cc.filterConsumers,
cc.logkafkaEnabled, cc.activeOffsetCacheEnabled, cc.displaySizeEnabled, cc.tuning, cc.securityProtocol.stringId, cc.saslMechanism.map(_.stringId), cc.jaasConfig
)
)
@@ -264,6 +266,7 @@ object ClusterConfig {
:: ("jmxUser" -> toJSON(config.jmxUser))
:: ("jmxPass" -> toJSON(config.jmxPass))
:: ("jmxSsl" -> toJSON(config.jmxSsl))
+ :: ("restrictOperations" -> toJSON(config.restrictOperations))
:: ("pollConsumers" -> toJSON(config.pollConsumers))
:: ("filterConsumers" -> toJSON(config.filterConsumers))
:: ("logkafkaEnabled" -> toJSON(config.logkafkaEnabled))
@@ -290,6 +293,7 @@ object ClusterConfig {
val jmxUser = fieldExtended[Option[String]]("jmxUser")(json)
val jmxPass = fieldExtended[Option[String]]("jmxPass")(json)
val jmxSsl = fieldExtended[Boolean]("jmxSsl")(json)
+ val restrictOperations = fieldExtended[Boolean]("restrictOperations")(json)
val pollConsumers = fieldExtended[Boolean]("pollConsumers")(json)
val filterConsumers = fieldExtended[Boolean]("filterConsumers")(json)
val logkafkaEnabled = fieldExtended[Boolean]("logkafkaEnabled")(json)
@@ -310,6 +314,7 @@ object ClusterConfig {
jmxUser.getOrElse(None),
jmxPass.getOrElse(None),
jmxSsl.getOrElse(false),
+ restrictOperations.getOrElse(false),
pollConsumers.getOrElse(false),
filterConsumers.getOrElse(true),
logkafkaEnabled.getOrElse(false),
@@ -442,6 +447,7 @@ case class ClusterConfig (name: String
, jmxUser: Option[String]
, jmxPass: Option[String]
, jmxSsl: Boolean
+                        , restrictOperations: Boolean
, pollConsumers: Boolean
, filterConsumers: Boolean
, logkafkaEnabled: Boolean
diff --git a/app/kafka/manager/utils/logkafka81/LogConfig.scala b/app/kafka/manager/utils/logkafka81/LogConfig.scala
index 69996945b..59b3f5d2d 100644
--- a/app/kafka/manager/utils/logkafka81/LogConfig.scala
+++ b/app/kafka/manager/utils/logkafka81/LogConfig.scala
@@ -173,7 +173,8 @@ object LogConfig extends LogkafkaNewConfigs {
*/
def fromProps(defaults: Properties, overrides: Properties): LogConfig = {
val props = new Properties(defaults)
- props.putAll(overrides)
+    //to handle the Scala bug with Java 9+ versions https://github.com/scala/bug/issues/10418
+ overrides.forEach((k, v) => props.put(k, v))
fromProps(props)
}
diff --git a/app/kafka/manager/utils/logkafka82/LogConfig.scala b/app/kafka/manager/utils/logkafka82/LogConfig.scala
index e7e488232..4d561864e 100644
--- a/app/kafka/manager/utils/logkafka82/LogConfig.scala
+++ b/app/kafka/manager/utils/logkafka82/LogConfig.scala
@@ -173,7 +173,8 @@ object LogConfig extends LogkafkaNewConfigs {
*/
def fromProps(defaults: Properties, overrides: Properties): LogConfig = {
val props = new Properties(defaults)
- props.putAll(overrides)
+ //to handle the Scala bug with Java 9+ versions https://github.com/scala/bug/issues/10418
+ overrides.forEach((k, v) => props.put(k, v))
fromProps(props)
}
diff --git a/app/kafka/manager/utils/zero10/LogConfig.scala b/app/kafka/manager/utils/zero10/LogConfig.scala
index 6c43b66bb..02e6ffd8a 100644
--- a/app/kafka/manager/utils/zero10/LogConfig.scala
+++ b/app/kafka/manager/utils/zero10/LogConfig.scala
@@ -309,8 +309,9 @@ object LogConfig extends TopicConfigs {
*/
def fromProps(defaults: java.util.Map[_ <: Object, _ <: Object], overrides: Properties): LogConfig = {
val props = new Properties()
- props.putAll(defaults)
- props.putAll(overrides)
+ //to handle the Scala bug with Java 9+ versions https://github.com/scala/bug/issues/10418
+ defaults.forEach((k, v) => props.put(k, v))
+ overrides.forEach((k, v) => props.put(k, v))
LogConfig(props)
}
diff --git a/app/kafka/manager/utils/zero11/LogConfig.scala b/app/kafka/manager/utils/zero11/LogConfig.scala
index 6ceb0e5f7..2814527c8 100644
--- a/app/kafka/manager/utils/zero11/LogConfig.scala
+++ b/app/kafka/manager/utils/zero11/LogConfig.scala
@@ -269,8 +269,9 @@ object LogConfig extends TopicConfigs {
*/
def fromProps(defaults: java.util.Map[_ <: Object, _ <: Object], overrides: Properties): LogConfig = {
val props = new Properties()
- props.putAll(defaults)
- props.putAll(overrides)
+ //to handle the Scala bug with Java 9+ versions https://github.com/scala/bug/issues/10418
+ defaults.forEach((k, v) => props.put(k, v))
+ overrides.forEach((k, v) => props.put(k, v))
LogConfig(props)
}
diff --git a/app/kafka/manager/utils/zero81/LogConfig.scala b/app/kafka/manager/utils/zero81/LogConfig.scala
index c391a7b77..b90b1af9b 100644
--- a/app/kafka/manager/utils/zero81/LogConfig.scala
+++ b/app/kafka/manager/utils/zero81/LogConfig.scala
@@ -114,7 +114,8 @@ object LogConfig extends TopicConfigs {
*/
def fromProps(defaults: Properties, overrides: Properties): LogConfig = {
val props = new Properties(defaults)
- props.putAll(overrides)
+ //to handle the Scala bug with Java 9+ versions https://github.com/scala/bug/issues/10418
+ overrides.forEach((k, v) => props.put(k, v))
fromProps(props)
}
diff --git a/app/kafka/manager/utils/zero82/LogConfig.scala b/app/kafka/manager/utils/zero82/LogConfig.scala
index db5452f05..a7e068b70 100644
--- a/app/kafka/manager/utils/zero82/LogConfig.scala
+++ b/app/kafka/manager/utils/zero82/LogConfig.scala
@@ -181,7 +181,8 @@ object LogConfig extends TopicConfigs {
*/
def fromProps(defaults: Properties, overrides: Properties): LogConfig = {
val props = new Properties(defaults)
- props.putAll(overrides)
+ //to handle the Scala bug with Java 9+ versions https://github.com/scala/bug/issues/10418
+ overrides.forEach((k, v) => props.put(k, v))
fromProps(props)
}
diff --git a/app/kafka/manager/utils/zero90/LogConfig.scala b/app/kafka/manager/utils/zero90/LogConfig.scala
index 46a460cd8..435569994 100644
--- a/app/kafka/manager/utils/zero90/LogConfig.scala
+++ b/app/kafka/manager/utils/zero90/LogConfig.scala
@@ -173,8 +173,9 @@ object LogConfig extends TopicConfigs {
*/
def fromProps(defaults: java.util.Map[_ <: Object, _ <: Object], overrides: Properties): LogConfig = {
val props = new Properties()
- props.putAll(defaults)
- props.putAll(overrides)
+ //to handle the Scala bug with Java 9+ versions https://github.com/scala/bug/issues/10418
+ defaults.forEach((k, v) => props.put(k, v))
+ overrides.forEach((k, v) => props.put(k, v))
LogConfig(props)
}
diff --git a/app/models/form/ClusterOperation.scala b/app/models/form/ClusterOperation.scala
index 558a0adac..d35db73bd 100644
--- a/app/models/form/ClusterOperation.scala
+++ b/app/models/form/ClusterOperation.scala
@@ -39,6 +39,7 @@ object ClusterOperation {
, jmxUser: Option[String]
, jmxPass: Option[String]
, jmxSsl: Boolean
+ , restrictOperations: Boolean
, pollConsumers: Boolean
, filterConsumers: Boolean
, logkafkaEnabled: Boolean
@@ -50,14 +51,14 @@ object ClusterOperation {
, jaasConfig: Option[String]
): ClusterOperation = {
ClusterOperation(operation,ClusterConfig(name, version, zkHosts, zkMaxRetry, jmxEnabled, jmxUser, jmxPass, jmxSsl,
- pollConsumers, filterConsumers, logkafkaEnabled, activeOffsetCacheEnabled, displaySizeEnabled, tuning, securityProtocol, saslMechanism, jaasConfig))
+ restrictOperations, pollConsumers, filterConsumers, logkafkaEnabled, activeOffsetCacheEnabled, displaySizeEnabled, tuning, securityProtocol, saslMechanism, jaasConfig))
}
- def customUnapply(co: ClusterOperation) : Option[(String, String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String, Option[String], Option[String])] = {
+ def customUnapply(co: ClusterOperation) : Option[(String, String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String, Option[String], Option[String])] = {
Option((co.op.toString, co.clusterConfig.name, co.clusterConfig.version.toString,
co.clusterConfig.curatorConfig.zkConnect, co.clusterConfig.curatorConfig.zkMaxRetry,
co.clusterConfig.jmxEnabled, co.clusterConfig.jmxUser, co.clusterConfig.jmxPass, co.clusterConfig.jmxSsl,
- co.clusterConfig.pollConsumers, co.clusterConfig.filterConsumers, co.clusterConfig.logkafkaEnabled,
+ co.clusterConfig.restrictOperations, co.clusterConfig.pollConsumers, co.clusterConfig.filterConsumers, co.clusterConfig.logkafkaEnabled,
co.clusterConfig.activeOffsetCacheEnabled, co.clusterConfig.displaySizeEnabled, co.clusterConfig.tuning, co.clusterConfig.securityProtocol.stringId,
co.clusterConfig.saslMechanism.map(_.stringId),
co.clusterConfig.jaasConfig))
diff --git a/app/models/navigation/Menus.scala b/app/models/navigation/Menus.scala
index 2c6afbcb1..54687f6f5 100644
--- a/app/models/navigation/Menus.scala
+++ b/app/models/navigation/Menus.scala
@@ -6,7 +6,7 @@
package models.navigation
import features.{KMTopicManagerFeature, KMClusterManagerFeature, KMPreferredReplicaElectionFeature, KMReassignPartitionsFeature, ApplicationFeatures}
-import kafka.manager.features.{KMLogKafkaFeature, ClusterFeatures}
+import kafka.manager.features.{KMLogKafkaFeature, ClusterFeatures, KMRestrictedFeature}
/**
* @author hiral
@@ -27,11 +27,11 @@ class Menus(implicit applicationFeatures: ApplicationFeatures) {
Option(Menu("Cluster", items, None))
}
- private[this] def topicMenu(cluster: String) : Option[Menu] = {
+ private[this] def topicMenu(cluster: String, clusterFeatures: ClusterFeatures) : Option[Menu] = {
val defaultItems = IndexedSeq("List".clusterRouteMenuItem(cluster))
val items = {
- if(applicationFeatures.features(KMTopicManagerFeature))
+ if(applicationFeatures.features(KMTopicManagerFeature) && !clusterFeatures.features(KMRestrictedFeature))
defaultItems.+:("Create".clusterRouteMenuItem(cluster))
else
defaultItems
@@ -75,7 +75,7 @@ class Menus(implicit applicationFeatures: ApplicationFeatures) {
IndexedSeq(
clusterMenu(cluster),
brokersMenu(cluster),
- topicMenu(cluster),
+ topicMenu(cluster, clusterFeatures),
preferredReplicaElectionMenu(cluster),
reassignPartitionsMenu(cluster),
consumersMenu(cluster),
diff --git a/app/views/cluster/addCluster.scala.html b/app/views/cluster/addCluster.scala.html
index acf42ee84..b4c50e9cc 100644
--- a/app/views/cluster/addCluster.scala.html
+++ b/app/views/cluster/addCluster.scala.html
@@ -31,6 +31,7 @@
@b4.text(form("jmxUser"), '_label -> "JMX Auth Username")
@b4.text(form("jmxPass"), '_label -> "JMX Auth Password")
@b4.checkbox(form("jmxSsl"), '_text -> "JMX with SSL")
+ @b4.checkbox(form("restrictOperations"), '_text -> "Restrict the Operations(Non-Disruptive View only mode)")
@b4.checkbox(form("logkafkaEnabled"), '_text -> "Enable Logkafka")
@b4.checkbox(form("pollConsumers"), '_text -> "Poll consumer information (Not recommended for large # of consumers if ZK is used for offsets tracking on older Kafka versions)")
@b4.checkbox(form("filterConsumers"), '_text -> "Filter out inactive consumers")
diff --git a/app/views/cluster/updateCluster.scala.html b/app/views/cluster/updateCluster.scala.html
index 5d21aedea..73a86426b 100644
--- a/app/views/cluster/updateCluster.scala.html
+++ b/app/views/cluster/updateCluster.scala.html
@@ -34,6 +34,7 @@
@b4.text(form("jmxUser"), '_label -> "JMX Auth Username")
@b4.text(form("jmxPass"), '_label -> "JMX Auth Password")
@b4.checkbox(form("jmxSsl"), '_text -> "JMX with SSL")
+ @b4.checkbox(form("restrictOperations"), '_text -> "Restrict the Operations(Non-Disruptive View only mode)")
@b4.checkbox(form("pollConsumers"), '_text -> "Poll consumer information (Not recommended for large # of consumers if ZK is used for offsets tracking on older Kafka versions)")
@b4.checkbox(form("filterConsumers"), '_text -> "Filter out inactive consumers")
@b4.checkbox(form("logkafkaEnabled"), '_text -> "Enable Logkafka")
diff --git a/app/views/topic/topicList.scala.html b/app/views/topic/topicList.scala.html
index 6f677cdc4..f67f3a2eb 100644
--- a/app/views/topic/topicList.scala.html
+++ b/app/views/topic/topicList.scala.html
@@ -37,6 +37,32 @@
}
}
+@renderOperations = {
+
+@if(errorOrTopics.fold(err=>false,tl=>tl.list.headOption.map(opt => opt._2.map(ti => ti.clusterContext.config.restrictOperations && ti.clusterContext.config.name.equals(cluster)).getOrElse(false)).getOrElse(false))){
+
+ Operations are restricted for cluster @cluster (change this in the cluster configuration)
+
+} else {
+
+}
+}
+
@main(
"Topic List",
menu = theMenu,
@@ -48,22 +74,7 @@
diff --git a/app/views/topic/topicViewContent.scala.html b/app/views/topic/topicViewContent.scala.html
index 2054e1524..4cc7f76c8 100644
--- a/app/views/topic/topicViewContent.scala.html
+++ b/app/views/topic/topicViewContent.scala.html
@@ -110,6 +110,64 @@
}
}
+@renderOperations = {
+ @if(topicIdentity.clusterContext.config.restrictOperations){
+
+ Operations are restricted for cluster @cluster (change this in the cluster configuration)
+
+ } else {
+
+
+
+ @if(topicIdentity.clusterContext.clusterFeatures.features(kafka.manager.features.KMDeleteTopicFeature)) {
+
+ Delete Topic
+ |
+ }
+ @features.app(features.KMReassignPartitionsFeature) {
+
+ @b4.vertical.form(routes.ReassignPartitions.handleOperation(cluster,topic)) { implicit fc =>
+ @reassignPartitionOperation match {
+ case ForceRunAssignment => {
+
+ }
+ case _ => {
+
+ }
+ }
+ }
+ |
+
+ Generate Partition Assignments
+ |
+ }
+
+
+
+ Add Partitions
+ |
+
+ Update Config
+ |
+ @features.app(features.KMReassignPartitionsFeature) {
+
+ Manual Partition Assignments
+ |
+ }
+
+
+
+ }
+}
+
@@ -204,55 +262,7 @@
-
-
-
- @if(topicIdentity.clusterContext.clusterFeatures.features(kafka.manager.features.KMDeleteTopicFeature)) {
-
- Delete Topic
- |
- }
- @features.app(features.KMReassignPartitionsFeature) {
-
- @b4.vertical.form(routes.ReassignPartitions.handleOperation(cluster,topic)) { implicit fc =>
- @reassignPartitionOperation match {
- case ForceRunAssignment => {
-
- }
- case _ => {
-
- }
- }
- }
- |
-
- Generate Partition Assignments
- |
- }
-
-
-
- Add Partitions
- |
-
- Update Config
- |
- @features.app(features.KMReassignPartitionsFeature) {
-
- Manual Partition Assignments
- |
- }
-
-
-
+ @renderOperations
}
diff --git a/test/kafka/manager/TestClusterManagerActor.scala b/test/kafka/manager/TestClusterManagerActor.scala
index 79ff48bcb..0f2dd6916 100644
--- a/test/kafka/manager/TestClusterManagerActor.scala
+++ b/test/kafka/manager/TestClusterManagerActor.scala
@@ -47,7 +47,7 @@ class TestClusterManagerActor extends CuratorAwareTest with BaseTest {
override protected def beforeAll(): Unit = {
super.beforeAll()
- val clusterConfig = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism=None, jaasConfig=None)
+ val clusterConfig = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, restrictOperations=false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism=None, jaasConfig=None)
val curatorConfig = CuratorConfig(testServer.getConnectString)
val config = ClusterManagerActorConfig(
"pinned-dispatcher"
diff --git a/test/kafka/manager/TestKafkaManager.scala b/test/kafka/manager/TestKafkaManager.scala
index b4da3dd01..da7226792 100644
--- a/test/kafka/manager/TestKafkaManager.scala
+++ b/test/kafka/manager/TestKafkaManager.scala
@@ -125,7 +125,7 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest {
}
test("add cluster") {
- val future = kafkaManager.addCluster("dev","2.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManager.defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None)
+ val future = kafkaManager.addCluster("dev","2.2.0",kafkaServerZkPath, jmxEnabled = false, restrictOperations = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManager.defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None)
val result = Await.result(future,duration)
assert(result.isRight === true)
Thread.sleep(2000)
@@ -392,7 +392,7 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest {
}
test("update cluster zkhost") {
- val future = kafkaManager.updateCluster("dev","2.2.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxSsl = false, jmxPass = None, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
+ val future = kafkaManager.updateCluster("dev","2.2.0",testServer.getConnectString, jmxEnabled = false, restrictOperations = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxSsl = false, jmxPass = None, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
val result = Await.result(future,duration)
assert(result.isRight === true)
@@ -427,7 +427,7 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest {
}
test("update cluster version") {
- val future = kafkaManager.updateCluster("dev","0.8.1.1",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
+ val future = kafkaManager.updateCluster("dev","0.8.1.1",testServer.getConnectString, jmxEnabled = false, restrictOperations = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
val result = Await.result(future,duration)
assert(result.isRight === true)
Thread.sleep(2000)
@@ -449,7 +449,7 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest {
}
test("update cluster logkafka enabled and activeOffsetCache enabled") {
- val future = kafkaManager.updateCluster("dev","2.2.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
+ val future = kafkaManager.updateCluster("dev","2.2.0",testServer.getConnectString, jmxEnabled = false, restrictOperations = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
val result = Await.result(future,duration)
assert(result.isRight === true)
diff --git a/test/kafka/manager/utils/TestClusterConfig.scala b/test/kafka/manager/utils/TestClusterConfig.scala
index 082d40667..045a13b5e 100644
--- a/test/kafka/manager/utils/TestClusterConfig.scala
+++ b/test/kafka/manager/utils/TestClusterConfig.scala
@@ -14,18 +14,18 @@ class TestClusterConfig extends FunSuite with Matchers {
test("invalid name") {
intercept[IllegalArgumentException] {
- ClusterConfig("qa!","0.8.1.1","localhost",jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
+ ClusterConfig("qa!","0.8.1.1","localhost",jmxEnabled = false, restrictOperations=false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
}
}
test("invalid kafka version") {
intercept[IllegalArgumentException] {
- ClusterConfig("qa","0.8.1","localhost:2181",jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
+ ClusterConfig("qa","0.8.1","localhost:2181",jmxEnabled = false, restrictOperations=false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
}
}
test("serialize and deserialize 0.8.1.1") {
- val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = true, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
+ val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = true, restrictOperations=false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
val serialize: String = ClusterConfig.serialize(cc)
val deserialize = ClusterConfig.deserialize(serialize)
assert(deserialize.isSuccess === true)
@@ -33,7 +33,7 @@ class TestClusterConfig extends FunSuite with Matchers {
}
test("serialize and deserialize 0.8.2.0 +jmx credentials") {
- val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = true, jmxUser = Some("mario"), jmxPass = Some("rossi"), jmxSsl = false, pollConsumers = true, filterConsumers = true, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
+ val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = true, jmxUser = Some("mario"), jmxPass = Some("rossi"), jmxSsl = false, restrictOperations=false, pollConsumers = true, filterConsumers = true, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
val serialize: String = ClusterConfig.serialize(cc)
val deserialize = ClusterConfig.deserialize(serialize)
assert(deserialize.isSuccess === true)
@@ -41,7 +41,7 @@ class TestClusterConfig extends FunSuite with Matchers {
}
test("serialize and deserialize 0.8.2.0") {
- val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = true, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
+ val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = true, restrictOperations=false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
val serialize: String = ClusterConfig.serialize(cc)
val deserialize = ClusterConfig.deserialize(serialize)
assert(deserialize.isSuccess === true)
@@ -49,7 +49,7 @@ class TestClusterConfig extends FunSuite with Matchers {
}
test("serialize and deserialize 0.8.2.1") {
- val cc = ClusterConfig("qa","0.8.2.1","localhost:2181", jmxEnabled = true, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
+ val cc = ClusterConfig("qa","0.8.2.1","localhost:2181", jmxEnabled = true, restrictOperations=false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
val serialize: String = ClusterConfig.serialize(cc)
val deserialize = ClusterConfig.deserialize(serialize)
assert(deserialize.isSuccess === true)
@@ -57,7 +57,7 @@ class TestClusterConfig extends FunSuite with Matchers {
}
test("serialize and deserialize 0.8.2.2") {
- val cc = ClusterConfig("qa","0.8.2.2","localhost:2181", jmxEnabled = true, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
+ val cc = ClusterConfig("qa","0.8.2.2","localhost:2181", jmxEnabled = true, restrictOperations=false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
val serialize: String = ClusterConfig.serialize(cc)
val deserialize = ClusterConfig.deserialize(serialize)
assert(deserialize.isSuccess === true)
@@ -65,7 +65,7 @@ class TestClusterConfig extends FunSuite with Matchers {
}
test("deserialize without version, jmxEnabled, and security protocol") {
- val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
+ val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = false, restrictOperations=false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
val serialize: String = ClusterConfig.serialize(cc)
val noverison = serialize.replace(""","kafkaVersion":"0.8.2.0"""","").replace(""","jmxEnabled":false""","").replace(""","jmxSsl":false""","")
assert(!noverison.contains("kafkaVersion"))
@@ -77,7 +77,7 @@ class TestClusterConfig extends FunSuite with Matchers {
}
test("deserialize from 0.8.2-beta as 0.8.2.0") {
- val cc = ClusterConfig("qa","0.8.2-beta","localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
+ val cc = ClusterConfig("qa","0.8.2-beta","localhost:2181", jmxEnabled = false, restrictOperations=false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
val serialize: String = ClusterConfig.serialize(cc)
val noverison = serialize.replace(""","kafkaVersion":"0.8.2.0"""",""","kafkaVersion":"0.8.2-beta"""")
val deserialize = ClusterConfig.deserialize(noverison)
@@ -86,7 +86,7 @@ class TestClusterConfig extends FunSuite with Matchers {
}
test("deserialize from 0.9.0.1") {
- val cc = ClusterConfig("qa","0.9.0.1","localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false,
+ val cc = ClusterConfig("qa","0.9.0.1","localhost:2181", jmxEnabled = false, restrictOperations=false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false,
tuning = Option(ClusterTuning(
Option(1)
,Option(2)