From 2633db9b61ab68862556e1c17d90f9baba6666f8 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Mon, 1 Mar 2021 18:25:23 +0100 Subject: [PATCH] MINOR; Small refactor in `GroupMetadata` (#10236) Reviewers: Jason Gustafson --- .../coordinator/group/GroupMetadata.scala | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala index 50328f0516300..53bce0b8cb0ee 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala @@ -252,7 +252,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState leaderId = Some(member.memberId) members.put(member.memberId, member) - member.supportedProtocols.foreach { case (protocol, _) => supportedProtocols(protocol) += 1 } + incSupportedProtocols(member) member.awaitingJoinCallback = callback if (member.isAwaitingJoin) @@ -263,7 +263,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState def remove(memberId: String): Unit = { members.remove(memberId).foreach { member => - member.supportedProtocols.foreach { case (protocol, _) => supportedProtocols(protocol) -= 1 } + decSupportedProtocols(member) if (member.isAwaitingJoin) numMembersAwaitingJoin -= 1 @@ -426,6 +426,14 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState protocol } + private def incSupportedProtocols(member: MemberMetadata): Unit = { + member.supportedProtocols.foreach { case (protocol, _) => supportedProtocols(protocol) += 1 } + } + + private def decSupportedProtocols(member: MemberMetadata): Unit = { + member.supportedProtocols.foreach { case (protocol, _) => supportedProtocols(protocol) -= 1 } + } + private def candidateProtocols: Set[String] = { // get the set of protocols that are commonly supported by all members val numMembers = members.size @@ -434,7 +442,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState def supportsProtocols(memberProtocolType: String, memberProtocols: Set[String]): Boolean = { if (is(Empty)) - !memberProtocolType.isEmpty && memberProtocols.nonEmpty + memberProtocolType.nonEmpty && memberProtocols.nonEmpty else protocolType.contains(memberProtocolType) && memberProtocols.exists(supportedProtocols(_) == members.size) } @@ -489,9 +497,9 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState def updateMember(member: MemberMetadata, protocols: List[(String, Array[Byte])], callback: JoinCallback): Unit = { - member.supportedProtocols.foreach { case (protocol, _) => supportedProtocols(protocol) -= 1 } - protocols.foreach { case (protocol, _) => supportedProtocols(protocol) += 1 } + decSupportedProtocols(member) member.supportedProtocols = protocols + incSupportedProtocols(member) if (callback != null && !member.isAwaitingJoin) { numMembersAwaitingJoin += 1