Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upstream fixes #4

Merged
merged 7 commits into from
Jul 7, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/kafka/manager/BrokerViewCacheActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
}
val result = tryResult match {
case scala.util.Failure(t) =>
log.error(s"Failed to get topic metrics for broker $broker", t)
log.error(t, s"Failed to get topic metrics for broker $broker")
topicPartitions.map {
case (topic, id, partitions) =>
(topic.topic, BrokerMetrics.DEFAULT)
Expand All @@ -198,7 +198,7 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni

val result = tryResult match {
case scala.util.Failure(t) =>
log.error(s"Failed to get broker metrics for $broker", t)
log.error(t, s"Failed to get broker metrics for $broker")
BrokerMetrics.DEFAULT
case scala.util.Success(bm) => bm
}
Expand Down
2 changes: 1 addition & 1 deletion app/kafka/manager/utils/AdminUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class AdminUtils(version: KafkaVersion) {
checkCondition(curator.checkExists().forPath(topicPath) == null,TopicErrors.TopicAlreadyExists(topic))
}
partitionReplicaAssignment.foreach {
case (part,reps) => checkCondition(reps.size == reps.toSet.size, TopicErrors.DuplicateReplicAssignment(topic,part,reps))
case (part,reps) => checkCondition(reps.size == reps.toSet.size, TopicErrors.DuplicateReplicaAssignment(topic,part,reps))
}

// write out the config on create, not update, if there is any, this isn't transactional with the partition assignments
Expand Down
2 changes: 1 addition & 1 deletion app/kafka/manager/utils/Topic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ object TopicErrors {
def ReplicationGreaterThanNumBrokers(rf: Int, nb: Int) = new ReplicationGreaterThanNumBrokers(rf,nb)
val InconsistentPartitionReplicas = new InconsistentPartitionReplicas
def TopicAlreadyExists(topic: String) = new TopicAlreadyExists(topic)
def DuplicateReplicAssignment(topic: String, part: Int, replicas: Seq[Int]) = new DuplicateReplicaAssignment(topic,part,replicas)
def DuplicateReplicaAssignment(topic: String, part: Int, replicas: Seq[Int]) = new DuplicateReplicaAssignment(topic,part,replicas)
def CannotAddZeroPartitions(topic: String, currentPartitions: Int, newPartitions:Int) = new CannotAddZeroPartitions(topic,currentPartitions,newPartitions)
def FailedToAddNewPartitions(topic: String, newPartitions:Int, found: Int) = new FailedToAddNewPartitions(topic,newPartitions,found)
def TopicDoesNotExist(topic: String) = new TopicDoesNotExist(topic)
Expand Down
2 changes: 1 addition & 1 deletion app/kafka/manager/utils/ZkUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ object ZkUtils {

/**
* Update the value of a persistent node with the given path and data.
* create parrent directory if necessary. Never throw NodeExistException.
* create parent directory if necessary. Never throw NodeExistException.
* Return the updated path zkVersion
*/
def updatePersistentPath(curator: CuratorFramework, path: String, ba: Array[Byte], version: Int = -1) = {
Expand Down
7 changes: 5 additions & 2 deletions test/controller/api/TestKafkaHealthCheck.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,15 @@ class TestKafkaHealthCheck extends CuratorAwareTest with KafkaServerInTest {

private[this] def createCluster() = {
val future = KafkaManagerContext.getKafkaManager.addCluster(testClusterName,"0.8.2.0",kafkaServerZkPath, jmxEnabled = false)
Await.result(future,duration)
val result = Await.result(future,duration)
result.toEither.left.foreach(apiError => sys.error(apiError.msg))
Thread.sleep(3000)
}

private[this] def createTopic() = {
val future = KafkaManagerContext.getKafkaManager.createTopic(testClusterName,testTopicName,4,1)
Await.result(future,duration)
val result = Await.result(future,duration)
result.toEither.left.foreach(apiError => sys.error(apiError.msg))
}

private[this] def deleteTopic() = {
Expand Down