Skip to content

Commit

Permalink
MINOR; Small refactor in GroupMetadata (#10236)
Browse files Browse the repository at this point in the history
Reviewers: Jason Gustafson <jason@confluent.io>
  • Loading branch information
dajac authored Mar 1, 2021
1 parent a63e5be commit 2633db9
Showing 1 changed file with 13 additions and 5 deletions.
18 changes: 13 additions & 5 deletions core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
leaderId = Some(member.memberId)

members.put(member.memberId, member)
member.supportedProtocols.foreach { case (protocol, _) => supportedProtocols(protocol) += 1 }
incSupportedProtocols(member)
member.awaitingJoinCallback = callback

if (member.isAwaitingJoin)
Expand All @@ -263,7 +263,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState

def remove(memberId: String): Unit = {
members.remove(memberId).foreach { member =>
member.supportedProtocols.foreach { case (protocol, _) => supportedProtocols(protocol) -= 1 }
decSupportedProtocols(member)
if (member.isAwaitingJoin)
numMembersAwaitingJoin -= 1

Expand Down Expand Up @@ -426,6 +426,14 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
protocol
}

private def incSupportedProtocols(member: MemberMetadata): Unit = {
member.supportedProtocols.foreach { case (protocol, _) => supportedProtocols(protocol) += 1 }
}

private def decSupportedProtocols(member: MemberMetadata): Unit = {
member.supportedProtocols.foreach { case (protocol, _) => supportedProtocols(protocol) -= 1 }
}

private def candidateProtocols: Set[String] = {
// get the set of protocols that are commonly supported by all members
val numMembers = members.size
Expand All @@ -434,7 +442,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState

def supportsProtocols(memberProtocolType: String, memberProtocols: Set[String]): Boolean = {
if (is(Empty))
!memberProtocolType.isEmpty && memberProtocols.nonEmpty
memberProtocolType.nonEmpty && memberProtocols.nonEmpty
else
protocolType.contains(memberProtocolType) && memberProtocols.exists(supportedProtocols(_) == members.size)
}
Expand Down Expand Up @@ -489,9 +497,9 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
def updateMember(member: MemberMetadata,
protocols: List[(String, Array[Byte])],
callback: JoinCallback): Unit = {
member.supportedProtocols.foreach { case (protocol, _) => supportedProtocols(protocol) -= 1 }
protocols.foreach { case (protocol, _) => supportedProtocols(protocol) += 1 }
decSupportedProtocols(member)
member.supportedProtocols = protocols
incSupportedProtocols(member)

if (callback != null && !member.isAwaitingJoin) {
numMembersAwaitingJoin += 1
Expand Down

0 comments on commit 2633db9

Please sign in to comment.