Skip to content

Commit

Permalink
Merge pull request #4 from yahoo/master
Browse files Browse the repository at this point in the history
upstream fixes
  • Loading branch information
joestein committed Jul 7, 2015
2 parents dc06988 + cc279b0 commit e163169
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 7 deletions.
4 changes: 2 additions & 2 deletions app/kafka/manager/BrokerViewCacheActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
}
val result = tryResult match {
case scala.util.Failure(t) =>
log.error(s"Failed to get topic metrics for broker $broker", t)
log.error(t, s"Failed to get topic metrics for broker $broker")
topicPartitions.map {
case (topic, id, partitions) =>
(topic.topic, BrokerMetrics.DEFAULT)
Expand All @@ -198,7 +198,7 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni

val result = tryResult match {
case scala.util.Failure(t) =>
log.error(s"Failed to get broker metrics for $broker", t)
log.error(t, s"Failed to get broker metrics for $broker")
BrokerMetrics.DEFAULT
case scala.util.Success(bm) => bm
}
Expand Down
2 changes: 1 addition & 1 deletion app/kafka/manager/utils/AdminUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class AdminUtils(version: KafkaVersion) {
checkCondition(curator.checkExists().forPath(topicPath) == null,TopicErrors.TopicAlreadyExists(topic))
}
partitionReplicaAssignment.foreach {
case (part,reps) => checkCondition(reps.size == reps.toSet.size, TopicErrors.DuplicateReplicAssignment(topic,part,reps))
case (part,reps) => checkCondition(reps.size == reps.toSet.size, TopicErrors.DuplicateReplicaAssignment(topic,part,reps))
}

// write out the config on create, not update, if there is any, this isn't transactional with the partition assignments
Expand Down
2 changes: 1 addition & 1 deletion app/kafka/manager/utils/Topic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ object TopicErrors {
def ReplicationGreaterThanNumBrokers(rf: Int, nb: Int) = new ReplicationGreaterThanNumBrokers(rf,nb)
val InconsistentPartitionReplicas = new InconsistentPartitionReplicas
def TopicAlreadyExists(topic: String) = new TopicAlreadyExists(topic)
def DuplicateReplicAssignment(topic: String, part: Int, replicas: Seq[Int]) = new DuplicateReplicaAssignment(topic,part,replicas)
def DuplicateReplicaAssignment(topic: String, part: Int, replicas: Seq[Int]) = new DuplicateReplicaAssignment(topic,part,replicas)
def CannotAddZeroPartitions(topic: String, currentPartitions: Int, newPartitions:Int) = new CannotAddZeroPartitions(topic,currentPartitions,newPartitions)
def FailedToAddNewPartitions(topic: String, newPartitions:Int, found: Int) = new FailedToAddNewPartitions(topic,newPartitions,found)
def TopicDoesNotExist(topic: String) = new TopicDoesNotExist(topic)
Expand Down
2 changes: 1 addition & 1 deletion app/kafka/manager/utils/ZkUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ object ZkUtils {

/**
* Update the value of a persistent node with the given path and data.
* create parrent directory if necessary. Never throw NodeExistException.
* create parent directory if necessary. Never throw NodeExistException.
* Return the updated path zkVersion
*/
def updatePersistentPath(curator: CuratorFramework, path: String, ba: Array[Byte], version: Int = -1) = {
Expand Down
7 changes: 5 additions & 2 deletions test/controller/api/TestKafkaHealthCheck.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,15 @@ class TestKafkaHealthCheck extends CuratorAwareTest with KafkaServerInTest {

private[this] def createCluster() = {
val future = KafkaManagerContext.getKafkaManager.addCluster(testClusterName,"0.8.2.0",kafkaServerZkPath, jmxEnabled = false)
Await.result(future,duration)
val result = Await.result(future,duration)
result.toEither.left.foreach(apiError => sys.error(apiError.msg))
Thread.sleep(3000)
}

private[this] def createTopic() = {
val future = KafkaManagerContext.getKafkaManager.createTopic(testClusterName,testTopicName,4,1)
Await.result(future,duration)
val result = Await.result(future,duration)
result.toEither.left.foreach(apiError => sys.error(apiError.msg))
}

private[this] def deleteTopic() = {
Expand Down

0 comments on commit e163169

Please sign in to comment.