KAFKA-12342: Reverse module dependency between Raft and Metadata #10705

Merged: 12 commits, May 20, 2021. Changes from all commits.
3 changes: 2 additions & 1 deletion build.gradle
@@ -1069,6 +1069,7 @@ project(':metadata') {
   dependencies {
     implementation project(':server-common')
     implementation project(':clients')
+    implementation project(':raft')
     implementation libs.jacksonDatabind
     implementation libs.jacksonJDK8Datatypes
     implementation libs.metrics
@@ -1077,6 +1078,7 @@ project(':metadata') {
     testImplementation libs.hamcrest
     testImplementation libs.slf4jlog4j
     testImplementation project(':clients').sourceSets.test.output
+    testImplementation project(':raft').sourceSets.test.output
   }

   task processMessages(type:JavaExec) {
@@ -1267,7 +1269,6 @@ project(':raft') {
   dependencies {
     implementation project(':server-common')
     implementation project(':clients')
-    implementation project(':metadata')
     implementation libs.slf4jApi
     implementation libs.jacksonDatabind

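This build change is the heart of the PR: ':metadata' now depends on ':raft' instead of the other way around, so metadata-module code can use Raft types directly rather than through a shim interface. A hypothetical sketch of what the new direction permits (the class below is illustrative, not part of this PR, and assumes LeaderAndEpoch's leaderId/epoch accessors):

package org.apache.kafka.controller

// Hypothetical example: with :metadata -> :raft in place, controller code can
// import org.apache.kafka.raft types directly (the import-control.xml change
// below whitelists exactly this).
import org.apache.kafka.raft.LeaderAndEpoch

final class QuorumDescription(leaderAndEpoch: LeaderAndEpoch) {
  // leaderId is a java.util.OptionalInt; epoch is the current quorum epoch.
  override def toString: String =
    s"QuorumDescription(leaderId=${leaderAndEpoch.leaderId}, epoch=${leaderAndEpoch.epoch})"
}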
9 changes: 7 additions & 2 deletions checkstyle/import-control.xml
@@ -217,12 +217,15 @@
     <allow pkg="org.apache.kafka.common.network" />
     <allow pkg="org.apache.kafka.common.protocol" />
     <allow pkg="org.apache.kafka.common.quota" />
+    <allow pkg="org.apache.kafka.common.record" />
     <allow pkg="org.apache.kafka.common.requests" />
     <allow pkg="org.apache.kafka.controller" />
     <allow pkg="org.apache.kafka.metadata" />
     <allow pkg="org.apache.kafka.metalog" />
     <allow pkg="org.apache.kafka.queue" />
+    <allow pkg="org.apache.kafka.raft" />
     <allow pkg="org.apache.kafka.server.common" />
+    <allow pkg="org.apache.kafka.snapshot" />
     <allow pkg="org.apache.kafka.test" />
     <allow pkg="org.apache.kafka.timeline" />
   </subpackage>
@@ -234,6 +237,8 @@
     <allow pkg="org.apache.kafka.common.metadata" />
     <allow pkg="org.apache.kafka.common.protocol" />
     <allow pkg="org.apache.kafka.metadata" />
+    <allow pkg="org.apache.kafka.metalog" />
+    <allow pkg="org.apache.kafka.raft" />
     <allow pkg="org.apache.kafka.server.common" />
     <allow pkg="org.apache.kafka.test" />
   </subpackage>
@@ -243,6 +248,8 @@
     <allow pkg="org.apache.kafka.common.protocol" />
     <allow pkg="org.apache.kafka.metadata" />
     <allow pkg="org.apache.kafka.metalog" />
+    <allow pkg="org.apache.kafka.raft" />
+    <allow pkg="org.apache.kafka.snapshot" />
     <allow pkg="org.apache.kafka.queue" />
     <allow pkg="org.apache.kafka.server.common" />
     <allow pkg="org.apache.kafka.test" />
@@ -292,7 +299,6 @@
     <allow pkg="net.sourceforge.argparse4j" />
     <allow pkg="org.apache.kafka.common"/>
     <allow pkg="org.apache.kafka.metadata"/>
-    <allow pkg="org.apache.kafka.metalog"/>
     <allow pkg="org.apache.kafka.queue"/>
     <allow pkg="org.apache.kafka.raft"/>
     <allow pkg="org.apache.kafka.server.common" />
@@ -418,7 +424,6 @@
     <allow pkg="org.apache.kafka.common.record" />
     <allow pkg="org.apache.kafka.common.requests" />
     <allow pkg="org.apache.kafka.common.protocol" />
-    <allow pkg="org.apache.kafka.metalog" />
     <allow pkg="org.apache.kafka.server.common" />
     <allow pkg="org.apache.kafka.server.common.serialization" />
     <allow pkg="org.apache.kafka.test"/>

27 changes: 17 additions & 10 deletions core/src/main/scala/kafka/raft/RaftManager.scala
@@ -27,16 +27,16 @@ import kafka.server.{KafkaConfig, MetaProperties}
 import kafka.utils.timer.SystemTimer
 import kafka.utils.{KafkaScheduler, Logging, ShutdownableThread}
 import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient}
-import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector}
 import org.apache.kafka.common.protocol.ApiMessage
 import org.apache.kafka.common.requests.RequestHeader
 import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec, NON_ROUTABLE_ADDRESS, UnknownAddressSpec}
-import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, RaftClient, RaftConfig, RaftRequest}
+import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, LeaderAndEpoch, RaftClient, RaftConfig, RaftRequest}
 import org.apache.kafka.server.common.serialization.RecordSerde
 import scala.jdk.CollectionConverters._

@@ -100,6 +100,10 @@ trait RaftManager[T] {
     epoch: Int,
     records: Seq[T]
   ): Option[Long]
+
+  def leaderAndEpoch: LeaderAndEpoch
+
+  def client: RaftClient[T]
 }

 class KafkaRaftManager[T](
@@ -125,10 +129,10 @@ class KafkaRaftManager[T](
   private val dataDir = createDataDir()
   private val metadataLog = buildMetadataLog()
   private val netChannel = buildNetworkChannel()
-  private val raftClient = buildRaftClient()
-  private val raftIoThread = new RaftIoThread(raftClient, threadNamePrefix)
+  val client: KafkaRaftClient[T] = buildRaftClient()
+  private val raftIoThread = new RaftIoThread(client, threadNamePrefix)

-  def kafkaRaftClient: KafkaRaftClient[T] = raftClient
+  def kafkaRaftClient: KafkaRaftClient[T] = client

   def startup(): Unit = {
     // Update the voter endpoints (if valid) with what's in RaftConfig
@@ -151,7 +155,7 @@ class KafkaRaftManager[T](

   def shutdown(): Unit = {
     raftIoThread.shutdown()
-    raftClient.close()
+    client.close()
     scheduler.shutdown()
     netChannel.close()
     metadataLog.close()
@@ -160,7 +164,7 @@ class KafkaRaftManager[T](
   override def register(
     listener: RaftClient.Listener[T]
   ): Unit = {
-    raftClient.register(listener)
+    client.register(listener)
   }

   override def scheduleAtomicAppend(
@@ -183,9 +187,9 @@ class KafkaRaftManager[T](
     isAtomic: Boolean
   ): Option[Long] = {
     val offset = if (isAtomic) {
-      raftClient.scheduleAtomicAppend(epoch, records.asJava)
+      client.scheduleAtomicAppend(epoch, records.asJava)
     } else {
-      raftClient.scheduleAppend(epoch, records.asJava)
+      client.scheduleAppend(epoch, records.asJava)
     }

     Option(offset).map(Long.unbox)
@@ -202,7 +206,7 @@ class KafkaRaftManager[T](
       createdTimeMs
     )

-    raftClient.handle(inboundRequest)
+    client.handle(inboundRequest)

     inboundRequest.completion.thenApply { response =>
       response.data
@@ -307,4 +311,7 @@ class KafkaRaftManager[T](
     )
   }

+  override def leaderAndEpoch: LeaderAndEpoch = {
+    client.leaderAndEpoch
+  }
 }
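With leaderAndEpoch and client now part of the RaftManager trait itself, callers in core can observe quorum leadership without going through a MetaLogManager. A short usage sketch against the new trait (the helper is illustrative, not from this PR; it assumes leaderId is a java.util.OptionalInt and that LeaderAndEpoch exposes epoch):

import kafka.raft.RaftManager
import org.apache.kafka.server.common.ApiMessageAndVersion

object RaftManagerExample {
  // Illustrative helper: summarize quorum leadership via the widened trait.
  def describeLeadership(raftManager: RaftManager[ApiMessageAndVersion]): String = {
    val leaderAndEpoch = raftManager.leaderAndEpoch  // point-in-time snapshot
    if (leaderAndEpoch.leaderId.isPresent)
      s"leader=${leaderAndEpoch.leaderId.getAsInt} epoch=${leaderAndEpoch.epoch}"
    else
      s"no elected leader in epoch ${leaderAndEpoch.epoch}"
  }
}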
29 changes: 15 additions & 14 deletions core/src/main/scala/kafka/server/BrokerServer.scala
@@ -18,9 +18,9 @@
 package kafka.server

 import java.util
+import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
-import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
 import java.net.InetAddress

 import kafka.cluster.Broker.ServerInfo
@@ -29,6 +29,7 @@ import kafka.coordinator.transaction.{ProducerIdGenerator, TransactionCoordinator}
 import kafka.log.LogManager
 import kafka.metrics.KafkaYammerMetrics
 import kafka.network.SocketServer
+import kafka.raft.RaftManager
 import kafka.security.CredentialProvider
 import kafka.server.metadata.{BrokerMetadataListener, CachedConfigRepository, ClientQuotaCache, ClientQuotaMetadataManager, RaftMetadataCache}
 import kafka.utils.{CoreUtils, KafkaScheduler}
@@ -43,10 +44,10 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
 import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
 import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException}
 import org.apache.kafka.metadata.{BrokerState, VersionRange}
-import org.apache.kafka.metalog.MetaLogManager
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.raft.RaftConfig.AddressSpec
 import org.apache.kafka.server.authorizer.Authorizer
+import org.apache.kafka.server.common.ApiMessageAndVersion;

 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
@@ -55,16 +56,16 @@ import scala.jdk.CollectionConverters._
  * A Kafka broker that runs in KRaft (Kafka Raft) mode.
  */
 class BrokerServer(
   val config: KafkaConfig,
   val metaProps: MetaProperties,
-  val metaLogManager: MetaLogManager,
+  val raftManager: RaftManager[ApiMessageAndVersion],
   val time: Time,
   val metrics: Metrics,
   val threadNamePrefix: Option[String],
   val initialOfflineDirs: Seq[String],
   val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
   val supportedFeatures: util.Map[String, VersionRange]
 ) extends KafkaBroker {

   import kafka.server.Server._

@@ -181,7 +182,7 @@ class BrokerServer(
     credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)

     val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala
-    val controllerNodeProvider = RaftControllerNodeProvider(metaLogManager, config, controllerNodes)
+    val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)

     clientToControllerChannelManager = BrokerToControllerChannelManager(
       controllerNodeProvider,
@@ -284,7 +285,7 @@ class BrokerServer(
       metaProps.clusterId, networkListeners, supportedFeatures)

     // Register a listener with the Raft layer to receive metadata event notifications
-    metaLogManager.register(brokerMetadataListener)
+    raftManager.register(brokerMetadataListener)

     val endpoints = new util.ArrayList[Endpoint](networkListeners.size())
     var interBrokerListener: Endpoint = null
20 changes: 7 additions & 13 deletions core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.LinkedBlockingDeque
 import java.util.concurrent.atomic.AtomicReference

 import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import kafka.raft.RaftManager
 import kafka.utils.Logging
 import org.apache.kafka.clients._
 import org.apache.kafka.common.Node
@@ -31,9 +32,10 @@ import org.apache.kafka.common.requests.AbstractRequest
 import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.metalog.MetaLogManager
+import org.apache.kafka.server.common.ApiMessageAndVersion;

 import scala.collection.Seq
+import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._

 trait ControllerNodeProvider {
@@ -77,15 +79,14 @@ class MetadataCacheControllerNodeProvider(
 }

 object RaftControllerNodeProvider {
-  def apply(metaLogManager: MetaLogManager,
+  def apply(raftManager: RaftManager[ApiMessageAndVersion],
             config: KafkaConfig,
             controllerQuorumVoterNodes: Seq[Node]): RaftControllerNodeProvider = {
-
     val controllerListenerName = new ListenerName(config.controllerListenerNames.head)
     val controllerSecurityProtocol = config.listenerSecurityProtocolMap.getOrElse(controllerListenerName, SecurityProtocol.forName(controllerListenerName.value()))
     val controllerSaslMechanism = config.saslMechanismControllerProtocol
     new RaftControllerNodeProvider(
-      metaLogManager,
+      raftManager,
       controllerQuorumVoterNodes,
       controllerListenerName,
       controllerSecurityProtocol,
@@ -98,7 +99,7 @@ object RaftControllerNodeProvider {
  * Finds the controller node by checking the metadata log manager.
  * This provider is used when we are using a Raft-based metadata quorum.
  */
-class RaftControllerNodeProvider(val metaLogManager: MetaLogManager,
+class RaftControllerNodeProvider(val raftManager: RaftManager[ApiMessageAndVersion],
                                  controllerQuorumVoterNodes: Seq[Node],
                                  val listenerName: ListenerName,
                                  val securityProtocol: SecurityProtocol,
@@ -107,14 +108,7 @@ class RaftControllerNodeProvider(val metaLogManager: MetaLogManager,
   val idToNode = controllerQuorumVoterNodes.map(node => node.id() -> node).toMap

   override def get(): Option[Node] = {
-    val leader = metaLogManager.leader()
-    if (leader == null) {
-      None
-    } else if (leader.nodeId() < 0) {
-      None
-    } else {
-      idToNode.get(leader.nodeId())
-    }
+    raftManager.leaderAndEpoch.leaderId.asScala.map(idToNode)
   }
 }
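The rewritten get() collapses the old null and negative-id checks into a single expression: LeaderAndEpoch.leaderId is a java.util.OptionalInt, and the newly imported scala-java8-compat OptionConverters lifts it into a Scala Option before the voter-node map is applied as a function. A standalone sketch of the same pattern (the ids and node names are made up):

import java.util.OptionalInt
import scala.compat.java8.OptionConverters._

object LeaderLookupExample {
  // Stand-ins for idToNode and LeaderAndEpoch.leaderId; values are made up.
  val idToNode: Map[Int, String] = Map(3000 -> "controller0", 3001 -> "controller1")

  def lookup(leaderId: OptionalInt): Option[String] =
    leaderId.asScala.map(idToNode)  // OptionalInt -> Option[Int] -> Option[node]

  def main(args: Array[String]): Unit = {
    println(lookup(OptionalInt.of(3001)))  // Some(controller1)
    println(lookup(OptionalInt.empty()))   // None
  }
}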
23 changes: 11 additions & 12 deletions core/src/main/scala/kafka/server/ControllerServer.scala
@@ -17,9 +17,10 @@

 package kafka.server

-import java.util.concurrent.{CompletableFuture, TimeUnit}
 import java.util
 import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.{CompletableFuture, TimeUnit}
+
 import kafka.cluster.Broker.ServerInfo
 import kafka.log.LogConfig
 import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
@@ -37,7 +38,6 @@ import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{ClusterResource, Endpoint}
 import org.apache.kafka.controller.{Controller, QuorumController, QuorumControllerMetrics}
 import org.apache.kafka.metadata.VersionRange
-import org.apache.kafka.metalog.MetaLogManager
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.raft.RaftConfig.AddressSpec
 import org.apache.kafka.server.authorizer.Authorizer
@@ -49,15 +49,14 @@
  * A Kafka controller that runs in KRaft (Kafka Raft) mode.
  */
 class ControllerServer(
   val metaProperties: MetaProperties,
   val config: KafkaConfig,
-  val metaLogManager: MetaLogManager,
   val raftManager: RaftManager[ApiMessageAndVersion],
   val time: Time,
   val metrics: Metrics,
   val threadNamePrefix: Option[String],
   val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
 ) extends Logging with KafkaMetricsGroup {
   import kafka.server.Server._

   val lock = new ReentrantLock()
@@ -148,7 +147,7 @@ class ControllerServer(
       setTime(time).
       setThreadNamePrefix(threadNamePrefixAsString).
       setConfigDefs(configDefs).
-      setLogManager(metaLogManager).
+      setRaftClient(raftManager.client).
       setDefaultReplicationFactor(config.defaultReplicationFactor.toShort).
       setDefaultNumPartitions(config.numPartitions.intValue()).
       setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(),
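Where the controller builder previously took setLogManager(metaLogManager), it now receives the Raft client itself via setRaftClient(raftManager.client). A minimal sketch of that handoff, using only members visible in this diff (the helper itself is illustrative):

import kafka.raft.RaftManager
import org.apache.kafka.raft.RaftClient
import org.apache.kafka.server.common.ApiMessageAndVersion

object ControllerWiringExample {
  // Illustrative: the value handed to QuorumController.Builder.setRaftClient(...)
  // is just RaftManager.client; no MetaLogManager shim sits in between anymore.
  def raftClientFor(
    raftManager: RaftManager[ApiMessageAndVersion]
  ): RaftClient[ApiMessageAndVersion] = raftManager.client
}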