Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-12342; Merge RaftClient and MetaLogManager interfaces and remove shim #10497

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1071,6 +1071,7 @@ project(':metadata') {

dependencies {
implementation project(':clients')
implementation project(':raft')
implementation libs.jacksonDatabind
implementation libs.jacksonJDK8Datatypes
implementation libs.metrics
Expand Down Expand Up @@ -1268,7 +1269,6 @@ project(':raft') {

dependencies {
implementation project(':clients')
implementation project(':metadata')
implementation libs.slf4jApi
implementation libs.jacksonDatabind

Expand Down
7 changes: 5 additions & 2 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@
<allow pkg="org.apache.kafka.controller" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.timeline" />
Expand All @@ -231,6 +232,8 @@
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.test" />
</subpackage>

Expand All @@ -239,6 +242,8 @@
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.test" />
</subpackage>
Expand Down Expand Up @@ -282,7 +287,6 @@
<allow pkg="net.sourceforge.argparse4j" />
<allow pkg="org.apache.kafka.common"/>
<allow pkg="org.apache.kafka.metadata"/>
<allow pkg="org.apache.kafka.metalog"/>
<allow pkg="org.apache.kafka.queue"/>
<allow pkg="org.apache.kafka.raft"/>
<allow pkg="org.apache.kafka.shell"/>
Expand Down Expand Up @@ -406,7 +410,6 @@
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.test"/>
<allow pkg="com.fasterxml.jackson" />
<allow pkg="net.jqwik"/>
Expand Down
25 changes: 16 additions & 9 deletions core/src/main/scala/kafka/raft/RaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec, NON_ROUTABLE_ADDRESS, UnknownAddressSpec}
import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, RaftClient, RaftConfig, RaftRequest, RecordSerde}
import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, LeaderAndEpoch, RaftClient, RaftConfig, RaftRequest, RecordSerde}

import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -101,6 +101,10 @@ trait RaftManager[T] {
epoch: Int,
records: Seq[T]
): Option[Long]

def leaderAndEpoch: LeaderAndEpoch

def client: RaftClient[T]
}

class KafkaRaftManager[T](
Expand All @@ -126,10 +130,10 @@ class KafkaRaftManager[T](
private val dataDir = createDataDir()
private val metadataLog = buildMetadataLog()
private val netChannel = buildNetworkChannel()
private val raftClient = buildRaftClient()
private val raftIoThread = new RaftIoThread(raftClient, threadNamePrefix)
val client: KafkaRaftClient[T] = buildRaftClient()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean to override the return type from RaftClient[T] to KafkaRaftClient[T]?

private val raftIoThread = new RaftIoThread(client, threadNamePrefix)

def kafkaRaftClient: KafkaRaftClient[T] = raftClient
def kafkaRaftClient: KafkaRaftClient[T] = client
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that since you added a new method client: RaftClient[T] to RaftManager[T] and KafkaRaftManager overrides it to client: KafkaRaftClient[T] we should be able to remove this KafkaRaftManager[T] only public method.


def startup(): Unit = {
// Update the voter endpoints (if valid) with what's in RaftConfig
Expand All @@ -152,7 +156,7 @@ class KafkaRaftManager[T](

def shutdown(): Unit = {
raftIoThread.shutdown()
raftClient.close()
client.close()
scheduler.shutdown()
netChannel.close()
metadataLog.close()
Expand All @@ -161,7 +165,7 @@ class KafkaRaftManager[T](
  // Registers a listener with the underlying KafkaRaftClient so it receives
  // Raft events (e.g. committed batches, leadership changes). Delegates
  // directly to the client; no state is kept in the manager itself.
  override def register(
    listener: RaftClient.Listener[T]
  ): Unit = {
    client.register(listener)
  }

override def scheduleAtomicAppend(
Expand All @@ -184,9 +188,9 @@ class KafkaRaftManager[T](
isAtomic: Boolean
): Option[Long] = {
val offset = if (isAtomic) {
raftClient.scheduleAtomicAppend(epoch, records.asJava)
client.scheduleAtomicAppend(epoch, records.asJava)
} else {
raftClient.scheduleAppend(epoch, records.asJava)
client.scheduleAppend(epoch, records.asJava)
}

Option(offset).map(Long.unbox)
Expand All @@ -203,7 +207,7 @@ class KafkaRaftManager[T](
createdTimeMs
)

raftClient.handle(inboundRequest)
client.handle(inboundRequest)

inboundRequest.completion.thenApply { response =>
response.data
Expand Down Expand Up @@ -308,4 +312,7 @@ class KafkaRaftManager[T](
)
}

  // Exposes the current Raft leader and epoch as reported by the client.
  // NOTE(review): value reflects the client's view at call time — presumably
  // it may briefly lag during leader elections; confirm callers tolerate that.
  override def leaderAndEpoch: LeaderAndEpoch = {
    client.leaderAndEpoch
  }
}
30 changes: 15 additions & 15 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package kafka.server

import java.util
import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
import java.net.InetAddress

import kafka.cluster.Broker.ServerInfo
Expand All @@ -29,6 +29,7 @@ import kafka.coordinator.transaction.{ProducerIdGenerator, TransactionCoordinato
import kafka.log.LogManager
import kafka.metrics.KafkaYammerMetrics
import kafka.network.SocketServer
import kafka.raft.RaftManager
import kafka.security.CredentialProvider
import kafka.server.metadata.{BrokerMetadataListener, CachedConfigRepository, ClientQuotaCache, ClientQuotaMetadataManager, RaftMetadataCache}
import kafka.utils.{CoreUtils, KafkaScheduler}
Expand All @@ -42,8 +43,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException}
import org.apache.kafka.metadata.{BrokerState, VersionRange}
import org.apache.kafka.metalog.MetaLogManager
import org.apache.kafka.metadata.{ApiMessageAndVersion, BrokerState, VersionRange}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.raft.RaftConfig.AddressSpec
import org.apache.kafka.server.authorizer.Authorizer
Expand All @@ -55,16 +55,16 @@ import scala.jdk.CollectionConverters._
* A Kafka broker that runs in KRaft (Kafka Raft) mode.
*/
class BrokerServer(
val config: KafkaConfig,
val metaProps: MetaProperties,
val metaLogManager: MetaLogManager,
val time: Time,
val metrics: Metrics,
val threadNamePrefix: Option[String],
val initialOfflineDirs: Seq[String],
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
val supportedFeatures: util.Map[String, VersionRange]
) extends KafkaBroker {
val config: KafkaConfig,
val metaProps: MetaProperties,
val raftManager: RaftManager[ApiMessageAndVersion],
val time: Time,
val metrics: Metrics,
val threadNamePrefix: Option[String],
val initialOfflineDirs: Seq[String],
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
val supportedFeatures: util.Map[String, VersionRange]
) extends KafkaBroker {

import kafka.server.Server._

Expand Down Expand Up @@ -181,7 +181,7 @@ class BrokerServer(
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)

val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala
val controllerNodeProvider = RaftControllerNodeProvider(metaLogManager, config, controllerNodes)
val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)

clientToControllerChannelManager = BrokerToControllerChannelManager(
controllerNodeProvider,
Expand Down Expand Up @@ -284,7 +284,7 @@ class BrokerServer(
metaProps.clusterId, networkListeners, supportedFeatures)

// Register a listener with the Raft layer to receive metadata event notifications
metaLogManager.register(brokerMetadataListener)
raftManager.register(brokerMetadataListener)

val endpoints = new util.ArrayList[Endpoint](networkListeners.size())
var interBrokerListener: Endpoint = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicReference

import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
import kafka.raft.RaftManager
import kafka.utils.Logging
import org.apache.kafka.clients._
import org.apache.kafka.common.Node
Expand All @@ -31,9 +32,10 @@ import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.metalog.MetaLogManager
import org.apache.kafka.metadata.ApiMessageAndVersion

import scala.collection.Seq
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._

trait ControllerNodeProvider {
Expand Down Expand Up @@ -77,15 +79,14 @@ class MetadataCacheControllerNodeProvider(
}

object RaftControllerNodeProvider {
def apply(metaLogManager: MetaLogManager,
def apply(raftManager: RaftManager[ApiMessageAndVersion],
config: KafkaConfig,
controllerQuorumVoterNodes: Seq[Node]): RaftControllerNodeProvider = {

val controllerListenerName = new ListenerName(config.controllerListenerNames.head)
val controllerSecurityProtocol = config.listenerSecurityProtocolMap.getOrElse(controllerListenerName, SecurityProtocol.forName(controllerListenerName.value()))
val controllerSaslMechanism = config.saslMechanismControllerProtocol
new RaftControllerNodeProvider(
metaLogManager,
raftManager,
controllerQuorumVoterNodes,
controllerListenerName,
controllerSecurityProtocol,
Expand All @@ -98,7 +99,7 @@ object RaftControllerNodeProvider {
* Finds the controller node by checking the metadata log manager.
* This provider is used when we are using a Raft-based metadata quorum.
*/
class RaftControllerNodeProvider(val metaLogManager: MetaLogManager,
class RaftControllerNodeProvider(val raftManager: RaftManager[ApiMessageAndVersion],
controllerQuorumVoterNodes: Seq[Node],
val listenerName: ListenerName,
val securityProtocol: SecurityProtocol,
Expand All @@ -107,14 +108,7 @@ class RaftControllerNodeProvider(val metaLogManager: MetaLogManager,
val idToNode = controllerQuorumVoterNodes.map(node => node.id() -> node).toMap

override def get(): Option[Node] = {
val leader = metaLogManager.leader()
if (leader == null) {
None
} else if (leader.nodeId() < 0) {
None
} else {
idToNode.get(leader.nodeId())
}
raftManager.leaderAndEpoch.leaderId.asScala.map(idToNode)
}
}

Expand Down
22 changes: 10 additions & 12 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package kafka.server

import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{CompletableFuture, TimeUnit}

import kafka.cluster.Broker.ServerInfo
import kafka.log.LogConfig
Expand All @@ -38,7 +38,6 @@ import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{ClusterResource, Endpoint}
import org.apache.kafka.controller.{Controller, QuorumController, QuorumControllerMetrics}
import org.apache.kafka.metadata.{ApiMessageAndVersion, VersionRange}
import org.apache.kafka.metalog.MetaLogManager
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.raft.RaftConfig.AddressSpec
import org.apache.kafka.server.authorizer.Authorizer
Expand All @@ -49,15 +48,14 @@ import scala.jdk.CollectionConverters._
* A Kafka controller that runs in KRaft (Kafka Raft) mode.
*/
class ControllerServer(
val metaProperties: MetaProperties,
val config: KafkaConfig,
val metaLogManager: MetaLogManager,
val raftManager: RaftManager[ApiMessageAndVersion],
val time: Time,
val metrics: Metrics,
val threadNamePrefix: Option[String],
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
) extends Logging with KafkaMetricsGroup {
val metaProperties: MetaProperties,
val config: KafkaConfig,
val raftManager: RaftManager[ApiMessageAndVersion],
val time: Time,
val metrics: Metrics,
val threadNamePrefix: Option[String],
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
) extends Logging with KafkaMetricsGroup {
import kafka.server.Server._

val lock = new ReentrantLock()
Expand Down Expand Up @@ -148,7 +146,7 @@ class ControllerServer(
setTime(time).
setThreadNamePrefix(threadNamePrefixAsString).
setConfigDefs(configDefs).
setLogManager(metaLogManager).
setRaftClient(raftManager.client).
setDefaultReplicationFactor(config.defaultReplicationFactor.toShort).
setDefaultNumPartitions(config.numPartitions.intValue()).
setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(),
Expand Down
10 changes: 3 additions & 7 deletions core/src/main/scala/kafka/server/KafkaRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole}
import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties}
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.utils.{AppInfoParser, Time}
import org.apache.kafka.metadata.ApiMessageAndVersion
import org.apache.kafka.metadata.{ApiMessageAndVersion, MetadataRecordSerde}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.raft.metadata.{MetaLogRaftShim, MetadataRecordSerde}

import scala.collection.Seq

Expand All @@ -56,7 +55,7 @@ class KafkaRaftServer(
private val metrics = Server.initializeMetrics(
config,
time,
metaProps.clusterId.toString
metaProps.clusterId
)

private val controllerQuorumVotersFuture = CompletableFuture.completedFuture(
Expand All @@ -74,13 +73,11 @@ class KafkaRaftServer(
controllerQuorumVotersFuture
)

private val metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient, config.nodeId)

private val broker: Option[BrokerServer] = if (config.processRoles.contains(BrokerRole)) {
Some(new BrokerServer(
config,
metaProps,
metaLogShim,
raftManager,
time,
metrics,
threadNamePrefix,
Expand All @@ -96,7 +93,6 @@ class KafkaRaftServer(
Some(new ControllerServer(
metaProps,
config,
metaLogShim,
raftManager,
time,
metrics,
Expand Down
Loading