Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka 0.10 support #282

Merged
merged 9 commits into from
Feb 23, 2017
Prev Previous commit
Fix incompatible 0.10.1 message format. You only notice when there ar…
…e 0.10.1 clients talking to the cluster, which will lead to "Unknown version 1 for group metadata message" errors.
  • Loading branch information
flyaruu authored and tqh committed Dec 12, 2016
commit 180ee24169d9182503d3ad7f104bb5d8a71fad36
71 changes: 68 additions & 3 deletions app/kafka/manager/utils/zero90/GroupMetadataManager.scala
Original file line number Diff line number Diff line change
@@ -61,31 +61,65 @@ object GroupMetadataManager {
private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING))
private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")

private val MEMBER_METADATA_V0 = new Schema(new Field("member_id", STRING),
private val MEMBER_METADATA_V0 = new Schema(
new Field("member_id", STRING),
new Field("client_id", STRING),
new Field("client_host", STRING),
new Field("session_timeout", INT32),
new Field("subscription", BYTES),
new Field("assignment", BYTES))

private val MEMBER_METADATA_V1 = new Schema(
new Field("member_id", STRING),
new Field("client_id", STRING),
new Field("client_host", STRING),
new Field("session_timeout", INT32),
new Field("rebalance_timeout", INT32),
new Field("subscription", BYTES),
new Field("assignment", BYTES))

private val MEMBER_METADATA_MEMBER_ID_V0 = MEMBER_METADATA_V0.get("member_id")
private val MEMBER_METADATA_CLIENT_ID_V0 = MEMBER_METADATA_V0.get("client_id")
private val MEMBER_METADATA_CLIENT_HOST_V0 = MEMBER_METADATA_V0.get("client_host")
private val MEMBER_METADATA_SESSION_TIMEOUT_V0 = MEMBER_METADATA_V0.get("session_timeout")
private val MEMBER_METADATA_SUBSCRIPTION_V0 = MEMBER_METADATA_V0.get("subscription")
private val MEMBER_METADATA_ASSIGNMENT_V0 = MEMBER_METADATA_V0.get("assignment")

private val MEMBER_METADATA_MEMBER_ID_V1 = MEMBER_METADATA_V1.get("member_id")
private val MEMBER_METADATA_CLIENT_ID_V1 = MEMBER_METADATA_V1.get("client_id")
private val MEMBER_METADATA_CLIENT_HOST_V1 = MEMBER_METADATA_V1.get("client_host")
private val MEMBER_METADATA_SESSION_TIMEOUT_V1 = MEMBER_METADATA_V1.get("session_timeout")
private val MEMBER_METADATA_REBALANCE_TIMEOUT_V1 = MEMBER_METADATA_V1.get("rebalance_timeout")
private val MEMBER_METADATA_SUBSCRIPTION_V1 = MEMBER_METADATA_V1.get("subscription")
private val MEMBER_METADATA_ASSIGNMENT_V1 = MEMBER_METADATA_V1.get("assignment")


private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(new Field("protocol_type", STRING),
private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
new Field("protocol_type", STRING),
new Field("generation", INT32),
new Field("protocol", STRING),
new Field("leader", STRING),
new Field("members", new ArrayOf(MEMBER_METADATA_V0)))

private val GROUP_METADATA_VALUE_SCHEMA_V1 = new Schema(
new Field("protocol_type", STRING),
new Field("generation", INT32),
new Field("protocol", NULLABLE_STRING),
new Field("leader", NULLABLE_STRING),
new Field("members", new ArrayOf(MEMBER_METADATA_V1)))

private val GROUP_METADATA_PROTOCOL_TYPE_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("protocol_type")
private val GROUP_METADATA_GENERATION_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("generation")
private val GROUP_METADATA_PROTOCOL_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("protocol")
private val GROUP_METADATA_LEADER_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("leader")
private val GROUP_METADATA_MEMBERS_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("members")

private val GROUP_METADATA_PROTOCOL_TYPE_V1 = GROUP_METADATA_VALUE_SCHEMA_V1.get("protocol_type")
private val GROUP_METADATA_GENERATION_V1 = GROUP_METADATA_VALUE_SCHEMA_V1.get("generation")
private val GROUP_METADATA_PROTOCOL_V1 = GROUP_METADATA_VALUE_SCHEMA_V1.get("protocol")
private val GROUP_METADATA_LEADER_V1 = GROUP_METADATA_VALUE_SCHEMA_V1.get("leader")
private val GROUP_METADATA_MEMBERS_V1 = GROUP_METADATA_VALUE_SCHEMA_V1.get("members")

// map of versions to key schemas as data types
private val MESSAGE_TYPE_SCHEMAS = Map(
0 -> OFFSET_COMMIT_KEY_SCHEMA,
@@ -99,7 +133,7 @@ object GroupMetadataManager {
private val CURRENT_OFFSET_VALUE_SCHEMA_VERSION = 1.toShort

// map of version of group metadata value schemas
private val GROUP_VALUE_SCHEMAS = Map(0 -> GROUP_METADATA_VALUE_SCHEMA_V0)
private val GROUP_VALUE_SCHEMAS = Map(0 -> GROUP_METADATA_VALUE_SCHEMA_V0,1 -> GROUP_METADATA_VALUE_SCHEMA_V1)
private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 0.toShort

private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
@@ -290,6 +324,37 @@ object GroupMetadataManager {
group.add(memberId, member)
}
group
} else if (version == 1){
val protocolType = value.get(GROUP_METADATA_PROTOCOL_TYPE_V1).asInstanceOf[String]

val generationId = value.get(GROUP_METADATA_GENERATION_V1).asInstanceOf[Int]
val leaderId = value.get(GROUP_METADATA_LEADER_V1).asInstanceOf[String]
val protocol = value.get(GROUP_METADATA_PROTOCOL_V1).asInstanceOf[String]
val group = new GroupMetadata(groupId, protocolType, generationId, leaderId, protocol)

value.getArray(GROUP_METADATA_MEMBERS_V1).foreach {
case memberMetadataObj =>
val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
val memberId = memberMetadata.get(MEMBER_METADATA_MEMBER_ID_V1).asInstanceOf[String]
val clientId = memberMetadata.get(MEMBER_METADATA_CLIENT_ID_V1).asInstanceOf[String]
val clientHost = memberMetadata.get(MEMBER_METADATA_CLIENT_HOST_V1).asInstanceOf[String]
//val sessionTimeout = memberMetadata.get(MEMBER_METADATA_SESSION_TIMEOUT_V0).asInstanceOf[Int]
val subscription = ConsumerProtocol.deserializeSubscription(memberMetadata.get(MEMBER_METADATA_SUBSCRIPTION_V1).asInstanceOf[ByteBuffer])
val assignment = ConsumerProtocol.deserializeAssignment(memberMetadata.get(MEMBER_METADATA_ASSIGNMENT_V1).asInstanceOf[ByteBuffer])

import collection.JavaConverters._
val member = new MemberMetadata(
memberId
, groupId
, clientId
, clientHost
//, sessionTimeout
, List((group.protocol, subscription.topics().asScala.toSet))
, assignment.partitions().asScala.map(tp => tp.topic() -> tp.partition()).toSet
)
group.add(memberId, member)
}
group
} else {
throw new IllegalStateException("Unknown group metadata message version")
}