From bf5ed4b93832bdfda0e70115c4a863878a0e733f Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 6 Apr 2021 11:04:44 -0700 Subject: [PATCH 1/2] KAFKA-12342; Merge RaftClient and MetaLogManager interfaces and remove shim --- build.gradle | 2 +- checkstyle/import-control.xml | 14 +- .../main/scala/kafka/raft/RaftManager.scala | 25 +-- .../scala/kafka/server/BrokerServer.scala | 30 ++-- .../BrokerToControllerChannelManager.scala | 20 +-- .../scala/kafka/server/ControllerServer.scala | 22 ++- .../scala/kafka/server/KafkaRaftServer.scala | 10 +- .../metadata/BrokerMetadataListener.scala | 58 ++++--- .../scala/kafka/tools/DumpLogSegments.scala | 2 +- .../scala/kafka/tools/TestRaftServer.scala | 15 +- .../kafka/testkit/KafkaClusterTestKit.java | 9 +- .../metadata/BrokerMetadataListenerTest.scala | 23 ++- .../kafka/tools/DumpLogSegmentsTest.scala | 3 +- .../kafka/controller/QuorumController.java | 129 +++++++++------- .../kafka}/metadata/MetadataRecordSerde.java | 3 +- .../apache/kafka/metalog/MetaLogLeader.java | 58 ------- .../apache/kafka/metalog/MetaLogListener.java | 55 ------- .../apache/kafka/metalog/MetaLogManager.java | 96 ------------ .../controller/QuorumControllerTestEnv.java | 2 +- .../apache/kafka/metalog/LocalLogManager.java | 130 ++++++++++------ .../kafka/metalog/LocalLogManagerTest.java | 39 +++-- .../kafka/metalog/LocalLogManagerTestEnv.java | 17 +- .../metalog}/MetadataRecordSerdeTest.java | 3 +- .../metalog/MockMetaLogManagerListener.java | 61 +++++--- .../org/apache/kafka/raft/BatchReader.java | 8 + .../apache/kafka/raft/KafkaRaftClient.java | 79 ++++++---- .../org/apache/kafka/raft/LeaderAndEpoch.java | 12 ++ .../org/apache/kafka/raft/QuorumState.java | 4 + .../org/apache/kafka/raft/RaftClient.java | 39 +++-- .../apache/kafka/raft/ReplicatedCounter.java | 23 ++- .../kafka/raft/metadata/MetaLogRaftShim.java | 145 ------------------ .../kafka/raft/KafkaRaftClientTest.java | 6 +- .../kafka/raft/RaftClientTestContext.java | 26 ++-- .../kafka/shell/MetadataNodeManager.java | 36 +---- .../kafka/shell/SnapshotFileReader.java | 34 ++-- 35 files changed, 494 insertions(+), 744 deletions(-) rename {raft/src/main/java/org/apache/kafka/raft => metadata/src/main/java/org/apache/kafka}/metadata/MetadataRecordSerde.java (96%) delete mode 100644 metadata/src/main/java/org/apache/kafka/metalog/MetaLogLeader.java delete mode 100644 metadata/src/main/java/org/apache/kafka/metalog/MetaLogListener.java delete mode 100644 metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java rename {raft/src/test/java/org/apache/kafka/raft/metadata => metadata/src/test/java/org/apache/kafka/metalog}/MetadataRecordSerdeTest.java (97%) delete mode 100644 raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java diff --git a/build.gradle b/build.gradle index e1a05e3c0a170..4c628eca320a7 100644 --- a/build.gradle +++ b/build.gradle @@ -1050,6 +1050,7 @@ project(':metadata') { dependencies { implementation project(':clients') + implementation project(':raft') implementation libs.jacksonDatabind implementation libs.jacksonJDK8Datatypes implementation libs.metrics @@ -1248,7 +1249,6 @@ project(':raft') { dependencies { implementation project(':clients') - implementation project(':metadata') implementation libs.slf4jApi implementation libs.jacksonDatabind diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index a5235039d913f..14446d4b50bf8 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -219,7 +219,7 @@ - + @@ -231,15 +231,7 @@ - 
- - - - - - - - + @@ -277,7 +269,6 @@ - @@ -401,7 +392,6 @@ - diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala index 50e67f43d3dca..e78a2580e8ac0 100644 --- a/core/src/main/scala/kafka/raft/RaftManager.scala +++ b/core/src/main/scala/kafka/raft/RaftManager.scala @@ -37,7 +37,7 @@ import org.apache.kafka.common.security.JaasContext import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.{LogContext, Time} import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec, NON_ROUTABLE_ADDRESS, UnknownAddressSpec} -import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, RaftClient, RaftConfig, RaftRequest, RecordSerde} +import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, LeaderAndEpoch, RaftClient, RaftConfig, RaftRequest, RecordSerde} import scala.jdk.CollectionConverters._ @@ -101,6 +101,10 @@ trait RaftManager[T] { epoch: Int, records: Seq[T] ): Option[Long] + + def leaderAndEpoch: LeaderAndEpoch + + def client: RaftClient[T] } class KafkaRaftManager[T]( @@ -125,10 +129,10 @@ class KafkaRaftManager[T]( private val dataDir = createDataDir() private val metadataLog = buildMetadataLog() private val netChannel = buildNetworkChannel() - private val raftClient = buildRaftClient() - private val raftIoThread = new RaftIoThread(raftClient, threadNamePrefix) + val client: KafkaRaftClient[T] = buildRaftClient() + private val raftIoThread = new RaftIoThread(client, threadNamePrefix) - def kafkaRaftClient: KafkaRaftClient[T] = raftClient + def kafkaRaftClient: KafkaRaftClient[T] = client def startup(): Unit = { // Update the voter endpoints (if valid) with what's in RaftConfig @@ -151,7 +155,7 @@ class KafkaRaftManager[T]( def shutdown(): Unit = { raftIoThread.shutdown() - raftClient.close() + client.close() scheduler.shutdown() netChannel.close() metadataLog.close() @@ -160,7 +164,7 @@ class KafkaRaftManager[T]( override def register( listener: RaftClient.Listener[T] ): Unit = { - raftClient.register(listener) + client.register(listener) } override def scheduleAtomicAppend( @@ -183,9 +187,9 @@ class KafkaRaftManager[T]( isAtomic: Boolean ): Option[Long] = { val offset = if (isAtomic) { - raftClient.scheduleAtomicAppend(epoch, records.asJava) + client.scheduleAtomicAppend(epoch, records.asJava) } else { - raftClient.scheduleAppend(epoch, records.asJava) + client.scheduleAppend(epoch, records.asJava) } Option(offset).map(Long.unbox) @@ -202,7 +206,7 @@ class KafkaRaftManager[T]( createdTimeMs ) - raftClient.handle(inboundRequest) + client.handle(inboundRequest) inboundRequest.completion.thenApply { response => response.data @@ -306,4 +310,7 @@ class KafkaRaftManager[T]( ) } + override def leaderAndEpoch: LeaderAndEpoch = { + client.leaderAndEpoch + } } diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 65620d3ae5a3d..2b634fe1c306b 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -18,9 +18,9 @@ package kafka.server import java.util -import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException} import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException} import kafka.cluster.Broker.ServerInfo import kafka.coordinator.group.GroupCoordinator @@ -28,6 +28,7 @@ import 
kafka.coordinator.transaction.{ProducerIdGenerator, TransactionCoordinato import kafka.log.LogManager import kafka.metrics.KafkaYammerMetrics import kafka.network.SocketServer +import kafka.raft.RaftManager import kafka.security.CredentialProvider import kafka.server.metadata.{BrokerMetadataListener, CachedConfigRepository, ClientQuotaCache, ClientQuotaMetadataManager, RaftMetadataCache} import kafka.utils.{CoreUtils, KafkaScheduler} @@ -41,8 +42,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time} import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException} -import org.apache.kafka.metadata.{BrokerState, VersionRange} -import org.apache.kafka.metalog.MetaLogManager +import org.apache.kafka.metadata.{ApiMessageAndVersion, BrokerState, VersionRange} import org.apache.kafka.raft.RaftConfig import org.apache.kafka.raft.RaftConfig.AddressSpec import org.apache.kafka.server.authorizer.Authorizer @@ -54,16 +54,16 @@ import scala.jdk.CollectionConverters._ * A Kafka broker that runs in KRaft (Kafka Raft) mode. */ class BrokerServer( - val config: KafkaConfig, - val metaProps: MetaProperties, - val metaLogManager: MetaLogManager, - val time: Time, - val metrics: Metrics, - val threadNamePrefix: Option[String], - val initialOfflineDirs: Seq[String], - val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]], - val supportedFeatures: util.Map[String, VersionRange] - ) extends KafkaBroker { + val config: KafkaConfig, + val metaProps: MetaProperties, + val raftManager: RaftManager[ApiMessageAndVersion], + val time: Time, + val metrics: Metrics, + val threadNamePrefix: Option[String], + val initialOfflineDirs: Seq[String], + val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]], + val supportedFeatures: util.Map[String, VersionRange] +) extends KafkaBroker { import kafka.server.Server._ @@ -179,7 +179,7 @@ class BrokerServer( credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache) val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala - val controllerNodeProvider = RaftControllerNodeProvider(metaLogManager, config, controllerNodes) + val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes) clientToControllerChannelManager = BrokerToControllerChannelManager( controllerNodeProvider, @@ -282,7 +282,7 @@ class BrokerServer( metaProps.clusterId, networkListeners, supportedFeatures) // Register a listener with the Raft layer to receive metadata event notifications - metaLogManager.register(brokerMetadataListener) + raftManager.register(brokerMetadataListener) val endpoints = new util.ArrayList[Endpoint](networkListeners.size()) var interBrokerListener: Endpoint = null diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala index 5834a17942286..d3ae104fd563f 100644 --- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala +++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala @@ -21,6 +21,7 @@ import java.util.concurrent.LinkedBlockingDeque import java.util.concurrent.atomic.AtomicReference import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} +import kafka.raft.RaftManager import 
kafka.utils.Logging import org.apache.kafka.clients._ import org.apache.kafka.common.Node @@ -31,9 +32,10 @@ import org.apache.kafka.common.requests.AbstractRequest import org.apache.kafka.common.security.JaasContext import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.{LogContext, Time} -import org.apache.kafka.metalog.MetaLogManager +import org.apache.kafka.metadata.ApiMessageAndVersion import scala.collection.Seq +import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ trait ControllerNodeProvider { @@ -77,15 +79,14 @@ class MetadataCacheControllerNodeProvider( } object RaftControllerNodeProvider { - def apply(metaLogManager: MetaLogManager, + def apply(raftManager: RaftManager[ApiMessageAndVersion], config: KafkaConfig, controllerQuorumVoterNodes: Seq[Node]): RaftControllerNodeProvider = { - val controllerListenerName = new ListenerName(config.controllerListenerNames.head) val controllerSecurityProtocol = config.listenerSecurityProtocolMap.getOrElse(controllerListenerName, SecurityProtocol.forName(controllerListenerName.value())) val controllerSaslMechanism = config.saslMechanismControllerProtocol new RaftControllerNodeProvider( - metaLogManager, + raftManager, controllerQuorumVoterNodes, controllerListenerName, controllerSecurityProtocol, @@ -98,7 +99,7 @@ object RaftControllerNodeProvider { * Finds the controller node by checking the metadata log manager. * This provider is used when we are using a Raft-based metadata quorum. */ -class RaftControllerNodeProvider(val metaLogManager: MetaLogManager, +class RaftControllerNodeProvider(val raftManager: RaftManager[ApiMessageAndVersion], controllerQuorumVoterNodes: Seq[Node], val listenerName: ListenerName, val securityProtocol: SecurityProtocol, @@ -107,14 +108,7 @@ class RaftControllerNodeProvider(val metaLogManager: MetaLogManager, val idToNode = controllerQuorumVoterNodes.map(node => node.id() -> node).toMap override def get(): Option[Node] = { - val leader = metaLogManager.leader() - if (leader == null) { - None - } else if (leader.nodeId() < 0) { - None - } else { - idToNode.get(leader.nodeId()) - } + raftManager.leaderAndEpoch.leaderId.asScala.map(idToNode) } } diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 6a2844af8a25d..f3fe22bf8d4ab 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -17,9 +17,9 @@ package kafka.server -import java.util.concurrent.{CompletableFuture, TimeUnit} import java.util import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.{CompletableFuture, TimeUnit} import kafka.cluster.Broker.ServerInfo import kafka.log.LogConfig @@ -38,7 +38,6 @@ import org.apache.kafka.common.utils.{LogContext, Time} import org.apache.kafka.common.{ClusterResource, Endpoint} import org.apache.kafka.controller.{Controller, QuorumController, QuorumControllerMetrics} import org.apache.kafka.metadata.{ApiMessageAndVersion, VersionRange} -import org.apache.kafka.metalog.MetaLogManager import org.apache.kafka.raft.RaftConfig import org.apache.kafka.raft.RaftConfig.AddressSpec import org.apache.kafka.server.authorizer.Authorizer @@ -49,15 +48,14 @@ import scala.jdk.CollectionConverters._ * A Kafka controller that runs in KRaft (Kafka Raft) mode. 
*/ class ControllerServer( - val metaProperties: MetaProperties, - val config: KafkaConfig, - val metaLogManager: MetaLogManager, - val raftManager: RaftManager[ApiMessageAndVersion], - val time: Time, - val metrics: Metrics, - val threadNamePrefix: Option[String], - val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]] - ) extends Logging with KafkaMetricsGroup { + val metaProperties: MetaProperties, + val config: KafkaConfig, + val raftManager: RaftManager[ApiMessageAndVersion], + val time: Time, + val metrics: Metrics, + val threadNamePrefix: Option[String], + val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]] +) extends Logging with KafkaMetricsGroup { import kafka.server.Server._ val lock = new ReentrantLock() @@ -148,7 +146,7 @@ class ControllerServer( setTime(time). setThreadNamePrefix(threadNamePrefixAsString). setConfigDefs(configDefs). - setLogManager(metaLogManager). + setRaftClient(raftManager.client). setDefaultReplicationFactor(config.defaultReplicationFactor.toShort). setDefaultNumPartitions(config.numPartitions.intValue()). setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(), diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala index 965da861c7196..b6d23309f3a36 100644 --- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala +++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala @@ -27,9 +27,8 @@ import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole} import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.utils.{AppInfoParser, Time} -import org.apache.kafka.metadata.ApiMessageAndVersion +import org.apache.kafka.metadata.{ApiMessageAndVersion, MetadataRecordSerde} import org.apache.kafka.raft.RaftConfig -import org.apache.kafka.raft.metadata.{MetaLogRaftShim, MetadataRecordSerde} import scala.collection.Seq @@ -56,7 +55,7 @@ class KafkaRaftServer( private val metrics = Server.initializeMetrics( config, time, - metaProps.clusterId.toString + metaProps.clusterId ) private val controllerQuorumVotersFuture = CompletableFuture.completedFuture( @@ -73,13 +72,11 @@ class KafkaRaftServer( controllerQuorumVotersFuture ) - private val metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient, config.nodeId) - private val broker: Option[BrokerServer] = if (config.processRoles.contains(BrokerRole)) { Some(new BrokerServer( config, metaProps, - metaLogShim, + raftManager, time, metrics, threadNamePrefix, @@ -95,7 +92,6 @@ class KafkaRaftServer( Some(new ControllerServer( metaProps, config, - metaLogShim, raftManager, time, metrics, diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala index 8d07f8ea9fc0f..1d0f9930343d6 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala @@ -18,6 +18,7 @@ package kafka.server.metadata import java.util import java.util.concurrent.TimeUnit + import kafka.coordinator.group.GroupCoordinator import kafka.coordinator.transaction.TransactionCoordinator import kafka.metrics.KafkaMetricsGroup @@ -27,9 +28,11 @@ import org.apache.kafka.common.metadata.MetadataRecordType._ import org.apache.kafka.common.metadata._ import org.apache.kafka.common.protocol.ApiMessage import 
org.apache.kafka.common.utils.{LogContext, Time} -import org.apache.kafka.metalog.{MetaLogLeader, MetaLogListener} +import org.apache.kafka.metadata.ApiMessageAndVersion import org.apache.kafka.queue.{EventQueue, KafkaEventQueue} +import org.apache.kafka.raft.{BatchReader, LeaderAndEpoch, RaftClient} +import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ object BrokerMetadataListener{ @@ -37,16 +40,17 @@ object BrokerMetadataListener{ val MetadataBatchSizes = "MetadataBatchSizes" } -class BrokerMetadataListener(brokerId: Int, - time: Time, - metadataCache: RaftMetadataCache, - configRepository: CachedConfigRepository, - groupCoordinator: GroupCoordinator, - replicaManager: RaftReplicaManager, - txnCoordinator: TransactionCoordinator, - threadNamePrefix: Option[String], - clientQuotaManager: ClientQuotaMetadataManager - ) extends MetaLogListener with KafkaMetricsGroup { +class BrokerMetadataListener( + brokerId: Int, + time: Time, + metadataCache: RaftMetadataCache, + configRepository: CachedConfigRepository, + groupCoordinator: GroupCoordinator, + replicaManager: RaftReplicaManager, + txnCoordinator: TransactionCoordinator, + threadNamePrefix: Option[String], + clientQuotaManager: ClientQuotaMetadataManager +) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup { private val logContext = new LogContext(s"[BrokerMetadataListener id=${brokerId}] ") private val log = logContext.logger(classOf[BrokerMetadataListener]) logIdent = logContext.logPrefix() @@ -73,17 +77,25 @@ class BrokerMetadataListener(brokerId: Int, /** * Handle new metadata records. */ - override def handleCommits(lastOffset: Long, records: util.List[ApiMessage]): Unit = { - eventQueue.append(new HandleCommitsEvent(lastOffset, records)) + override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit = { + try { + while (reader.hasNext) { + val batch = reader.next() + eventQueue.append(new HandleCommitsEvent(batch.lastOffset, batch.records )) + } + } finally { + reader.close() + } + } // Visible for testing. 
It's useful to execute events synchronously - private[metadata] def execCommits(lastOffset: Long, records: util.List[ApiMessage]): Unit = { + private[metadata] def execCommits(lastOffset: Long, records: util.List[ApiMessageAndVersion]): Unit = { new HandleCommitsEvent(lastOffset, records).run() } class HandleCommitsEvent(lastOffset: Long, - records: util.List[ApiMessage]) + records: util.List[ApiMessageAndVersion]) extends EventQueue.FailureLoggingEvent(log) { override def run(): Unit = { if (isDebugEnabled) { @@ -100,7 +112,7 @@ class BrokerMetadataListener(brokerId: Int, trace("Metadata batch %d: processing [%d/%d]: %s.".format(lastOffset, index + 1, records.size(), record.toString)) } - handleMessage(imageBuilder, record, lastOffset) + handleMessage(imageBuilder, record.message, lastOffset) } catch { case e: Exception => error(s"Unable to handle record ${index} in batch " + s"ending at offset ${lastOffset}", e) @@ -234,22 +246,20 @@ class BrokerMetadataListener(brokerId: Int, clientQuotaManager.handleQuotaRecord(record) } - class HandleNewLeaderEvent(leader: MetaLogLeader) + class HandleNewLeaderEvent(leaderAndEpoch: LeaderAndEpoch) extends EventQueue.FailureLoggingEvent(log) { override def run(): Unit = { val imageBuilder = MetadataImageBuilder(brokerId, log, metadataCache.currentImage()) - if (leader.nodeId() < 0) { - imageBuilder.controllerId(None) - } else { - imageBuilder.controllerId(Some(leader.nodeId())) - } + imageBuilder.controllerId(leaderAndEpoch.leaderId.asScala) metadataCache.image(imageBuilder.build()) } } - override def handleNewLeader(leader: MetaLogLeader): Unit = { - eventQueue.append(new HandleNewLeaderEvent(leader)) + override def handleLeaderChange(leader: LeaderAndEpoch): Unit = { + if (leader.isLeader(brokerId)) { + eventQueue.append(new HandleNewLeaderEvent(leader)) + } } class ShutdownEvent() extends EventQueue.FailureLoggingEvent(log) { diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index 8bc07da73e51c..71f2ae9aa8b0a 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -30,7 +30,7 @@ import org.apache.kafka.common.metadata.{MetadataJsonConverters, MetadataRecordT import org.apache.kafka.common.protocol.ByteBufferAccessor import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Utils -import org.apache.kafka.raft.metadata.MetadataRecordSerde +import org.apache.kafka.metadata.MetadataRecordSerde import scala.jdk.CollectionConverters._ import scala.collection.mutable diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala index e52a960168927..47b0a18d9d71f 100644 --- a/core/src/main/scala/kafka/tools/TestRaftServer.scala +++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala @@ -19,6 +19,7 @@ package kafka.tools import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingDeque, TimeUnit} + import joptsimple.OptionException import kafka.network.SocketServer import kafka.raft.{KafkaRaftManager, RaftManager} @@ -36,7 +37,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{TopicPartition, Uuid, protocol} import org.apache.kafka.raft.BatchReader.Batch -import org.apache.kafka.raft.{BatchReader, RaftClient, RaftConfig, RecordSerde} 
+import org.apache.kafka.raft.{BatchReader, LeaderAndEpoch, RaftClient, RaftConfig, RecordSerde} import scala.jdk.CollectionConverters._ @@ -160,12 +161,12 @@ class TestRaftServer( raftManager.register(this) - override def handleClaim(epoch: Int): Unit = { - eventQueue.offer(HandleClaim(epoch)) - } - - override def handleResign(epoch: Int): Unit = { - eventQueue.offer(HandleResign) + override def handleLeaderChange(newLeaderAndEpoch: LeaderAndEpoch): Unit = { + if (newLeaderAndEpoch.isLeader(config.nodeId)) { + eventQueue.offer(HandleClaim(newLeaderAndEpoch.epoch)) + } else if (claimedEpoch.isDefined) { + eventQueue.offer(HandleResign) + } } override def handleCommit(reader: BatchReader[Array[Byte]]): Unit = { diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java index 6e3cebb525276..20bd7281d79f5 100644 --- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java +++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java @@ -37,10 +37,8 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.controller.Controller; import org.apache.kafka.metadata.ApiMessageAndVersion; -import org.apache.kafka.metalog.MetaLogManager; +import org.apache.kafka.metadata.MetadataRecordSerde; import org.apache.kafka.raft.RaftConfig; -import org.apache.kafka.raft.metadata.MetaLogRaftShim; -import org.apache.kafka.raft.metadata.MetadataRecordSerde; import org.apache.kafka.test.TestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -175,11 +173,9 @@ public KafkaClusterTestKit build() throws Exception { KafkaRaftManager raftManager = new KafkaRaftManager<>( metaProperties, config, new MetadataRecordSerde(), metadataPartition, Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future); - MetaLogManager metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient(), config.nodeId()); ControllerServer controller = new ControllerServer( nodes.controllerProperties(node.id()), config, - metaLogShim, raftManager, Time.SYSTEM, new Metrics(), @@ -228,11 +224,10 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftManager raftManager = new KafkaRaftManager<>( metaProperties, config, new MetadataRecordSerde(), metadataPartition, Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future); - MetaLogManager metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient(), config.nodeId()); BrokerServer broker = new BrokerServer( config, nodes.brokerProperties(node.id()), - metaLogShim, + raftManager, Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala index 545fe48afa544..cc9bf4c5bc935 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala @@ -25,9 +25,9 @@ import kafka.server.RaftReplicaManager import kafka.utils.Implicits._ import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.metadata.{ConfigRecord, PartitionRecord, RemoveTopicRecord, TopicRecord} -import org.apache.kafka.common.protocol.ApiMessage import org.apache.kafka.common.utils.MockTime import org.apache.kafka.common.{TopicPartition, Uuid} +import org.apache.kafka.metadata.ApiMessageAndVersion import org.junit.jupiter.api.Assertions._ 
import org.junit.jupiter.api.Test import org.mockito.ArgumentMatchers._ @@ -83,8 +83,8 @@ class BrokerMetadataListenerTest { val deleteRecord = new RemoveTopicRecord() .setTopicId(topicId) lastMetadataOffset += 1 - listener.execCommits(lastOffset = lastMetadataOffset, List[ApiMessage]( - deleteRecord, + listener.execCommits(lastOffset = lastMetadataOffset, List[ApiMessageAndVersion]( + new ApiMessageAndVersion(deleteRecord, 0.toShort), ).asJava) assertFalse(metadataCache.contains(topic)) @@ -115,11 +115,10 @@ class BrokerMetadataListenerTest { numPartitions: Int, numBrokers: Int ): Set[TopicPartition] = { - val records = new java.util.ArrayList[ApiMessage] - records.add(new TopicRecord() + val records = new java.util.ArrayList[ApiMessageAndVersion] + records.add(new ApiMessageAndVersion(new TopicRecord() .setName(topic) - .setTopicId(topicId) - ) + .setTopicId(topicId), 0)) val localTopicPartitions = mutable.Set.empty[TopicPartition] (0 until numPartitions).map { partitionId => @@ -134,24 +133,22 @@ class BrokerMetadataListenerTest { localTopicPartitions.add(new TopicPartition(topic, partitionId)) } - records.add(new PartitionRecord() + records.add(new ApiMessageAndVersion(new PartitionRecord() .setTopicId(topicId) .setPartitionId(partitionId) .setLeader(preferredLeaderId) .setLeaderEpoch(0) .setPartitionEpoch(0) .setReplicas(replicas) - .setIsr(replicas) - ) + .setIsr(replicas), 0)) } topicConfig.forKeyValue { (key, value) => - records.add(new ConfigRecord() + records.add(new ApiMessageAndVersion(new ConfigRecord() .setResourceName(topic) .setResourceType(ConfigResource.Type.TOPIC.id()) .setName(key) - .setValue(value) - ) + .setValue(value), 0)) } lastMetadataOffset += records.size() diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala index 256262a99b4e0..36283360801e1 100644 --- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala +++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala @@ -31,8 +31,7 @@ import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRe import org.apache.kafka.common.protocol.{ByteBufferAccessor, ObjectSerializationCache} import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} import org.apache.kafka.common.utils.Utils -import org.apache.kafka.metadata.ApiMessageAndVersion -import org.apache.kafka.raft.metadata.MetadataRecordSerde +import org.apache.kafka.metadata.{ApiMessageAndVersion, MetadataRecordSerde} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 4f2b708bfe5a4..6a8fbef56f186 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -17,18 +17,6 @@ package org.apache.kafka.controller; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map.Entry; -import java.util.Map; -import java.util.Optional; -import java.util.Random; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; -import java.util.stream.Collectors; import org.apache.kafka.clients.admin.AlterConfigOp.OpType; import 
org.apache.kafka.common.Uuid; import org.apache.kafka.common.config.ConfigDef; @@ -68,15 +56,29 @@ import org.apache.kafka.metadata.BrokerRegistrationReply; import org.apache.kafka.metadata.FeatureMapAndEpoch; import org.apache.kafka.metadata.VersionRange; -import org.apache.kafka.metalog.MetaLogLeader; -import org.apache.kafka.metalog.MetaLogListener; -import org.apache.kafka.metalog.MetaLogManager; -import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction; import org.apache.kafka.queue.EventQueue; +import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction; import org.apache.kafka.queue.KafkaEventQueue; +import org.apache.kafka.raft.BatchReader; +import org.apache.kafka.raft.LeaderAndEpoch; +import org.apache.kafka.raft.RaftClient; import org.apache.kafka.timeline.SnapshotRegistry; import org.slf4j.Logger; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Collectors; + import static java.util.concurrent.TimeUnit.MICROSECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -107,7 +109,7 @@ static public class Builder { private String threadNamePrefix = null; private LogContext logContext = null; private Map configDefs = Collections.emptyMap(); - private MetaLogManager logManager = null; + private RaftClient raftClient = null; private Map supportedFeatures = Collections.emptyMap(); private short defaultReplicationFactor = 3; private int defaultNumPartitions = 1; @@ -140,8 +142,8 @@ public Builder setConfigDefs(Map configDefs) { return this; } - public Builder setLogManager(MetaLogManager logManager) { - this.logManager = logManager; + public Builder setRaftClient(RaftClient logManager) { + this.raftClient = logManager; return this; } @@ -176,7 +178,7 @@ public Builder setMetrics(ControllerMetrics controllerMetrics) { } public QuorumController build() throws Exception { - if (logManager == null) { + if (raftClient == null) { throw new RuntimeException("You must set a metadata log manager."); } if (threadNamePrefix == null) { @@ -193,9 +195,9 @@ public QuorumController build() throws Exception { try { queue = new KafkaEventQueue(time, logContext, threadNamePrefix); return new QuorumController(logContext, nodeId, queue, time, configDefs, - logManager, supportedFeatures, defaultReplicationFactor, - defaultNumPartitions, replicaPlacementPolicy, sessionTimeoutNs, - controllerMetrics); + raftClient, supportedFeatures, defaultReplicationFactor, + defaultNumPartitions, replicaPlacementPolicy, sessionTimeoutNs, + controllerMetrics); } catch (Exception e) { Utils.closeQuietly(queue, "event queue"); throw e; @@ -207,12 +209,12 @@ public QuorumController build() throws Exception { "The active controller appears to be node "; private NotControllerException newNotControllerException() { - int latestController = logManager.leader().nodeId(); - if (latestController < 0) { - return new NotControllerException("No controller appears to be active."); - } else { + OptionalInt latestController = raftClient.leaderAndEpoch().leaderId; + if (latestController.isPresent()) { return new NotControllerException(ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX + - latestController); + latestController.getAsInt()); + } else { + 
return new NotControllerException("No controller appears to be active."); } } @@ -410,7 +412,7 @@ CompletableFuture future() { public void run() throws Exception { long now = time.nanoseconds(); controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs)); - long controllerEpoch = curClaimEpoch; + int controllerEpoch = curClaimEpoch; if (controllerEpoch == -1) { throw newNotControllerException(); } @@ -443,9 +445,9 @@ public void run() throws Exception { // out asynchronously. final long offset; if (result.isAtomic()) { - offset = logManager.scheduleAtomicWrite(controllerEpoch, result.records()); + offset = raftClient.scheduleAtomicAppend(controllerEpoch, result.records()); } else { - offset = logManager.scheduleWrite(controllerEpoch, result.records()); + offset = raftClient.scheduleAppend(controllerEpoch, result.records()); } op.processBatchEndOffset(offset); writeOffset = offset; @@ -498,9 +500,12 @@ private CompletableFuture appendWriteEvent(String name, return event.future(); } - class QuorumMetaLogListener implements MetaLogListener { - @Override - public void handleCommits(long offset, List messages) { + class QuorumMetaLogListener implements RaftClient.Listener { + + private void handleCommittedBatch(BatchReader.Batch batch) { + long offset = batch.lastOffset(); + List messages = batch.records(); + appendControlEvent("handleCommits[" + offset + "]", () -> { if (curClaimEpoch == -1) { // If the controller is a standby, replay the records that were @@ -508,15 +513,16 @@ public void handleCommits(long offset, List messages) { if (log.isDebugEnabled()) { if (log.isTraceEnabled()) { log.trace("Replaying commits from the active node up to " + - "offset {}: {}.", offset, messages.stream(). - map(m -> m.toString()).collect(Collectors.joining(", "))); + "offset {}: {}.", offset, messages.stream() + .map(ApiMessageAndVersion::toString) + .collect(Collectors.joining(", "))); } else { log.debug("Replaying commits from the active node up to " + "offset {}.", offset); } } - for (ApiMessage message : messages) { - replay(message, offset); + for (ApiMessageAndVersion messageAndVersion : messages) { + replay(messageAndVersion.message(), offset); } } else { // If the controller is active, the records were already replayed, @@ -535,11 +541,23 @@ public void handleCommits(long offset, List messages) { } @Override - public void handleNewLeader(MetaLogLeader newLeader) { - if (newLeader.nodeId() == nodeId) { - final long newEpoch = newLeader.epoch(); + public void handleCommit(BatchReader reader) { + try { + while (reader.hasNext()) { + BatchReader.Batch batch = reader.next(); + handleCommittedBatch(batch); + } + } finally { + reader.close(); + } + } + + @Override + public void handleLeaderChange(LeaderAndEpoch newLeader) { + if (newLeader.isLeader(nodeId)) { + final int newEpoch = newLeader.epoch; appendControlEvent("handleClaim[" + newEpoch + "]", () -> { - long curEpoch = curClaimEpoch; + int curEpoch = curClaimEpoch; if (curEpoch != -1) { throw new RuntimeException("Tried to claim controller epoch " + newEpoch + ", but we never renounced controller epoch " + @@ -551,19 +569,14 @@ public void handleNewLeader(MetaLogLeader newLeader) { writeOffset = lastCommittedOffset; clusterControl.activate(); }); - } - } - - @Override - public void handleRenounce(long oldEpoch) { - appendControlEvent("handleRenounce[" + oldEpoch + "]", () -> { - if (curClaimEpoch == oldEpoch) { + } else if (curClaimEpoch != -1) { + appendControlEvent("handleRenounce[" + curClaimEpoch + "]", () -> { 
log.info("Renouncing the leadership at oldEpoch {} due to a metadata " + "log event. Reverting to last committed offset {}.", curClaimEpoch, lastCommittedOffset); renounce(); - } - }); + }); + } } @Override @@ -738,7 +751,7 @@ private void replay(ApiMessage message, long offset) { /** * The interface that we use to mutate the Raft log. */ - private final MetaLogManager logManager; + private final RaftClient raftClient; /** * The interface that receives callbacks from the Raft log. These callbacks are @@ -751,7 +764,7 @@ private void replay(ApiMessage message, long offset) { * Otherwise, this is -1. This variable must be modified only from the controller * thread, but it can be read from other threads. */ - private volatile long curClaimEpoch; + private volatile int curClaimEpoch; /** * The last offset we have committed, or -1 if we have not committed any offsets. @@ -768,13 +781,13 @@ private QuorumController(LogContext logContext, KafkaEventQueue queue, Time time, Map configDefs, - MetaLogManager logManager, + RaftClient raftClient, Map supportedFeatures, short defaultReplicationFactor, int defaultNumPartitions, ReplicaPlacementPolicy replicaPlacementPolicy, long sessionTimeoutNs, - ControllerMetrics controllerMetrics) throws Exception { + ControllerMetrics controllerMetrics) { this.log = logContext.logger(QuorumController.class); this.nodeId = nodeId; this.queue = queue; @@ -792,12 +805,12 @@ private QuorumController(LogContext logContext, this.replicationControl = new ReplicationControlManager(snapshotRegistry, logContext, defaultReplicationFactor, defaultNumPartitions, configurationControl, clusterControl); - this.logManager = logManager; + this.raftClient = raftClient; this.metaLogListener = new QuorumMetaLogListener(); - this.curClaimEpoch = -1L; + this.curClaimEpoch = -1; this.lastCommittedOffset = -1L; this.writeOffset = -1L; - this.logManager.register(metaLogListener); + this.raftClient.register(metaLogListener); } @Override diff --git a/raft/src/main/java/org/apache/kafka/raft/metadata/MetadataRecordSerde.java b/metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java similarity index 96% rename from raft/src/main/java/org/apache/kafka/raft/metadata/MetadataRecordSerde.java rename to metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java index c740497fb317e..72def41a3f3a9 100644 --- a/raft/src/main/java/org/apache/kafka/raft/metadata/MetadataRecordSerde.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.kafka.raft.metadata; +package org.apache.kafka.metadata; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.metadata.MetadataRecordType; @@ -23,7 +23,6 @@ import org.apache.kafka.common.protocol.Readable; import org.apache.kafka.common.protocol.Writable; import org.apache.kafka.common.utils.ByteUtils; -import org.apache.kafka.metadata.ApiMessageAndVersion; import org.apache.kafka.raft.RecordSerde; public class MetadataRecordSerde implements RecordSerde { diff --git a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogLeader.java b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogLeader.java deleted file mode 100644 index 2bf4f7c718bd5..0000000000000 --- a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogLeader.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.metalog; - -import java.util.Objects; - -/** - * The current leader of the MetaLog. - */ -public class MetaLogLeader { - private final int nodeId; - private final long epoch; - - public MetaLogLeader(int nodeId, long epoch) { - this.nodeId = nodeId; - this.epoch = epoch; - } - - public int nodeId() { - return nodeId; - } - - public long epoch() { - return epoch; - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof MetaLogLeader)) return false; - MetaLogLeader other = (MetaLogLeader) o; - return other.nodeId == nodeId && other.epoch == epoch; - } - - @Override - public int hashCode() { - return Objects.hash(nodeId, epoch); - } - - @Override - public String toString() { - return "MetaLogLeader(nodeId=" + nodeId + ", epoch=" + epoch + ")"; - } -} diff --git a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogListener.java b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogListener.java deleted file mode 100644 index 93744202dc90f..0000000000000 --- a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogListener.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.metalog; - -import org.apache.kafka.common.protocol.ApiMessage; - -import java.util.List; - -/** - * Listeners receive notifications from the MetaLogManager. - */ -public interface MetaLogListener { - /** - * Called when the MetaLogManager commits some messages. - * - * @param lastOffset The last offset found in all the given messages. - * @param messages The messages. - */ - void handleCommits(long lastOffset, List messages); - - /** - * Called when a new leader is elected. - * - * @param leader The new leader id and epoch. - */ - default void handleNewLeader(MetaLogLeader leader) {} - - /** - * Called when the MetaLogManager has renounced the leadership. - * - * @param epoch The controller epoch that has ended. - */ - default void handleRenounce(long epoch) {} - - /** - * Called when the MetaLogManager has finished shutting down, and wants to tell its - * listener that it is safe to shut down as well. - */ - default void beginShutdown() {} -} diff --git a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java deleted file mode 100644 index 9126245ef3855..0000000000000 --- a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.metalog; - -import org.apache.kafka.metadata.ApiMessageAndVersion; - -import java.util.List; - -/** - * The MetaLogManager handles storing metadata and electing leaders. - */ -public interface MetaLogManager { - - /** - * Start this meta log manager. - * The manager must be ready to accept incoming calls after this function returns. - * It is an error to initialize a MetaLogManager more than once. - */ - void initialize() throws Exception; - - /** - * Register the listener. The manager must be initialized already. - * The listener must be ready to accept incoming calls immediately. - * - * @param listener The listener to register. - */ - void register(MetaLogListener listener) throws Exception; - - /** - * Schedule a write to the log. - * - * The write will be scheduled to happen at some time in the future. There is no - * error return or exception thrown if the write fails. Instead, the listener may - * regard the write as successful if and only if the MetaLogManager reaches the given - * offset before renouncing its leadership. The listener should determine this by - * monitoring the committed offsets. 
- * - * @param epoch the controller epoch - * @param batch the batch of messages to write - * - * @return the offset of the last message in the batch - * @throws IllegalArgumentException if buffer allocatio failed and the client should backoff - */ - long scheduleWrite(long epoch, List batch); - - /** - * Schedule a atomic write to the log. - * - * The write will be scheduled to happen at some time in the future. All of the messages in batch - * will be appended atomically in one batch. The listener may regard the write as successful - * if and only if the MetaLogManager reaches the given offset before renouncing its leadership. - * The listener should determine this by monitoring the committed offsets. - * - * @param epoch the controller epoch - * @param batch the batch of messages to write - * - * @return the offset of the last message in the batch - * @throws IllegalArgumentException if buffer allocatio failed and the client should backoff - */ - long scheduleAtomicWrite(long epoch, List batch); - - /** - * Renounce the leadership. - * - * @param epoch The epoch. If this does not match the current epoch, this - * call will be ignored. - */ - void renounce(long epoch); - - /** - * Returns the current leader. The active node may change immediately after this - * function is called, of course. - */ - MetaLogLeader leader(); - - /** - * Returns the node id. - */ - int nodeId(); - -} diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java index 99270422fcf2c..db3acfba2a72f 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java @@ -41,7 +41,7 @@ public QuorumControllerTestEnv(LocalLogManagerTestEnv logEnv, try { for (int i = 0; i < numControllers; i++) { QuorumController.Builder builder = new QuorumController.Builder(i); - builder.setLogManager(logEnv.logManagers().get(i)); + builder.setRaftClient(logEnv.logManagers().get(i)); builderConsumer.accept(builder); this.controllers.add(builder.build()); } diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java index e58848eda4ed2..15551169acf54 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java @@ -17,12 +17,16 @@ package org.apache.kafka.metalog; -import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.metadata.ApiMessageAndVersion; import org.apache.kafka.queue.EventQueue; import org.apache.kafka.queue.KafkaEventQueue; +import org.apache.kafka.raft.BatchReader; +import org.apache.kafka.raft.LeaderAndEpoch; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.raft.RaftClient; +import org.apache.kafka.snapshot.SnapshotWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +37,7 @@ import java.util.List; import java.util.Map.Entry; import java.util.Objects; +import java.util.OptionalInt; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -43,15 +48,15 @@ /** * The LocalLogManager is a test implementation that relies on the contents of memory. 
*/ -public final class LocalLogManager implements MetaLogManager, AutoCloseable { +public final class LocalLogManager implements RaftClient, AutoCloseable { interface LocalBatch { int size(); } static class LeaderChangeBatch implements LocalBatch { - private final MetaLogLeader newLeader; + private final LeaderAndEpoch newLeader; - LeaderChangeBatch(MetaLogLeader newLeader) { + LeaderChangeBatch(LeaderAndEpoch newLeader) { this.newLeader = newLeader; } @@ -80,9 +85,11 @@ public String toString() { } static class LocalRecordBatch implements LocalBatch { - private final List records; + private final long leaderEpoch; + private final List records; - LocalRecordBatch(List records) { + LocalRecordBatch(long leaderEpoch, List records) { + this.leaderEpoch = leaderEpoch; this.records = records; } @@ -126,7 +133,7 @@ public static class SharedLogData { /** * The current leader. */ - private MetaLogLeader leader = new MetaLogLeader(-1, -1); + private LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.empty(), 0); /** * The start offset of the last batch that was created, or -1 if no batches have @@ -135,7 +142,7 @@ public static class SharedLogData { private long prevOffset = -1; synchronized void registerLogManager(LocalLogManager logManager) { - if (logManagers.put(logManager.nodeId(), logManager) != null) { + if (logManagers.put(logManager.nodeId, logManager) != null) { throw new RuntimeException("Can't have multiple LocalLogManagers " + "with id " + logManager.nodeId()); } @@ -143,21 +150,21 @@ synchronized void registerLogManager(LocalLogManager logManager) { } synchronized void unregisterLogManager(LocalLogManager logManager) { - if (!logManagers.remove(logManager.nodeId(), logManager)) { + if (!logManagers.remove(logManager.nodeId, logManager)) { throw new RuntimeException("Log manager " + logManager.nodeId() + " was not found."); } } synchronized long tryAppend(int nodeId, long epoch, LocalBatch batch) { - if (epoch != leader.epoch()) { + if (epoch != leader.epoch) { log.trace("tryAppend(nodeId={}, epoch={}): the provided epoch does not " + - "match the current leader epoch of {}.", nodeId, epoch, leader.epoch()); + "match the current leader epoch of {}.", nodeId, epoch, leader.epoch); return Long.MAX_VALUE; } - if (nodeId != leader.nodeId()) { + if (!leader.isLeader(nodeId)) { log.trace("tryAppend(nodeId={}, epoch={}): the given node id does not " + - "match the current leader id of {}.", nodeId, epoch, leader.nodeId()); + "match the current leader id of {}.", nodeId, epoch, leader.leaderId); return Long.MAX_VALUE; } log.trace("tryAppend(nodeId={}): appending {}.", nodeId, batch); @@ -181,7 +188,7 @@ synchronized long append(LocalBatch batch) { } synchronized void electLeaderIfNeeded() { - if (leader.nodeId() != -1 || logManagers.isEmpty()) { + if (leader.leaderId.isPresent() || logManagers.isEmpty()) { return; } int nextLeaderIndex = ThreadLocalRandom.current().nextInt(logManagers.size()); @@ -190,7 +197,7 @@ synchronized void electLeaderIfNeeded() { for (int i = 0; i <= nextLeaderIndex; i++) { nextLeaderNode = iter.next(); } - MetaLogLeader newLeader = new MetaLogLeader(nextLeaderNode, leader.epoch() + 1); + LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(nextLeaderNode), leader.epoch + 1); log.info("Elected new leader: {}.", newLeader); append(new LeaderChangeBatch(newLeader)); } @@ -206,9 +213,9 @@ synchronized Entry nextBatch(long offset) { private static class MetaLogListenerData { private long offset = -1; - private final MetaLogListener listener; + private final 
RaftClient.Listener listener; - MetaLogListenerData(MetaLogListener listener) { + MetaLogListenerData(RaftClient.Listener listener) { this.listener = listener; } } @@ -218,7 +225,7 @@ private static class MetaLogListenerData { /** * The node ID of this local log manager. Each log manager must have a unique ID. */ - private final int nodeId; + public final int nodeId; /** * A reference to the in-memory state that unites all the log managers in use. @@ -254,7 +261,7 @@ private static class MetaLogListenerData { /** * The current leader, as seen by this log manager. */ - private volatile MetaLogLeader leader = new MetaLogLeader(-1, -1); + private volatile LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.empty(), 0); public LocalLogManager(LogContext logContext, int nodeId, @@ -291,15 +298,19 @@ private void scheduleLogCheck() { LeaderChangeBatch batch = (LeaderChangeBatch) entry.getValue(); log.trace("Node {}: handling LeaderChange to {}.", nodeId, batch.newLeader); - listenerData.listener.handleNewLeader(batch.newLeader); - if (batch.newLeader.epoch() > leader.epoch()) { + listenerData.listener.handleLeaderChange(batch.newLeader); + if (batch.newLeader.epoch > leader.epoch) { leader = batch.newLeader; } } else if (entry.getValue() instanceof LocalRecordBatch) { LocalRecordBatch batch = (LocalRecordBatch) entry.getValue(); log.trace("Node {}: handling LocalRecordBatch with offset {}.", nodeId, entryOffset); - listenerData.listener.handleCommits(entryOffset, batch.records); + listenerData.listener.handleCommit(BatchReader.singleton(new BatchReader.Batch<>( + entryOffset - batch.records.size() + 1, + Math.toIntExact(batch.leaderEpoch), + batch.records + ))); } numEntriesFound++; listenerData.offset = entryOffset; @@ -317,7 +328,7 @@ public void beginShutdown() { try { if (initialized && !shutdown) { log.debug("Node {}: beginning shutdown.", nodeId); - renounce(leader.epoch()); + resign(leader.epoch); for (MetaLogListenerData listenerData : listeners) { listenerData.listener.beginShutdown(); } @@ -331,14 +342,32 @@ public void beginShutdown() { } @Override - public void close() throws InterruptedException { + public void close() { log.debug("Node {}: closing.", nodeId); beginShutdown(); - eventQueue.close(); + + try { + eventQueue.close(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + @Override + public CompletableFuture shutdown(int timeoutMs) { + CompletableFuture shutdownFuture = new CompletableFuture<>(); + try { + close(); + shutdownFuture.complete(null); + } catch (Throwable t) { + shutdownFuture.completeExceptionally(t); + } + return shutdownFuture; } @Override - public void initialize() throws Exception { + public void initialize() { eventQueue.append(() -> { log.debug("initialized local log manager for node " + nodeId); initialized = true; @@ -346,7 +375,7 @@ public void initialize() throws Exception { } @Override - public void register(MetaLogListener listener) throws Exception { + public void register(RaftClient.Listener listener) { CompletableFuture future = new CompletableFuture<>(); eventQueue.append(() -> { if (shutdown) { @@ -366,47 +395,54 @@ public void register(MetaLogListener listener) throws Exception { "LocalLogManager was not initialized.")); } }); - future.get(); + try { + future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } } @Override - public long 
scheduleWrite(long epoch, List batch) { - return scheduleAtomicWrite(epoch, batch); + public Long scheduleAppend(int epoch, List batch) { + return scheduleAtomicAppend(epoch, batch); } @Override - public long scheduleAtomicWrite(long epoch, List batch) { + public Long scheduleAtomicAppend(int epoch, List batch) { return shared.tryAppend( nodeId, - leader.epoch(), - new LocalRecordBatch( - batch - .stream() - .map(ApiMessageAndVersion::message) - .collect(Collectors.toList()) - ) + leader.epoch, + new LocalRecordBatch(leader.epoch, batch) ); } @Override - public void renounce(long epoch) { - MetaLogLeader curLeader = leader; - MetaLogLeader nextLeader = new MetaLogLeader(-1, curLeader.epoch() + 1); - shared.tryAppend(nodeId, curLeader.epoch(), new LeaderChangeBatch(nextLeader)); + public void resign(int epoch) { + LeaderAndEpoch curLeader = leader; + LeaderAndEpoch nextLeader = new LeaderAndEpoch(OptionalInt.empty(), curLeader.epoch + 1); + shared.tryAppend(nodeId, curLeader.epoch, new LeaderChangeBatch(nextLeader)); + } + + @Override + public SnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) { + throw new UnsupportedOperationException(); } @Override - public MetaLogLeader leader() { + public LeaderAndEpoch leaderAndEpoch() { return leader; } @Override - public int nodeId() { - return nodeId; + public OptionalInt nodeId() { + return OptionalInt.of(nodeId); } - public List listeners() { - final CompletableFuture> future = new CompletableFuture<>(); + public List> listeners() { + final CompletableFuture>> future = new CompletableFuture<>(); eventQueue.append(() -> { future.complete(listeners.stream().map(l -> l.listener).collect(Collectors.toList())); }); diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java index ac578fb635807..b10c747576e5e 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java @@ -19,14 +19,14 @@ import org.apache.kafka.common.metadata.RegisterBrokerRecord; import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.raft.LeaderAndEpoch; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.List; +import java.util.OptionalInt; import java.util.stream.Collectors; import static org.apache.kafka.metalog.MockMetaLogManagerListener.COMMIT; @@ -37,7 +37,6 @@ @Timeout(value = 40) public class LocalLogManagerTest { - private static final Logger log = LoggerFactory.getLogger(LocalLogManagerTest.class); /** * Test creating a LocalLogManager and closing it. 
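(Illustrative sketch, not part of the patch: the tests that follow exercise LocalLogManager through the merged RaftClient.Listener interface, so a minimal listener is shown here for orientation. It uses only types visible in this diff -- BatchReader, LeaderAndEpoch, ApiMessageAndVersion -- while the class name ExampleMetadataListener and its fields are hypothetical.)

import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;

public class ExampleMetadataListener implements RaftClient.Listener<ApiMessageAndVersion> {
    private final int nodeId;
    private volatile long lastCommittedOffset = -1L;
    private volatile boolean leader = false;

    public ExampleMetadataListener(int nodeId) {
        this.nodeId = nodeId;
    }

    @Override
    public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
        // The listener owns the reader and must close it, even if replay fails.
        try {
            while (reader.hasNext()) {
                BatchReader.Batch<ApiMessageAndVersion> batch = reader.next();
                for (ApiMessageAndVersion record : batch.records()) {
                    // Apply record.message() to the local state machine here.
                }
                lastCommittedOffset = batch.lastOffset();
            }
        } finally {
            reader.close();
        }
    }

    @Override
    public void handleLeaderChange(LeaderAndEpoch newLeader) {
        // One callback now covers both claiming and renouncing leadership;
        // the listener decides which case applies by checking its own node id.
        leader = newLeader.isLeader(nodeId);
    }
}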
@@ -58,7 +57,7 @@ public void testCreateAndClose() throws Exception { public void testClaimsLeadership() throws Exception { try (LocalLogManagerTestEnv env = LocalLogManagerTestEnv.createWithMockListeners(1)) { - assertEquals(new MetaLogLeader(0, 0), env.waitForLeader()); + assertEquals(new LeaderAndEpoch(OptionalInt.of(0), 1), env.waitForLeader()); env.close(); assertEquals(null, env.firstError.get()); } @@ -71,20 +70,24 @@ public void testClaimsLeadership() throws Exception { public void testPassLeadership() throws Exception { try (LocalLogManagerTestEnv env = LocalLogManagerTestEnv.createWithMockListeners(3)) { - MetaLogLeader first = env.waitForLeader(); - MetaLogLeader cur = first; + LeaderAndEpoch first = env.waitForLeader(); + LeaderAndEpoch cur = first; do { - env.logManagers().get(cur.nodeId()).renounce(cur.epoch()); - MetaLogLeader next = env.waitForLeader(); - while (next.epoch() == cur.epoch()) { + int currentLeaderId = cur.leaderId.orElseThrow(() -> + new AssertionError("Current leader is undefined") + ); + env.logManagers().get(currentLeaderId).resign(cur.epoch); + + LeaderAndEpoch next = env.waitForLeader(); + while (next.epoch == cur.epoch) { Thread.sleep(1); next = env.waitForLeader(); } - long expectedNextEpoch = cur.epoch() + 2; - assertEquals(expectedNextEpoch, next.epoch(), "Expected next epoch to be " + expectedNextEpoch + + long expectedNextEpoch = cur.epoch + 2; + assertEquals(expectedNextEpoch, next.epoch, "Expected next epoch to be " + expectedNextEpoch + ", but found " + next); cur = next; - } while (cur.nodeId() == first.nodeId()); + } while (cur.leaderId.equals(first.leaderId)); env.close(); assertEquals(null, env.firstError.get()); } @@ -121,14 +124,18 @@ private static void waitForLastCommittedOffset(long targetOffset, public void testCommits() throws Exception { try (LocalLogManagerTestEnv env = LocalLogManagerTestEnv.createWithMockListeners(3)) { - MetaLogLeader leaderInfo = env.waitForLeader(); - LocalLogManager activeLogManager = env.logManagers().get(leaderInfo.nodeId()); - long epoch = activeLogManager.leader().epoch(); + LeaderAndEpoch leaderInfo = env.waitForLeader(); + int leaderId = leaderInfo.leaderId.orElseThrow(() -> + new AssertionError("Current leader is undefined") + ); + + LocalLogManager activeLogManager = env.logManagers().get(leaderId); + int epoch = activeLogManager.leaderAndEpoch().epoch; List messages = Arrays.asList( new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(0), (short) 0), new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(1), (short) 0), new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(2), (short) 0)); - assertEquals(3, activeLogManager.scheduleWrite(epoch, messages)); + assertEquals(3, activeLogManager.scheduleAppend(epoch, messages)); for (LocalLogManager logManager : env.logManagers()) { waitForLastCommittedOffset(3, logManager); } diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java index 52aeea052bdde..94e5116c118a9 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.metalog.LocalLogManager.SharedLogData; +import org.apache.kafka.raft.LeaderAndEpoch; import org.apache.kafka.test.TestUtils; import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; @@ -59,7 +60,7 @@ public static LocalLogManagerTestEnv createWithMockListeners(int numManagers) th LocalLogManagerTestEnv testEnv = new LocalLogManagerTestEnv(numManagers); try { for (LocalLogManager logManager : testEnv.logManagers) { - logManager.register(new MockMetaLogManagerListener()); + logManager.register(new MockMetaLogManagerListener(logManager.nodeId)); } } catch (Exception e) { testEnv.close(); @@ -100,16 +101,16 @@ File dir() { return dir; } - MetaLogLeader waitForLeader() throws InterruptedException { - AtomicReference value = new AtomicReference<>(null); + LeaderAndEpoch waitForLeader() throws InterruptedException { + AtomicReference value = new AtomicReference<>(null); TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> { - MetaLogLeader result = null; + LeaderAndEpoch result = null; for (LocalLogManager logManager : logManagers) { - MetaLogLeader leader = logManager.leader(); - if (leader.nodeId() == logManager.nodeId()) { + LeaderAndEpoch leader = logManager.leaderAndEpoch(); + if (leader.isLeader(logManager.nodeId)) { if (result != null) { - throw new RuntimeException("node " + leader.nodeId() + - " thinks it's the leader, but so does " + result.nodeId()); + throw new RuntimeException("node " + logManager.nodeId + + " thinks it's the leader, but so does " + result.leaderId); } result = leader; } diff --git a/raft/src/test/java/org/apache/kafka/raft/metadata/MetadataRecordSerdeTest.java b/metadata/src/test/java/org/apache/kafka/metalog/MetadataRecordSerdeTest.java similarity index 97% rename from raft/src/test/java/org/apache/kafka/raft/metadata/MetadataRecordSerdeTest.java rename to metadata/src/test/java/org/apache/kafka/metalog/MetadataRecordSerdeTest.java index 2071814ed9532..158807b646e43 100644 --- a/raft/src/test/java/org/apache/kafka/raft/metadata/MetadataRecordSerdeTest.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/MetadataRecordSerdeTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.kafka.raft.metadata; +package org.apache.kafka.metalog; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.SerializationException; @@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.ObjectSerializationCache; import org.apache.kafka.common.utils.ByteUtils; import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.MetadataRecordSerde; import org.junit.jupiter.api.Test; import java.nio.ByteBuffer; diff --git a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java b/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java index fe61ec070285b..5e61417bf0be9 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java @@ -18,46 +18,65 @@ package org.apache.kafka.metalog; import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.raft.BatchReader; +import org.apache.kafka.raft.LeaderAndEpoch; +import org.apache.kafka.raft.RaftClient; import java.util.ArrayList; import java.util.List; +import java.util.OptionalInt; -public class MockMetaLogManagerListener implements MetaLogListener { +public class MockMetaLogManagerListener implements RaftClient.Listener { public static final String COMMIT = "COMMIT"; public static final String LAST_COMMITTED_OFFSET = "LAST_COMMITTED_OFFSET"; public static final String NEW_LEADER = "NEW_LEADER"; public static final String RENOUNCE = "RENOUNCE"; public static final String SHUTDOWN = "SHUTDOWN"; + private final int nodeId; private final List serializedEvents = new ArrayList<>(); + private LeaderAndEpoch leaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty(), 0); - @Override - public synchronized void handleCommits(long lastCommittedOffset, List messages) { - for (ApiMessage message : messages) { - StringBuilder bld = new StringBuilder(); - bld.append(COMMIT).append(" ").append(message.toString()); - serializedEvents.add(bld.toString()); - } - StringBuilder bld = new StringBuilder(); - bld.append(LAST_COMMITTED_OFFSET).append(" ").append(lastCommittedOffset); - serializedEvents.add(bld.toString()); + public MockMetaLogManagerListener(int nodeId) { + this.nodeId = nodeId; } @Override - public void handleNewLeader(MetaLogLeader leader) { - StringBuilder bld = new StringBuilder(); - bld.append(NEW_LEADER).append(" "). 
- append(leader.nodeId()).append(" ").append(leader.epoch()); - synchronized (this) { - serializedEvents.add(bld.toString()); + public synchronized void handleCommit(BatchReader reader) { + try { + while (reader.hasNext()) { + BatchReader.Batch batch = reader.next(); + long lastCommittedOffset = batch.lastOffset(); + + for (ApiMessageAndVersion messageAndVersion : batch.records()) { + ApiMessage message = messageAndVersion.message(); + StringBuilder bld = new StringBuilder(); + bld.append(COMMIT).append(" ").append(message.toString()); + serializedEvents.add(bld.toString()); + } + StringBuilder bld = new StringBuilder(); + bld.append(LAST_COMMITTED_OFFSET).append(" ").append(lastCommittedOffset); + serializedEvents.add(bld.toString()); + } + } finally { + reader.close(); } } @Override - public void handleRenounce(long epoch) { - StringBuilder bld = new StringBuilder(); - bld.append(RENOUNCE).append(" ").append(epoch); - synchronized (this) { + public synchronized void handleLeaderChange(LeaderAndEpoch newLeaderAndEpoch) { + LeaderAndEpoch oldLeaderAndEpoch = this.leaderAndEpoch; + this.leaderAndEpoch = newLeaderAndEpoch; + + if (newLeaderAndEpoch.isLeader(nodeId)) { + StringBuilder bld = new StringBuilder(); + bld.append(NEW_LEADER).append(" "). + append(nodeId).append(" ").append(newLeaderAndEpoch.epoch); + serializedEvents.add(bld.toString()); + } else if (oldLeaderAndEpoch.isLeader(nodeId)) { + StringBuilder bld = new StringBuilder(); + bld.append(RENOUNCE).append(" ").append(newLeaderAndEpoch.epoch); serializedEvents.add(bld.toString()); } } diff --git a/raft/src/main/java/org/apache/kafka/raft/BatchReader.java b/raft/src/main/java/org/apache/kafka/raft/BatchReader.java index e5f9e38612a0d..a302d5ae62130 100644 --- a/raft/src/main/java/org/apache/kafka/raft/BatchReader.java +++ b/raft/src/main/java/org/apache/kafka/raft/BatchReader.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.raft; +import org.apache.kafka.raft.internals.MemoryBatchReader; + +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Objects; @@ -112,4 +115,9 @@ public int hashCode() { } } + static BatchReader singleton(Batch batch) { + return new MemoryBatchReader<>(Collections.singletonList(batch), reader -> { + }); + } + } diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index a3dbbdd3a66fd..95a76b6cd49e9 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -40,8 +40,6 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.FetchResponse; -import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Records; @@ -53,10 +51,12 @@ import org.apache.kafka.common.requests.DescribeQuorumResponse; import org.apache.kafka.common.requests.EndQuorumEpochRequest; import org.apache.kafka.common.requests.EndQuorumEpochResponse; +import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.requests.FetchSnapshotRequest; import org.apache.kafka.common.requests.FetchSnapshotResponse; import org.apache.kafka.common.requests.VoteRequest; import org.apache.kafka.common.requests.VoteResponse; +import 
org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; @@ -340,17 +340,15 @@ private void maybeFireHandleCommit(long baseOffset, int epoch, List records) } } - private void maybeFireHandleClaim(LeaderState state) { - int leaderEpoch = state.epoch(); - long epochStartOffset = state.epochStartOffset(); + private void maybeFireLeaderChange(LeaderState state) { for (ListenerContext listenerContext : listenerContexts) { - listenerContext.maybeFireHandleClaim(leaderEpoch, epochStartOffset); + listenerContext.maybeFireLeaderChange(quorum.leaderAndEpoch(), state.epochStartOffset()); } } - private void fireHandleResign(int epoch) { + private void maybeFireLeaderChange() { for (ListenerContext listenerContext : listenerContexts) { - listenerContext.fireHandleResign(epoch); + listenerContext.maybeFireLeaderChange(quorum.leaderAndEpoch()); } } @@ -387,6 +385,11 @@ public LeaderAndEpoch leaderAndEpoch() { return quorum.leaderAndEpoch(); } + @Override + public OptionalInt nodeId() { + return quorum.localId(); + } + private OffsetAndEpoch endOffset() { return new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch()); } @@ -395,9 +398,7 @@ private void resetConnections() { requestManager.resetAll(); } - private void onBecomeLeader(long currentTimeMs) { - LeaderState state = quorum.leaderStateOrThrow(); - + private void onBecomeLeader(LeaderState state, long currentTimeMs) { log.initializeLeaderEpoch(quorum.epoch()); // The high watermark can only be advanced once we have written a record @@ -461,7 +462,10 @@ private boolean maybeTransitionToLeader(CandidateState state, long currentTimeMs if (state.isVoteGranted()) { long endOffset = log.endOffset().offset; quorum.transitionToLeader(endOffset); - onBecomeLeader(currentTimeMs); + + LeaderState leaderState = quorum.leaderStateOrThrow(); + maybeFireLeaderChange(leaderState); + onBecomeLeader(leaderState, currentTimeMs); return true; } else { return false; @@ -477,10 +481,6 @@ private void onBecomeCandidate(long currentTimeMs) throws IOException { } private void maybeResignLeadership() { - if (quorum.isLeader()) { - fireHandleResign(quorum.epoch()); - } - if (accumulator != null) { accumulator.close(); accumulator = null; @@ -490,24 +490,28 @@ private void maybeResignLeadership() { private void transitionToCandidate(long currentTimeMs) throws IOException { maybeResignLeadership(); quorum.transitionToCandidate(); + maybeFireLeaderChange(); onBecomeCandidate(currentTimeMs); } private void transitionToUnattached(int epoch) throws IOException { maybeResignLeadership(); quorum.transitionToUnattached(epoch); + maybeFireLeaderChange(); resetConnections(); } private void transitionToResigned(List preferredSuccessors) { fetchPurgatory.completeAllExceptionally(Errors.BROKER_NOT_AVAILABLE.exception("The broker is shutting down")); quorum.transitionToResigned(preferredSuccessors); + maybeFireLeaderChange(); resetConnections(); } private void transitionToVoted(int candidateId, int epoch) throws IOException { maybeResignLeadership(); quorum.transitionToVoted(epoch, candidateId); + maybeFireLeaderChange(); resetConnections(); } @@ -533,6 +537,7 @@ private void transitionToFollower( ) throws IOException { maybeResignLeadership(); quorum.transitionToFollower(epoch, leaderId); + maybeFireLeaderChange(); onBecomeFollower(currentTimeMs); } @@ -1926,7 +1931,7 @@ private long pollResigned(long currentTimeMs) throws IOException { private long 
pollLeader(long currentTimeMs) { LeaderState state = quorum.leaderStateOrThrow(); - maybeFireHandleClaim(state); + maybeFireLeaderChange(state); GracefulShutdown shutdown = this.shutdown.get(); if (shutdown != null) { @@ -2256,6 +2261,11 @@ public CompletableFuture shutdown(int timeoutMs) { return shutdownComplete; } + @Override + public void resign(int epoch) { + throw new UnsupportedOperationException(); + } + @Override public SnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) throws IOException { return new SnapshotWriter<>( @@ -2324,7 +2334,7 @@ public void complete() { private final class ListenerContext implements CloseListener> { private final RaftClient.Listener listener; // This field is used only by the Raft IO thread - private int claimedEpoch = 0; + private LeaderAndEpoch lastFiredLeaderChange = new LeaderAndEpoch(OptionalInt.empty(), 0); // These fields are visible to both the Raft IO thread and the listener // and are protected through synchronization on this `ListenerContext` instance @@ -2408,19 +2418,32 @@ private void fireHandleCommit(BatchReader reader) { listener.handleCommit(reader); } - void maybeFireHandleClaim(int epoch, long epochStartOffset) { - // We can fire `handleClaim` as soon as the listener has caught - // up to the start of the leader epoch. This guarantees that the - // state machine has seen the full committed state before it becomes - // leader and begins writing to the log. - if (epoch > claimedEpoch && nextOffset() >= epochStartOffset) { - claimedEpoch = epoch; - listener.handleClaim(epoch); + void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch) { + if (shouldFireLeaderChange(leaderAndEpoch)) { + lastFiredLeaderChange = leaderAndEpoch; + listener.handleLeaderChange(leaderAndEpoch); } } - void fireHandleResign(int epoch) { - listener.handleResign(epoch); + private boolean shouldFireLeaderChange(LeaderAndEpoch leaderAndEpoch) { + if (leaderAndEpoch.equals(lastFiredLeaderChange)) { + return false; + } else if (leaderAndEpoch.epoch > lastFiredLeaderChange.epoch) { + return true; + } else { + return leaderAndEpoch.leaderId.isPresent() && !lastFiredLeaderChange.leaderId.isPresent(); + } + } + + void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch, long epochStartOffset) { + // If this node is becoming the leader, then we can fire `handleClaim` as soon + // as the listener has caught up to the start of the leader epoch. This guarantees + // that the state machine has seen the full committed state before it becomes + // leader and begins writing to the log. 
+ if (shouldFireLeaderChange(leaderAndEpoch) && nextOffset() >= epochStartOffset) { + lastFiredLeaderChange = leaderAndEpoch; + listener.handleLeaderChange(leaderAndEpoch); + } } public synchronized void onClose(BatchReader reader) { diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java b/raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java index 47bd404ae2798..0f803a890ebed 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java @@ -28,6 +28,10 @@ public LeaderAndEpoch(OptionalInt leaderId, int epoch) { this.epoch = epoch; } + public boolean isLeader(int nodeId) { + return leaderId.isPresent() && leaderId.getAsInt() == nodeId; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -41,4 +45,12 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(leaderId, epoch); } + + @Override + public String toString() { + return "LeaderAndEpoch(" + + "leaderId=" + leaderId + + ", epoch=" + epoch + + ')'; + } } diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java index 86f8c187d8f35..8187a081dc5e2 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java @@ -222,6 +222,10 @@ public int localIdOrThrow() { return localId.orElseThrow(() -> new IllegalStateException("Required local id is not present")); } + public OptionalInt localId() { + return localId; + } + public int epoch() { return state.epoch(); } diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java index 74488b450ede1..9d847c7a535e2 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.List; +import java.util.OptionalInt; import java.util.concurrent.CompletableFuture; public interface RaftClient extends Closeable { @@ -45,24 +46,14 @@ interface Listener { void handleCommit(BatchReader reader); /** - * Invoked after this node has become a leader. This is only called after - * all commits up to the start of the leader's epoch have been sent to - * {@link #handleCommit(BatchReader)}. + * Called on any change to leadership. This includes both when a leader is elected and + * when a leader steps down or fails. * - * After becoming a leader, the client is eligible to write to the log - * using {@link #scheduleAppend(int, List)} or {@link #scheduleAtomicAppend(int, List)}. - * - * @param epoch the claimed leader epoch + * @param leader the current leader and epoch */ - default void handleClaim(int epoch) {} + default void handleLeaderChange(LeaderAndEpoch leader) {} - /** - * Invoked after a leader has stepped down. This callback may or may not - * fire before the next leader has been elected. - * - * @param epoch the epoch that the leader is resigning from - */ - default void handleResign(int epoch) {} + default void beginShutdown() {} } /** @@ -86,6 +77,14 @@ default void handleResign(int epoch) {} */ LeaderAndEpoch leaderAndEpoch(); + /** + * Get local nodeId if one is defined. This may be absent when the client is used + * as an anonymous observer, as in the case of the metadata shell. + * + * @return optional node id + */ + OptionalInt nodeId(); + /** * Append a list of records to the log. 
The write will be scheduled for some time * in the future. There is no guarantee that appended records will be written to @@ -147,6 +146,16 @@ default void handleResign(int epoch) {} */ CompletableFuture shutdown(int timeoutMs); + /** + * Resign the leadership. The leader will give up its leadership in the current epoch, + * and a new election will be held. Note that nothing prevents this leader from getting + * reelected. + * + * @param epoch the epoch to resign from. If this does not match the current epoch, this + * call will be ignored. + */ + void resign(int epoch); + /** * Create a writable snapshot file for a given offset and epoch. * diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java index 3db4d736a53f7..fe498d1060adc 100644 --- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java +++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java @@ -88,18 +88,17 @@ public synchronized void handleCommit(BatchReader reader) { } @Override - public synchronized void handleClaim(int epoch) { - log.debug("Counter uncommitted value initialized to {} after claiming leadership in epoch {}", - committed, epoch); - this.uncommitted = committed; - this.claimedEpoch = Optional.of(epoch); - } - - @Override - public synchronized void handleResign(int epoch) { - log.debug("Counter uncommitted value reset after resigning leadership"); - this.uncommitted = -1; - this.claimedEpoch = Optional.empty(); + public synchronized void handleLeaderChange(LeaderAndEpoch newLeader) { + if (newLeader.isLeader(nodeId)) { + log.debug("Counter uncommitted value initialized to {} after claiming leadership in epoch {}", + committed, newLeader); + this.uncommitted = committed; + this.claimedEpoch = Optional.of(newLeader.epoch); + } else { + log.debug("Counter uncommitted value reset after resigning leadership"); + this.uncommitted = -1; + this.claimedEpoch = Optional.empty(); + } } } diff --git a/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java b/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java deleted file mode 100644 index 1ca63f1b9c3cd..0000000000000 --- a/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.raft.metadata; - -import org.apache.kafka.common.protocol.ApiMessage; -import org.apache.kafka.metadata.ApiMessageAndVersion; -import org.apache.kafka.metalog.MetaLogLeader; -import org.apache.kafka.metalog.MetaLogListener; -import org.apache.kafka.metalog.MetaLogManager; -import org.apache.kafka.raft.BatchReader; -import org.apache.kafka.raft.LeaderAndEpoch; -import org.apache.kafka.raft.RaftClient; - -import java.util.List; -import java.util.stream.Collectors; - -/** - * For now, we rely on a shim to translate from `RaftClient` to `MetaLogManager`. - * Once we check in to trunk, we can drop `RaftClient` and implement `MetaLogManager` - * directly. - */ -public class MetaLogRaftShim implements MetaLogManager { - private final RaftClient client; - private final int nodeId; - - public MetaLogRaftShim(RaftClient client, int nodeId) { - this.client = client; - this.nodeId = nodeId; - } - - @Override - public void initialize() { - // NO-OP - The RaftClient is initialized externally - } - - @Override - public void register(MetaLogListener listener) { - client.register(new ListenerShim(listener)); - } - - @Override - public long scheduleAtomicWrite(long epoch, List batch) { - return write(epoch, batch, true); - } - - @Override - public long scheduleWrite(long epoch, List batch) { - return write(epoch, batch, false); - } - - private long write(long epoch, List batch, boolean isAtomic) { - final Long result; - if (isAtomic) { - result = client.scheduleAtomicAppend((int) epoch, batch); - } else { - result = client.scheduleAppend((int) epoch, batch); - } - - if (result == null) { - throw new IllegalArgumentException( - String.format( - "Unable to alloate a buffer for the schedule write operation: epoch %s, batch %s)", - epoch, - batch - ) - ); - } else { - return result; - } - } - - @Override - public void renounce(long epoch) { - throw new UnsupportedOperationException(); - } - - @Override - public MetaLogLeader leader() { - LeaderAndEpoch leaderAndEpoch = client.leaderAndEpoch(); - return new MetaLogLeader(leaderAndEpoch.leaderId.orElse(-1), leaderAndEpoch.epoch); - } - - @Override - public int nodeId() { - return nodeId; - } - - private class ListenerShim implements RaftClient.Listener { - private final MetaLogListener listener; - - private ListenerShim(MetaLogListener listener) { - this.listener = listener; - } - - @Override - public void handleCommit(BatchReader reader) { - try { - // TODO: The `BatchReader` might need to read from disk if this is - // not a leader. 
We want to move this IO to the state machine so that - // it does not block Raft replication - while (reader.hasNext()) { - BatchReader.Batch batch = reader.next(); - List records = batch.records().stream() - .map(ApiMessageAndVersion::message) - .collect(Collectors.toList()); - listener.handleCommits(batch.lastOffset(), records); - } - } finally { - reader.close(); - } - } - - @Override - public void handleClaim(int epoch) { - listener.handleNewLeader(new MetaLogLeader(nodeId, epoch)); - } - - @Override - public void handleResign(int epoch) { - listener.handleRenounce(epoch); - } - - @Override - public String toString() { - return "ListenerShim(" + - "listener=" + listener + - ')'; - } - } - -} diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 61e3e86a4d4cf..af5f4c4db5bb9 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -2159,7 +2159,7 @@ public void testLateRegisteredListenerCatchesUp() throws Exception { assertEquals(OptionalInt.of(epoch), context.listener.currentClaimedEpoch()); // Register a second listener and allow it to catch up to the high watermark - RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(); + RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(OptionalInt.of(localId)); context.client.register(secondListener); context.client.poll(); assertEquals(OptionalLong.of(8L), secondListener.lastCommitOffset()); @@ -2251,7 +2251,7 @@ public void testHandleCommitCallbackFiresInVotedState() throws Exception { assertEquals(OptionalLong.of(10L), context.client.highWatermark()); // Register another listener and verify that it catches up while we remain 'voted' - RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(); + RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(OptionalInt.of(localId)); context.client.register(secondListener); context.client.poll(); context.assertVotedCandidate(candidateEpoch, otherNodeId); @@ -2298,7 +2298,7 @@ public void testHandleCommitCallbackFiresInCandidateState() throws Exception { context.assertVotedCandidate(candidateEpoch, localId); // Register another listener and verify that it catches up - RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(); + RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(OptionalInt.of(localId)); context.client.register(secondListener); context.client.poll(); context.assertVotedCandidate(candidateEpoch, localId); diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 39e9c34de54d8..7450fee14ae7f 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -204,7 +204,7 @@ public RaftClientTestContext build() throws IOException { Metrics metrics = new Metrics(time); MockNetworkChannel channel = new MockNetworkChannel(voters); LogContext logContext = new LogContext(); - MockListener listener = new MockListener(); + MockListener listener = new MockListener(localId); Map voterAddressMap = voters.stream() .collect(Collectors.toMap(id -> id, RaftClientTestContext::mockAddress)); RaftConfig 
raftConfig = new RaftConfig(voterAddressMap, requestTimeoutMs, RETRY_BACKOFF_MS, electionTimeoutMs, @@ -1051,6 +1051,11 @@ static class MockListener implements RaftClient.Listener { private final List> commits = new ArrayList<>(); private final Map claimedEpochStartOffsets = new HashMap<>(); private OptionalInt currentClaimedEpoch = OptionalInt.empty(); + private final OptionalInt localId; + + MockListener(OptionalInt localId) { + this.localId = localId; + } int numCommittedBatches() { return commits.size(); @@ -1097,19 +1102,18 @@ List commitWithLastOffset(long lastOffset) { } @Override - public void handleClaim(int epoch) { + public void handleLeaderChange(LeaderAndEpoch leader) { // We record the next expected offset as the claimed epoch's start // offset. This is useful to verify that the `handleClaim` callback // was not received early. - long claimedEpochStartOffset = lastCommitOffset().isPresent() ? - lastCommitOffset().getAsLong() + 1 : 0L; - this.currentClaimedEpoch = OptionalInt.of(epoch); - this.claimedEpochStartOffsets.put(epoch, claimedEpochStartOffset); - } - - @Override - public void handleResign(int epoch) { - this.currentClaimedEpoch = OptionalInt.empty(); + if (localId.isPresent() && leader.isLeader(localId.getAsInt())) { + long claimedEpochStartOffset = lastCommitOffset().isPresent() ? + lastCommitOffset().getAsLong() + 1 : 0L; + this.currentClaimedEpoch = OptionalInt.of(leader.epoch); + this.claimedEpochStartOffsets.put(leader.epoch, claimedEpochStartOffset); + } else { + this.currentClaimedEpoch = OptionalInt.empty(); + } } @Override diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java index 739e0278d5cda..781d0e5d8c0ac 100644 --- a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java +++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java @@ -37,18 +37,16 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.metadata.ApiMessageAndVersion; -import org.apache.kafka.metalog.MetaLogLeader; -import org.apache.kafka.metalog.MetaLogListener; import org.apache.kafka.queue.EventQueue; import org.apache.kafka.queue.KafkaEventQueue; import org.apache.kafka.raft.BatchReader; +import org.apache.kafka.raft.LeaderAndEpoch; import org.apache.kafka.raft.RaftClient; import org.apache.kafka.shell.MetadataNode.DirectoryNode; import org.apache.kafka.shell.MetadataNode.FileNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; @@ -77,13 +75,15 @@ public void setWorkingDirectory(String workingDirectory) { } } - class LogListener implements MetaLogListener, RaftClient.Listener { + class LogListener implements RaftClient.Listener { @Override public void handleCommit(BatchReader reader) { try { - // TODO: handle lastOffset while (reader.hasNext()) { BatchReader.Batch batch = reader.next(); + log.debug("handleCommits " + batch.records() + " at offset " + batch.lastOffset()); + DirectoryNode dir = data.root.mkdirs("metadataQuorum"); + dir.create("offset").setContents(String.valueOf(batch.lastOffset())); for (ApiMessageAndVersion messageAndVersion : batch.records()) { handleMessage(messageAndVersion.message()); } @@ -94,19 +94,7 @@ public void handleCommit(BatchReader reader) { } @Override - public void handleCommits(long lastOffset, List messages) { - appendEvent("handleCommits", () -> { - 
log.debug("handleCommits " + messages + " at offset " + lastOffset); - DirectoryNode dir = data.root.mkdirs("metadataQuorum"); - dir.create("offset").setContents(String.valueOf(lastOffset)); - for (ApiMessage message : messages) { - handleMessage(message); - } - }, null); - } - - @Override - public void handleNewLeader(MetaLogLeader leader) { + public void handleLeaderChange(LeaderAndEpoch leader) { appendEvent("handleNewLeader", () -> { log.debug("handleNewLeader " + leader); DirectoryNode dir = data.root.mkdirs("metadataQuorum"); @@ -114,18 +102,6 @@ public void handleNewLeader(MetaLogLeader leader) { }, null); } - @Override - public void handleClaim(int epoch) { - // This shouldn't happen because we should never be the leader. - log.debug("RaftClient.Listener sent handleClaim(epoch=" + epoch + ")"); - } - - @Override - public void handleRenounce(long epoch) { - // This shouldn't happen because we should never be the leader. - log.debug("MetaLogListener sent handleRenounce(epoch=" + epoch + ")"); - } - @Override public void beginShutdown() { log.debug("MetaLogListener sent beginShutdown"); diff --git a/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java b/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java index 907b4db467ae1..e7d6d1d3e1fac 100644 --- a/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java +++ b/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java @@ -18,7 +18,6 @@ package org.apache.kafka.shell; import org.apache.kafka.common.message.LeaderChangeMessage; -import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.record.ControlRecordType; import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch; @@ -27,11 +26,12 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.metadata.ApiMessageAndVersion; -import org.apache.kafka.metalog.MetaLogLeader; -import org.apache.kafka.metalog.MetaLogListener; -import org.apache.kafka.raft.metadata.MetadataRecordSerde; +import org.apache.kafka.metadata.MetadataRecordSerde; import org.apache.kafka.queue.EventQueue; import org.apache.kafka.queue.KafkaEventQueue; +import org.apache.kafka.raft.BatchReader; +import org.apache.kafka.raft.LeaderAndEpoch; +import org.apache.kafka.raft.RaftClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +39,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.OptionalInt; import java.util.concurrent.CompletableFuture; @@ -49,14 +50,14 @@ public final class SnapshotFileReader implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(SnapshotFileReader.class); private final String snapshotPath; - private final MetaLogListener listener; + private final RaftClient.Listener listener; private final KafkaEventQueue queue; private final CompletableFuture caughtUpFuture; private FileRecords fileRecords; private Iterator batchIterator; private final MetadataRecordSerde serde = new MetadataRecordSerde(); - public SnapshotFileReader(String snapshotPath, MetaLogListener listener) { + public SnapshotFileReader(String snapshotPath, RaftClient.Listener listener) { this.snapshotPath = snapshotPath; this.listener = listener; this.queue = new KafkaEventQueue(Time.SYSTEM, @@ -101,7 +102,7 @@ private void handleNextBatch() { private void scheduleHandleNextBatch() { queue.append(new EventQueue.Event() { 
@Override - public void run() throws Exception { + public void run() { handleNextBatch(); } @@ -123,8 +124,10 @@ private void handleControlBatch(FileChannelRecordBatch batch) { case LEADER_CHANGE: LeaderChangeMessage message = new LeaderChangeMessage(); message.read(new ByteBufferAccessor(record.value()), (short) 0); - listener.handleNewLeader(new MetaLogLeader(message.leaderId(), - batch.partitionLeaderEpoch())); + listener.handleLeaderChange(new LeaderAndEpoch( + OptionalInt.of(message.leaderId()), + batch.partitionLeaderEpoch() + )); break; default: log.error("Ignoring control record with type {} at offset {}", @@ -137,18 +140,21 @@ private void handleControlBatch(FileChannelRecordBatch batch) { } private void handleMetadataBatch(FileChannelRecordBatch batch) { - List messages = new ArrayList<>(); - for (Iterator iter = batch.iterator(); iter.hasNext(); ) { - Record record = iter.next(); + List messages = new ArrayList<>(); + for (Record record : batch) { ByteBufferAccessor accessor = new ByteBufferAccessor(record.value()); try { ApiMessageAndVersion messageAndVersion = serde.read(accessor, record.valueSize()); - messages.add(messageAndVersion.message()); + messages.add(messageAndVersion); } catch (Throwable e) { log.error("unable to read metadata record at offset {}", record.offset(), e); } } - listener.handleCommits(batch.lastOffset(), messages); + listener.handleCommit(BatchReader.singleton(new BatchReader.Batch<>( + batch.baseOffset(), + batch.partitionLeaderEpoch(), + messages + ))); } public void beginShutdown(String reason) { From 2340d7fc18d10cfd807fb5ebec4ad413e58b6199 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Wed, 7 Apr 2021 12:01:38 -0700 Subject: [PATCH 2/2] Move reader iteration to event handler --- .../metadata/BrokerMetadataListener.scala | 53 +++++------ .../metadata/BrokerMetadataListenerTest.scala | 38 +++++--- .../kafka/controller/QuorumController.java | 88 +++++++++---------- 3 files changed, 96 insertions(+), 83 deletions(-) diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala index 1d0f9930343d6..053f28d904416 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala @@ -16,7 +16,6 @@ */ package kafka.server.metadata -import java.util import java.util.concurrent.TimeUnit import kafka.coordinator.group.GroupCoordinator @@ -78,28 +77,32 @@ class BrokerMetadataListener( * Handle new metadata records. */ override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit = { - try { - while (reader.hasNext) { - val batch = reader.next() - eventQueue.append(new HandleCommitsEvent(batch.lastOffset, batch.records )) - } - } finally { - reader.close() - } - + eventQueue.append(new HandleCommitsEvent(reader)) } - // Visible for testing. It's useful to execute events synchronously - private[metadata] def execCommits(lastOffset: Long, records: util.List[ApiMessageAndVersion]): Unit = { - new HandleCommitsEvent(lastOffset, records).run() + // Visible for testing. 
It's useful to execute events synchronously in order + // to make tests deterministic + private[metadata] def execCommits(batch: BatchReader.Batch[ApiMessageAndVersion]): Unit = { + new HandleCommitsEvent(BatchReader.singleton(batch)).run() } - class HandleCommitsEvent(lastOffset: Long, - records: util.List[ApiMessageAndVersion]) - extends EventQueue.FailureLoggingEvent(log) { + class HandleCommitsEvent( + reader: BatchReader[ApiMessageAndVersion] + ) extends EventQueue.FailureLoggingEvent(log) { override def run(): Unit = { + try { + apply(reader.next()) + } finally { + reader.close() + } + } + + private def apply(batch: BatchReader.Batch[ApiMessageAndVersion]): Unit = { + val records = batch.records + val lastOffset = batch.lastOffset + if (isDebugEnabled) { - debug(s"Metadata batch ${lastOffset}: handling ${records.size()} record(s).") + debug(s"Metadata batch $lastOffset: handling ${records.size()} record(s).") } val imageBuilder = MetadataImageBuilder(brokerId, log, metadataCache.currentImage()) @@ -114,35 +117,35 @@ class BrokerMetadataListener( } handleMessage(imageBuilder, record.message, lastOffset) } catch { - case e: Exception => error(s"Unable to handle record ${index} in batch " + - s"ending at offset ${lastOffset}", e) + case e: Exception => error(s"Unable to handle record $index in batch " + + s"ending at offset $lastOffset", e) } index = index + 1 } if (imageBuilder.hasChanges) { val newImage = imageBuilder.build() if (isTraceEnabled) { - trace(s"Metadata batch ${lastOffset}: creating new metadata image ${newImage}") + trace(s"Metadata batch $lastOffset: creating new metadata image ${newImage}") } else if (isDebugEnabled) { - debug(s"Metadata batch ${lastOffset}: creating new metadata image") + debug(s"Metadata batch $lastOffset: creating new metadata image") } metadataCache.image(newImage) } else if (isDebugEnabled) { - debug(s"Metadata batch ${lastOffset}: no new metadata image required.") + debug(s"Metadata batch $lastOffset: no new metadata image required.") } if (imageBuilder.hasPartitionChanges) { if (isDebugEnabled) { - debug(s"Metadata batch ${lastOffset}: applying partition changes") + debug(s"Metadata batch $lastOffset: applying partition changes") } replicaManager.handleMetadataRecords(imageBuilder, lastOffset, RequestHandlerHelper.onLeadershipChange(groupCoordinator, txnCoordinator, _, _)) } else if (isDebugEnabled) { - debug(s"Metadata batch ${lastOffset}: no partition changes found.") + debug(s"Metadata batch $lastOffset: no partition changes found.") } _highestMetadataOffset = lastOffset val endNs = time.nanoseconds() val deltaUs = TimeUnit.MICROSECONDS.convert(endNs - startNs, TimeUnit.NANOSECONDS) - debug(s"Metadata batch ${lastOffset}: advanced highest metadata offset in ${deltaUs} " + + debug(s"Metadata batch $lastOffset: advanced highest metadata offset in ${deltaUs} " + "microseconds.") batchProcessingTimeHist.update(deltaUs) } diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala index cc9bf4c5bc935..b3558c51ccd17 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala @@ -28,6 +28,7 @@ import org.apache.kafka.common.metadata.{ConfigRecord, PartitionRecord, RemoveTo import org.apache.kafka.common.utils.MockTime import org.apache.kafka.common.{TopicPartition, Uuid} import 
org.apache.kafka.metadata.ApiMessageAndVersion +import org.apache.kafka.raft.BatchReader import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test import org.mockito.ArgumentMatchers._ @@ -39,6 +40,7 @@ import scala.jdk.CollectionConverters._ class BrokerMetadataListenerTest { + private val leaderEpoch = 5 private val brokerId = 1 private val time = new MockTime() private val configRepository = new CachedConfigRepository @@ -82,11 +84,10 @@ class BrokerMetadataListenerTest { ): Unit = { val deleteRecord = new RemoveTopicRecord() .setTopicId(topicId) - lastMetadataOffset += 1 - listener.execCommits(lastOffset = lastMetadataOffset, List[ApiMessageAndVersion]( - new ApiMessageAndVersion(deleteRecord, 0.toShort), - ).asJava) + applyBatch(List[ApiMessageAndVersion]( + new ApiMessageAndVersion(deleteRecord, 0.toShort), + )) assertFalse(metadataCache.contains(topic)) assertEquals(new Properties, configRepository.topicConfig(topic)) @@ -108,6 +109,18 @@ class BrokerMetadataListenerTest { assertEquals(localPartitions, localRemoved.map(_.toTopicPartition).toSet) } + private def applyBatch( + records: List[ApiMessageAndVersion] + ): Unit = { + val baseOffset = lastMetadataOffset + 1 + lastMetadataOffset += records.size + listener.execCommits(new BatchReader.Batch( + baseOffset, + leaderEpoch, + records.asJava + )) + } + private def createAndAssert( topicId: Uuid, topic: String, @@ -115,10 +128,10 @@ class BrokerMetadataListenerTest { numPartitions: Int, numBrokers: Int ): Set[TopicPartition] = { - val records = new java.util.ArrayList[ApiMessageAndVersion] - records.add(new ApiMessageAndVersion(new TopicRecord() + val records = mutable.ListBuffer.empty[ApiMessageAndVersion] + records += new ApiMessageAndVersion(new TopicRecord() .setName(topic) - .setTopicId(topicId), 0)) + .setTopicId(topicId), 0) val localTopicPartitions = mutable.Set.empty[TopicPartition] (0 until numPartitions).map { partitionId => @@ -133,26 +146,25 @@ class BrokerMetadataListenerTest { localTopicPartitions.add(new TopicPartition(topic, partitionId)) } - records.add(new ApiMessageAndVersion(new PartitionRecord() + records += new ApiMessageAndVersion(new PartitionRecord() .setTopicId(topicId) .setPartitionId(partitionId) .setLeader(preferredLeaderId) .setLeaderEpoch(0) .setPartitionEpoch(0) .setReplicas(replicas) - .setIsr(replicas), 0)) + .setIsr(replicas), 0) } topicConfig.forKeyValue { (key, value) => - records.add(new ApiMessageAndVersion(new ConfigRecord() + records += new ApiMessageAndVersion(new ConfigRecord() .setResourceName(topic) .setResourceType(ConfigResource.Type.TOPIC.id()) .setName(key) - .setValue(value), 0)) + .setValue(value), 0) } - lastMetadataOffset += records.size() - listener.execCommits(lastOffset = lastMetadataOffset, records) + applyBatch(records.toList) assertTrue(metadataCache.contains(topic)) assertEquals(Some(numPartitions), metadataCache.numPartitions(topic)) assertEquals(topicConfig, configRepository.topicConfig(topic).asScala) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 80a4716064a92..4d4b9f69fdf9e 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -616,58 +616,56 @@ private CompletableFuture appendWriteEvent(String name, class QuorumMetaLogListener implements RaftClient.Listener { - private void handleCommittedBatch(BatchReader.Batch batch) { 
- long offset = batch.lastOffset(); - List messages = batch.records(); - - appendControlEvent("handleCommits[" + offset + "]", () -> { - if (curClaimEpoch == -1) { - // If the controller is a standby, replay the records that were - // created by the active controller. - if (log.isDebugEnabled()) { - if (log.isTraceEnabled()) { - log.trace("Replaying commits from the active node up to " + - "offset {}: {}.", offset, messages.stream() - .map(ApiMessageAndVersion::toString) - .collect(Collectors.joining(", "))); + @Override + public void handleCommit(BatchReader reader) { + appendControlEvent("handleCommits[baseOffset=" + reader.baseOffset() + "]", () -> { + try { + boolean isActiveController = curClaimEpoch != -1; + while (reader.hasNext()) { + BatchReader.Batch batch = reader.next(); + long offset = batch.lastOffset(); + List messages = batch.records(); + + if (isActiveController) { + // If the controller is active, the records were already replayed, + // so we don't need to do it here. + log.debug("Completing purgatory items up to offset {}.", offset); + + // Complete any events in the purgatory that were waiting for this offset. + purgatory.completeUpTo(offset); + + // Delete all the in-memory snapshots that we no longer need. + // If we are writing a new snapshot, then we need to keep that around; + // otherwise, we should delete up to the current committed offset. + snapshotRegistry.deleteSnapshotsUpTo( + Math.min(offset, snapshotGeneratorManager.snapshotEpoch())); + } else { - log.debug("Replaying commits from the active node up to " + - "offset {}.", offset); + // If the controller is a standby, replay the records that were + // created by the active controller. + if (log.isDebugEnabled()) { + if (log.isTraceEnabled()) { + log.trace("Replaying commits from the active node up to " + + "offset {}: {}.", offset, messages.stream() + .map(ApiMessageAndVersion::toString) + .collect(Collectors.joining(", "))); + } else { + log.debug("Replaying commits from the active node up to " + + "offset {}.", offset); + } + } + for (ApiMessageAndVersion messageAndVersion : messages) { + replay(messageAndVersion.message(), -1, offset); + } } + lastCommittedOffset = offset; } - for (ApiMessageAndVersion messageAndVersion : messages) { - replay(messageAndVersion.message(), -1, offset); - } - } else { - // If the controller is active, the records were already replayed, - // so we don't need to do it here. - log.debug("Completing purgatory items up to offset {}.", offset); - - // Complete any events in the purgatory that were waiting for this offset. - purgatory.completeUpTo(offset); - - // Delete all the in-memory snapshots that we no longer need. - // If we are writing a new snapshot, then we need to keep that around; - // otherwise, we should delete up to the current committed offset. - snapshotRegistry.deleteSnapshotsUpTo( - Math.min(offset, snapshotGeneratorManager.snapshotEpoch())); + } finally { + reader.close(); } - lastCommittedOffset = offset; }); } - @Override - public void handleCommit(BatchReader reader) { - try { - while (reader.hasNext()) { - BatchReader.Batch batch = reader.next(); - handleCommittedBatch(batch); - } - } finally { - reader.close(); - } - } - @Override public void handleLeaderChange(LeaderAndEpoch newLeader) { if (newLeader.isLeader(nodeId)) {
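(Illustrative sketch, not part of the patch: the BatchReader.singleton helper introduced above lets test code drive handleCommit synchronously with a single in-memory batch, which is essentially what BrokerMetadataListenerTest.applyBatch and LocalLogManager do in this change. The offsets, epoch, and record below are arbitrary, and the listener argument stands in for any RaftClient.Listener implementation.)

import java.util.Collections;
import java.util.List;

import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.RaftClient;

public class SingletonBatchExample {
    public static void feed(RaftClient.Listener<ApiMessageAndVersion> listener) {
        List<ApiMessageAndVersion> records = Collections.singletonList(
            new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(0), (short) 0));

        // Wrap the records in one in-memory batch: base offset 0, leader epoch 1.
        BatchReader<ApiMessageAndVersion> reader =
            BatchReader.singleton(new BatchReader.Batch<>(0L, 1, records));

        // handleCommit takes ownership of the reader and closes it when done.
        listener.handleCommit(reader);
    }
}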