diff --git a/app/kafka/manager/BrokerViewCacheActor.scala b/app/kafka/manager/BrokerViewCacheActor.scala index 08bd773df..b03234deb 100644 --- a/app/kafka/manager/BrokerViewCacheActor.scala +++ b/app/kafka/manager/BrokerViewCacheActor.scala @@ -174,7 +174,7 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni } val result = tryResult match { case scala.util.Failure(t) => - log.error(s"Failed to get topic metrics for broker $broker", t) + log.error(t, s"Failed to get topic metrics for broker $broker") topicPartitions.map { case (topic, id, partitions) => (topic.topic, BrokerMetrics.DEFAULT) @@ -198,7 +198,7 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni val result = tryResult match { case scala.util.Failure(t) => - log.error(s"Failed to get broker metrics for $broker", t) + log.error(t, s"Failed to get broker metrics for $broker") BrokerMetrics.DEFAULT case scala.util.Success(bm) => bm } diff --git a/app/kafka/manager/utils/AdminUtils.scala b/app/kafka/manager/utils/AdminUtils.scala index b46d43003..e4cb047d5 100644 --- a/app/kafka/manager/utils/AdminUtils.scala +++ b/app/kafka/manager/utils/AdminUtils.scala @@ -130,7 +130,7 @@ class AdminUtils(version: KafkaVersion) { checkCondition(curator.checkExists().forPath(topicPath) == null,TopicErrors.TopicAlreadyExists(topic)) } partitionReplicaAssignment.foreach { - case (part,reps) => checkCondition(reps.size == reps.toSet.size, TopicErrors.DuplicateReplicAssignment(topic,part,reps)) + case (part,reps) => checkCondition(reps.size == reps.toSet.size, TopicErrors.DuplicateReplicaAssignment(topic,part,reps)) } // write out the config on create, not update, if there is any, this isn't transactional with the partition assignments diff --git a/app/kafka/manager/utils/Topic.scala b/app/kafka/manager/utils/Topic.scala index 06468a535..33ee55447 100644 --- a/app/kafka/manager/utils/Topic.scala +++ b/app/kafka/manager/utils/Topic.scala @@ -77,7 +77,7 @@ object TopicErrors { def ReplicationGreaterThanNumBrokers(rf: Int, nb: Int) = new ReplicationGreaterThanNumBrokers(rf,nb) val InconsistentPartitionReplicas = new InconsistentPartitionReplicas def TopicAlreadyExists(topic: String) = new TopicAlreadyExists(topic) - def DuplicateReplicAssignment(topic: String, part: Int, replicas: Seq[Int]) = new DuplicateReplicaAssignment(topic,part,replicas) + def DuplicateReplicaAssignment(topic: String, part: Int, replicas: Seq[Int]) = new DuplicateReplicaAssignment(topic,part,replicas) def CannotAddZeroPartitions(topic: String, currentPartitions: Int, newPartitions:Int) = new CannotAddZeroPartitions(topic,currentPartitions,newPartitions) def FailedToAddNewPartitions(topic: String, newPartitions:Int, found: Int) = new FailedToAddNewPartitions(topic,newPartitions,found) def TopicDoesNotExist(topic: String) = new TopicDoesNotExist(topic) diff --git a/app/kafka/manager/utils/ZkUtils.scala b/app/kafka/manager/utils/ZkUtils.scala index f8bab2d10..2c7173ebd 100644 --- a/app/kafka/manager/utils/ZkUtils.scala +++ b/app/kafka/manager/utils/ZkUtils.scala @@ -61,7 +61,7 @@ object ZkUtils { /** * Update the value of a persistent node with the given path and data. - * create parrent directory if necessary. Never throw NodeExistException. + * create parent directory if necessary. Never throw NodeExistException. * Return the updated path zkVersion */ def updatePersistentPath(curator: CuratorFramework, path: String, ba: Array[Byte], version: Int = -1) = { diff --git a/test/controller/api/TestKafkaHealthCheck.scala b/test/controller/api/TestKafkaHealthCheck.scala index d54a31882..580b4af83 100644 --- a/test/controller/api/TestKafkaHealthCheck.scala +++ b/test/controller/api/TestKafkaHealthCheck.scala @@ -50,12 +50,15 @@ class TestKafkaHealthCheck extends CuratorAwareTest with KafkaServerInTest { private[this] def createCluster() = { val future = KafkaManagerContext.getKafkaManager.addCluster(testClusterName,"0.8.2.0",kafkaServerZkPath, jmxEnabled = false) - Await.result(future,duration) + val result = Await.result(future,duration) + result.toEither.left.foreach(apiError => sys.error(apiError.msg)) + Thread.sleep(3000) } private[this] def createTopic() = { val future = KafkaManagerContext.getKafkaManager.createTopic(testClusterName,testTopicName,4,1) - Await.result(future,duration) + val result = Await.result(future,duration) + result.toEither.left.foreach(apiError => sys.error(apiError.msg)) } private[this] def deleteTopic() = {