Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka 0.10 support #282

Merged
merged 9 commits into from
Feb 23, 2017
Merged
2 changes: 1 addition & 1 deletion app/kafka/manager/actor/cluster/KafkaStateActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext
lastUpdateTimeMillis = System.currentTimeMillis()
} catch {
case e: Exception =>
warn("Failed to process a message from offset topic!", e)
warn(s"Failed to process a message from offset topic on cluster ${clusterContext.config.name}!", e)
Copy link

@whithajess whithajess Sep 18, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally wouldn't look for something in a Exception case as it may not exist so maybe the ${clusterContext.config.name} should be removed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It already proved very convenient to me. Why do you want to hide helpful information?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not want to hide it, but if you access a variable that doesn't exist in the Exception case then what will catch that exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, but this review isn't helpful.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@whithajess, in general, you have a point that if a variable is undefined in a catch, then there's no point in catching the first exception if we just generate another one anyway. However, in this case ${clusterContext.config.name} is already used on line 242, and everything is wrapped in a try anyway. This change wouldn't be executed if ${clusterContext.config.name} wasn't defined.

}
}
} finally {
Expand Down
5 changes: 3 additions & 2 deletions app/kafka/manager/utils/zero90/GroupMetadataManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ package kafka.manager.utils.zero90

import java.nio.ByteBuffer

import kafka.common.{KafkaException, TopicAndPartition, OffsetAndMetadata}
import kafka.common.{KafkaException, OffsetAndMetadata}
import kafka.coordinator.{GroupMetadataKey, GroupTopicPartition, OffsetKey, BaseKey}
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.types.Type._
import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct}

Expand Down Expand Up @@ -199,7 +200,7 @@ object GroupMetadataManager {
val topic = key.get(OFFSET_KEY_TOPIC_FIELD).asInstanceOf[String]
val partition = key.get(OFFSET_KEY_PARTITION_FIELD).asInstanceOf[Int]

OffsetKey(version, GroupTopicPartition(group, TopicAndPartition(topic, partition)))
OffsetKey(version, GroupTopicPartition(group, new TopicPartition(topic, partition)))

} else if (version == CURRENT_GROUP_KEY_SCHEMA_VERSION) {
// version 2 refers to offset
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ libraryDependencies ++= Seq(
"org.slf4j" % "log4j-over-slf4j" % "1.7.12",
"com.adrianhurt" %% "play-bootstrap3" % "0.4.5-P24",
"org.clapper" %% "grizzled-slf4j" % "1.0.2",
"org.apache.kafka" %% "kafka" % "0.9.0.1" exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(),
"org.apache.kafka" %% "kafka" % "0.10.0.1" exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Surely this 0.9.0.1 part is still needed for older support.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kafka client is backwards complient with 0.9. Havn't checked for older versions. Tested with a 0.9 cluster.

"com.beachape" %% "enumeratum" % "1.4.4",
"org.scalatest" %% "scalatest" % "2.2.1" % "test",
"org.apache.curator" % "curator-test" % "2.10.0" % "test",
Expand Down